diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index 6c071200923..8eb967ca87f 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -45,3 +45,13 @@ clean: clean-symlinks
 
 clean-symlinks:
 	rm -f link-canary.c
+
+# For postgres_fdw test
+export PG_PORT=5432
+installcheck: install prep_postgres
+clean: clean_postgres
+prep_postgres:
+	./postgres_setup.bash
+clean_postgres:
+	./postgres_clean.bash
+.PHONY: prep_postgres clean_postgres
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 19d8e9cc831..51e3c5cfb42 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -3449,14 +3449,31 @@ appendLimitClause(deparse_expr_cxt *context)
 	 * This may reduce the number of tuples that we need to fetch from remote servers.
 	 */
 	Node *precount = copyObject(root->parse->limitCount);
+	Node *offset = copyObject(root->parse->limitOffset);
+	bool offset_needed = ((precount != NULL) && (offset != NULL));
 
 	/*
 	 * If we've specified both OFFSET and LIMIT clause,
 	 * it's enough to fetch tuples from 0 to limitCount + limitOffset from remote servers.
+	 * Additionally, optimize the following cases (N > 0):
+	 *   LIMIT 0 OFFSET N    to LIMIT 0
+	 *   LIMIT 0 OFFSET NULL to LIMIT 0
+	 *   LIMIT N OFFSET NULL to LIMIT N
+	 *   LIMIT N OFFSET 0    to LIMIT N
 	 */
+	if (offset_needed &&
+		IsA(precount, Const) &&
+		(((Const *) precount)->constisnull || ((Const *) precount)->constvalue == 0))
+		offset_needed = false;
+
+	if (offset_needed &&
+		IsA(offset, Const) &&
+		(((Const *) offset)->constisnull || ((Const *) offset)->constvalue == 0))
+		offset_needed = false;
+
 	if (precount)
 	{
-		if (root->parse->limitOffset)
+		if (offset_needed)
 		{
 			ParseState *pstate = make_parsestate(NULL);
 
 			/*
@@ -3465,7 +3482,7 @@ appendLimitClause(deparse_expr_cxt *context)
 			 */
 			precount = (Node *) make_op(pstate,
 										list_make2(makeString("pg_catalog"), makeString(pstrdup("+"))),
-										copyObject(root->parse->limitOffset),
+										offset,
 										precount,
 										NULL,
 										-1);
diff --git a/contrib/postgres_fdw/expected/mpp_gp2pg_postgres_fdw.out b/contrib/postgres_fdw/expected/mpp_gp2pg_postgres_fdw.out
index 61496856b2d..097ea641e01 100644
--- a/contrib/postgres_fdw/expected/mpp_gp2pg_postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/mpp_gp2pg_postgres_fdw.out
@@ -981,6 +981,105 @@ SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;
  1000 |  0
 (2 rows)
 
+-- test LIMIT 0, OFFSET null/0
+ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit null offset 998;
+                                                  QUERY PLAN
+------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT NULL::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit all offset 998;
+                                                  QUERY PLAN
+------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT NULL::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset 998;
+                                                       QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 0::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset null;
+                                                        QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 0::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset null;
+                                                        QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 3::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset 0;
+                                                      QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: c1, c2
+   ->  Gather Motion 2:1  (slice1; segments: 2)
+         Output: c1, c2
+         Merge Key: c1
+         ->  Foreign Scan on public.mpp_ft2
+               Output: c1, c2
+               Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 3::bigint
+ Optimizer: Postgres-based planner
+ Settings: gp_enable_minmax_optimization = 'off'
+(10 rows)
+
+ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'false');
 -- Query with aggregates and limit clause together is NOT pushed down.
 -- Because it's unsafe to do partial aggregate and limit in multiple remote servers.
 EXPLAIN (VERBOSE, COSTS OFF)
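[Reviewer note] The new offset_needed logic in appendLimitClause() drops the OFFSET from the pushed-down query whenever either clause is a NULL or zero constant, since those cases degenerate to a bare LIMIT. A minimal Python model of the decision table documented in the comment above (illustrative only — the real code inspects Const nodes in C, and remote_limit_clause is a hypothetical helper, not part of the patch):

```python
# Minimal model of the pushdown decision: limit/offset are ints, or None for
# an explicit NULL (LIMIT NULL and LIMIT ALL both arrive as a NULL constant).
# Both clauses are assumed present; absent clauses never reach this logic.
def remote_limit_clause(limit, offset):
    if limit in (None, 0) or offset in (None, 0):
        # OFFSET contributes nothing here: LIMIT NULL/0 ignores it, and a
        # NULL or 0 OFFSET adds no rows, so push down the bare LIMIT.
        return "LIMIT NULL" if limit is None else "LIMIT %d" % limit
    # General case: fetch limit + offset rows so the local node can skip
    # the first offset rows itself.
    return "LIMIT %d + %d" % (limit, offset)

assert remote_limit_clause(None, 998) == "LIMIT NULL"  # limit null offset 998
assert remote_limit_clause(0, 998) == "LIMIT 0"        # limit 0 offset 998
assert remote_limit_clause(0, None) == "LIMIT 0"       # limit 0 offset null
assert remote_limit_clause(3, None) == "LIMIT 3"       # limit 3 offset null
assert remote_limit_clause(3, 0) == "LIMIT 3"          # limit 3 offset 0
assert remote_limit_clause(3, 2) == "LIMIT 3 + 2"      # limit 3 offset 2
```

These six cases line up one-for-one with the Remote SQL lines in the expected output above.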
diff --git a/contrib/postgres_fdw/sql/mpp_gp2pg_postgres_fdw.sql b/contrib/postgres_fdw/sql/mpp_gp2pg_postgres_fdw.sql
index 2ebb297c7dc..cc9bd2fe2fe 100644
--- a/contrib/postgres_fdw/sql/mpp_gp2pg_postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/mpp_gp2pg_postgres_fdw.sql
@@ -233,6 +233,23 @@ SELECT c1, c2 FROM mpp_ft2 order by c1 offset 2 limit 3;
 EXPLAIN VERBOSE SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;
 SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;
+
+-- test LIMIT 0, OFFSET null/0
+ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit null offset 998;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit all offset 998;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset 998;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset null;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset null;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset 0;
+ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'false');
+
 -- Query with aggregates and limit clause together is NOT pushed down.
 -- Because it's unsafe to do partial aggregate and limit in multiple remote servers.
 EXPLAIN (VERBOSE, COSTS OFF)
diff --git a/gpMgmt/bin/gppylib/util/ssh_utils.py b/gpMgmt/bin/gppylib/util/ssh_utils.py
index bab39ef6ef2..dd18b982f43 100644
--- a/gpMgmt/bin/gppylib/util/ssh_utils.py
+++ b/gpMgmt/bin/gppylib/util/ssh_utils.py
@@ -192,35 +192,50 @@ def login(self, hostList=None, userName=None, delaybeforesend=0.05, sync_multipl
         good_list = []
         print_lock = threading.Lock()
 
-        def connect_host(hostname, p):
-            self.hostList.append(hostname)
-            try:
-                # The sync_multiplier value is passed onto pexpect.pxssh which is used to determine timeout
-                # values for prompt verification after an ssh connection is established.
-                p.login(hostname, self.userName, sync_multiplier=sync_multiplier)
-                p.x_peer = hostname
-                p.x_pid = p.pid
-                good_list.append(p)
-                if self.verbose:
+        def connect_host(hostname):
+            retry_login = True
+
+            while True:
+                # Create a new PxsshWrapper object for each attempt; reusing the
+                # same object across retries can cause unexpected behaviour.
+                p = gppxssh_wrapper.PxsshWrapper(delaybeforesend=delaybeforesend,
+                                                 sync_retries=sync_retries,
+                                                 options={"StrictHostKeyChecking": "no",
+                                                          "BatchMode": "yes"})
+
+                try:
+                    # The sync_multiplier value is passed onto pexpect.pxssh which is used to determine timeout
+                    # values for prompt verification after an ssh connection is established.
+                    p.login(hostname, self.userName, sync_multiplier=sync_multiplier)
+                    p.x_peer = hostname
+                    p.x_pid = p.pid
+                    good_list.append(p)
+                    if self.verbose:
+                        with print_lock:
+                            print('[INFO] login %s' % hostname)
+                except Exception as e:
+                    # Some logins fail because the TERM env variable was cleared;
+                    # retry once with the original TERM restored before reporting an error.
+                    if origTERM and retry_login:
+                        retry_login = False
+                        os.putenv('TERM', origTERM)
+                        continue
+
                     with print_lock:
-                        print('[INFO] login %s' % hostname)
-            except Exception as e:
-                with print_lock:
-                    print('[ERROR] unable to login to %s' % hostname)
-                    if type(e) is pxssh.ExceptionPxssh:
-                        print(e)
-                    elif type(e) is pxssh.EOF:
-                        print('Could not acquire connection.')
-                    else:
-                        print('hint: use gpssh-exkeys to setup public-key authentication between hosts')
+                        print('[ERROR] unable to login to %s' % hostname)
+                        if type(e) is pxssh.ExceptionPxssh:
+                            print(e)
+                        elif type(e) is pxssh.EOF:
+                            print('Could not acquire connection.')
+                            print(e)
+                        else:
+                            print('hint: use gpssh-exkeys to setup public-key authentication between hosts')
+
+                break
 
         thread_list = []
 
         for host in hostList:
-            p = gppxssh_wrapper.PxsshWrapper(delaybeforesend=delaybeforesend,
-                                             sync_retries=sync_retries,
-                                             options={"StrictHostKeyChecking": "no",
-                                                      "BatchMode": "yes"})
-            t = threading.Thread(target=connect_host, args=(host, p))
+            t = threading.Thread(target=connect_host, args=[host])
             t.start()
             thread_list.append(t)
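[Reviewer note] connect_host() now owns the whole connection lifecycle: it builds a fresh PxsshWrapper per attempt and retries a failed login exactly once after restoring the original TERM value. A self-contained sketch of that retry shape (fake_login is a made-up stand-in for the pxssh login; note the real code uses os.putenv, which affects spawned ssh processes rather than Python's own os.environ):

```python
# Self-contained sketch of the retry-once-after-restoring-TERM pattern used in
# connect_host(). fake_login fails whenever TERM is unset, mimicking the
# failure mode being worked around.
import os

def fake_login(hostname):
    if not os.environ.get('TERM'):
        raise RuntimeError('no TERM set')
    return 'session-for-%s' % hostname

def connect(hostname, orig_term):
    retry_login = True
    while True:
        try:
            return fake_login(hostname)
        except Exception:
            if orig_term and retry_login:
                retry_login = False           # only ever retry once
                os.environ['TERM'] = orig_term
                continue                      # second attempt with TERM restored
            raise

orig_term = os.environ.pop('TERM', 'xterm')   # simulate gpssh clearing TERM
print(connect('localhost', orig_term))        # succeeds on the retry
```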
diff --git a/gpMgmt/bin/gppylib/util/test/unit/test_cluster_ssh_utils.py b/gpMgmt/bin/gppylib/util/test/unit/test_cluster_ssh_utils.py
index 0a24eeb5b7c..33f56b8a1fd 100644
--- a/gpMgmt/bin/gppylib/util/test/unit/test_cluster_ssh_utils.py
+++ b/gpMgmt/bin/gppylib/util/test/unit/test_cluster_ssh_utils.py
@@ -4,7 +4,7 @@ import sys, os, pwd
 import unittest
 from io import StringIO
-from mock import patch
+from mock import patch, call
 
 try:
     gphome = os.environ.get('GPHOME')
@@ -91,7 +91,38 @@ def test04_exceptions(self, mock_stdout):
         session2 = Session()
         session2.login(['localhost'], 'gpadmin', 0.05, 1.0)
         self.assertIn('[ERROR] unable to login to localhost\nhint: use gpssh-exkeys to setup public-key authentication between hosts\n', mock_stdout.getvalue())
+
+    @patch('os.getenv', return_value="term")
+    @patch('os.putenv')
+    @patch('sys.stdout', new_callable=StringIO)
+    def test05_login_retry_when_term_variable_is_set(self, mock_stdout, mock_putenv, mock_getenv):
+        '''
+        Test pxssh.login() retry when there is an exception and TERM env variable is set
+        '''
+
+        with mock.patch.object(pxssh.pxssh, 'login', side_effect=pxssh.EOF('foo')) as mock_login:
+            session = Session()
+            session.login(['localhost'], 'gpadmin', 0.05, 1.0)
+            self.assertIn('[ERROR] unable to login to localhost\nCould not acquire connection.\n', mock_stdout.getvalue())
+            mock_stdout.truncate(0)
+            assert mock_putenv.call_count == 3
+            mock_putenv.assert_has_calls([call('TERM', ''), call('TERM', 'term'), call('TERM', 'term')])
+
+    @patch('os.getenv', return_value=None)
+    @patch('os.putenv')
+    @patch('sys.stdout', new_callable=StringIO)
+    def test06_login_does_not_retry_when_term_variable_is_not_set(self, mock_stdout, mock_putenv, mock_getenv):
+        '''
+        Test pxssh.login() does not retry when there is an exception and TERM env variable is not set
+        '''
+
+        with mock.patch.object(pxssh.pxssh, 'login', side_effect=pxssh.EOF('foo')) as mock_login:
+            session = Session()
+            session.login(['localhost'], 'gpadmin', 0.05, 1.0)
+            self.assertIn('[ERROR] unable to login to localhost\nCould not acquire connection.\n', mock_stdout.getvalue())
+            self.assertNotIn('Retrying by restoring the TERM env variable.\n', mock_stdout.getvalue())
         mock_stdout.truncate(0)
+        mock_putenv.assert_called_once_with('TERM', '')
 
 if __name__ == "__main__":
     unittest.main()
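[Reviewer note] The two new unit tests pin the retry behavior down through its os.putenv side effects rather than through log output. The same assertion pattern, reduced to a runnable sketch (do_login is a hypothetical condensation of Session.login's TERM handling, not the real method):

```python
# Standalone illustration of the assertion style used in test05/test06:
# patch os.putenv, drive code that may or may not retry, then verify the
# exact call sequence.
import os
import unittest
from unittest.mock import patch, call

def do_login(orig_term):
    os.putenv('TERM', '')              # clear TERM before spawning ssh
    if orig_term:
        os.putenv('TERM', orig_term)   # restore it on the retry path

class PutenvCallsTest(unittest.TestCase):
    @patch('os.putenv')
    def test_retry_restores_term(self, mock_putenv):
        do_login('term')
        mock_putenv.assert_has_calls([call('TERM', ''), call('TERM', 'term')])

    @patch('os.putenv')
    def test_no_retry_without_term(self, mock_putenv):
        do_login(None)
        mock_putenv.assert_called_once_with('TERM', '')

if __name__ == '__main__':
    unittest.main()
```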
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dba50f9759d..12355b4749d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1825,17 +1825,23 @@ RecordTransactionCommit(void)
 /*
  * RecordDistributedForgetCommitted
  */
-void
+XLogRecPtr
 RecordDistributedForgetCommitted(DistributedTransactionId gxid)
 {
 	xl_xact_distributed_forget xlrec;
+	XLogRecPtr	recptr;
 
 	xlrec.gxid = gxid;
 
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, sizeof(xl_xact_distributed_forget));
 
-	XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
+	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
+	/* Only flush immediately if we are going to wait for remote_apply. */
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+		XLogFlush(recptr);
+
+	return recptr;
 }
 
 /*
diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c
index e32d7060ea0..d271dc694b3 100644
--- a/src/backend/cdb/cdbtm.c
+++ b/src/backend/cdb/cdbtm.c
@@ -25,6 +25,7 @@
 #include "libpq/libpq-be.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
 #include "storage/lmgr.h"
 #include "storage/pmsignal.h"
 #include "storage/s_lock.h"
@@ -121,7 +122,7 @@ int gp_gxid_prefetch_num;
  * FUNCTIONS PROTOTYPES
  */
 static void doPrepareTransaction(void);
-static void doInsertForgetCommitted(void);
+static XLogRecPtr doInsertForgetCommitted(void);
 static void doNotifyingOnePhaseCommit(void);
 static void doNotifyingCommitPrepared(void);
 static void doNotifyingAbort(void);
@@ -515,17 +516,21 @@ doPrepareTransaction(void)
 /*
  * Insert FORGET COMMITTED into the xlog.
  */
-static void
+static XLogRecPtr
 doInsertForgetCommitted(void)
 {
+	XLogRecPtr recptr;
+
 	elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", DtxStateToString(MyTmGxactLocal->state));
 
 	setCurrentDtxState(DTX_STATE_INSERTING_FORGET_COMMITTED);
 
-	RecordDistributedForgetCommitted(getDistributedTransactionId());
+	recptr = RecordDistributedForgetCommitted(getDistributedTransactionId());
 
 	setCurrentDtxState(DTX_STATE_INSERTED_FORGET_COMMITTED);
 	MyTmGxact->includeInCkpt = false;
+
+	return recptr;
 }
 
 static void
@@ -561,6 +566,7 @@ doNotifyingCommitPrepared(void)
 	MemoryContext oldcontext = CurrentMemoryContext;;
 	time_t		retry_time_start;
 	bool		retry_timedout;
+	XLogRecPtr	recptr;
 
 	elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s",
 		 DtxStateToString(MyTmGxactLocal->state));
@@ -665,7 +671,7 @@ doNotifyingCommitPrepared(void)
 
 	SIMPLE_FAULT_INJECTOR("dtm_before_insert_forget_comitted");
 
-	doInsertForgetCommitted();
+	recptr = doInsertForgetCommitted();
 
 	/*
 	 * We release the TwophaseCommitLock only after writing our distributed
	 * their commit prepared records.
 	 */
 	LWLockRelease(TwophaseCommitLock);
+
+	/* Wait for the FORGET COMMITTED record to be synced to the hot standby, if remote_apply or higher is requested. */
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+		SyncRepWaitForLSN(recptr, true);
 }
 
 static void
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index c38e30ea38d..bc1df57ba20 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -508,7 +508,7 @@ extern void UnregisterXactCallbackOnce(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
-extern void RecordDistributedForgetCommitted(DistributedTransactionId gxid);
+extern XLogRecPtr RecordDistributedForgetCommitted(DistributedTransactionId gxid);
 
 extern bool IsSubTransactionAssignmentPending(void);
 extern void MarkSubTransactionAssigned(void);
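[Reviewer note] The xact.c/cdbtm.c changes follow the usual synchronous-replication shape: capture the LSN that XLogInsert() returns, XLogFlush() it when remote_apply is in effect (SyncRepWaitForLSN expects the record to already be flushed), then block until the standby reports that LSN applied. A toy model of that wait protocol (threading stands in for the backend/walsender interaction; every name below is illustrative, not the server's API):

```python
# Toy model of the flush-then-wait-for-apply protocol added around the FORGET
# COMMITTED record. A background "standby" advances its applied LSN; the
# "backend" blocks until its record's LSN has been applied remotely.
import threading
import time

class SyncRepState:
    def __init__(self):
        self.applied_lsn = 0
        self.cond = threading.Condition()

    def wait_for_lsn(self, lsn):
        # analogous to SyncRepWaitForLSN(recptr, true)
        with self.cond:
            while self.applied_lsn < lsn:
                self.cond.wait()

    def standby_applied(self, lsn):
        # analogous to standby apply feedback waking the waiting backend
        with self.cond:
            self.applied_lsn = max(self.applied_lsn, lsn)
            self.cond.notify_all()

state = SyncRepState()
record_lsn = 42                      # stand-in for XLogInsert()'s return value

def standby():
    for lsn in (10, 42, 99):         # standby applies WAL in LSN order
        time.sleep(0.01)
        state.standby_applied(lsn)

threading.Thread(target=standby).start()
state.wait_for_lsn(record_lsn)       # returns once LSN 42 is applied
print('FORGET COMMITTED applied on standby')
```

The design point mirrored here is that the commit path only pays the flush-and-wait cost when synchronous_commit is remote_apply or stronger; at lower levels the record is inserted and the function returns immediately.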