Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize MPP FDW LIMIT/OFFSET push down when there is NULL/0. (#17246) #896

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions contrib/postgres_fdw/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ clean: clean-symlinks

clean-symlinks:
rm -f link-canary.c

# For postgres_fdw test: the tests need a companion PostgreSQL server to act
# as the remote side of the foreign tables.
# PG_PORT is exported so the setup/clean scripts and the tests agree on the
# port of that companion server.
export PG_PORT=5432
# installcheck must bring up the companion Postgres before running tests.
installcheck: install prep_postgres
# clean also tears down the companion Postgres.
clean: clean_postgres
# Start and seed the companion PostgreSQL instance used by the FDW tests.
# NOTE(review): recipe lines appear to have lost their leading tabs in this
# rendering — they must be tab-indented in the real Makefile.
prep_postgres:
./postgres_setup.bash
# Stop and remove the companion PostgreSQL instance.
clean_postgres:
./postgres_clean.bash
.PHONY: prep_postgres clean_postgres
21 changes: 19 additions & 2 deletions contrib/postgres_fdw/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -3449,14 +3449,31 @@ appendLimitClause(deparse_expr_cxt *context)
* This may reduce the number of tuples that we need to fetch from remote servers.
*/
Node *precount = copyObject(root->parse->limitCount);
Node *offset = copyObject(root->parse->limitOffset);
bool offset_needed = ((precount != NULL) && (offset != NULL));

/*
* If we've specified both OFFSET and LIMIT clause,
* it's enough to fetch tuples from 0 to limitCount + limitOffset from remote servers.
* optimize (N > 0):
* LIMIT 0 OFFSET N to LIMIT 0
* LIMIT 0 OFFSET NULL to LIMIT 0
* LIMIT N OFFSET NULL to LIMIT N
* LIMIT N OFFSET 0 to LIMIT N
*/
if (offset_needed &&
IsA(precount, Const) &&
(((Const *) precount)->constisnull || ((Const *) precount)->constvalue == 0))
offset_needed = false;

if (offset_needed &&
IsA(offset, Const) &&
(((Const *) offset)->constisnull || ((Const *) offset)->constvalue == 0))
offset_needed = false;

if (precount)
{
if (root->parse->limitOffset)
if (offset_needed)
{
ParseState *pstate = make_parsestate(NULL);
/*
Expand All @@ -3465,7 +3482,7 @@ appendLimitClause(deparse_expr_cxt *context)
*/
precount = (Node *) make_op(pstate,
list_make2(makeString("pg_catalog"), makeString(pstrdup("+"))),
copyObject(root->parse->limitOffset),
offset,
precount,
NULL,
-1);
Expand Down
99 changes: 99 additions & 0 deletions contrib/postgres_fdw/expected/mpp_gp2pg_postgres_fdw.out
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,105 @@ SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;
1000 | 0
(2 rows)

-- test LIMIT 0, OFFSET null/0
ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'true');
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit null offset 998;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT NULL::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit all offset 998;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT NULL::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset 998;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 0::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 0::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 3::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset 0;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Limit
Output: c1, c2
-> Gather Motion 2:1 (slice1; segments: 2)
Output: c1, c2
Merge Key: c1
-> Foreign Scan on public.mpp_ft2
Output: c1, c2
Remote SQL: SELECT c1, c2 FROM "MPP_S 1"."T 2" ORDER BY c1 ASC NULLS LAST LIMIT 3::bigint
Optimizer: Postgres-based planner
Settings: gp_enable_minmax_optimization = 'off'
(10 rows)

ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'false');
-- Query with aggregates and limit clause together is NOT pushed down.
-- Because it's unsafe to do partial aggregate and limit in multiple remote servers.
EXPLAIN (VERBOSE, COSTS OFF)
Expand Down
17 changes: 17 additions & 0 deletions contrib/postgres_fdw/sql/mpp_gp2pg_postgres_fdw.sql
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,23 @@ SELECT c1, c2 FROM mpp_ft2 order by c1 offset 2 limit 3;
EXPLAIN VERBOSE
SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;
SELECT c1, c2 FROM mpp_ft2 order by c1 offset 998;

-- test LIMIT 0, OFFSET null/0
ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'true');
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit null offset 998;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit all offset 998;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset 998;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 0 offset null;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset null;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT c1, c2 FROM mpp_ft2 order by c1 limit 3 offset 0;
ALTER FOREIGN TABLE mpp_ft2 OPTIONS(set use_remote_estimate 'false');

-- Query with aggregates and limit clause together is NOT pushed down.
-- Because it's unsafe to do partial aggregate and limit in multiple remote servers.
EXPLAIN (VERBOSE, COSTS OFF)
Expand Down
65 changes: 40 additions & 25 deletions gpMgmt/bin/gppylib/util/ssh_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,35 +192,50 @@ def login(self, hostList=None, userName=None, delaybeforesend=0.05, sync_multipl
good_list = []
print_lock = threading.Lock()

def connect_host(hostname, p):
self.hostList.append(hostname)
try:
# The sync_multiplier value is passed onto pexpect.pxssh which is used to determine timeout
# values for prompt verification after an ssh connection is established.
p.login(hostname, self.userName, sync_multiplier=sync_multiplier)
p.x_peer = hostname
p.x_pid = p.pid
good_list.append(p)
if self.verbose:
def connect_host(hostname):
retry_login = True

while True:
# create a new PxsshWrapper object for each retry to avoid using the
# same object which can cause unexpected behaviours
p = gppxssh_wrapper.PxsshWrapper(delaybeforesend=delaybeforesend,
sync_retries=sync_retries,
options={"StrictHostKeyChecking": "no",
"BatchMode": "yes"})

try:
# The sync_multiplier value is passed onto pexpect.pxssh which is used to determine timeout
# values for prompt verification after an ssh connection is established.
p.login(hostname, self.userName, sync_multiplier=sync_multiplier)
p.x_peer = hostname
p.x_pid = p.pid
good_list.append(p)
if self.verbose:
with print_lock:
print('[INFO] login %s' % hostname)
except Exception as e:
# some logins fail due to the clearing of the TERM env variable
# retry by restoring the TERM variable to see if it succeeds or else error out
if origTERM and retry_login:
retry_login = False
os.putenv('TERM', origTERM)
continue

with print_lock:
print('[INFO] login %s' % hostname)
except Exception as e:
with print_lock:
print('[ERROR] unable to login to %s' % hostname)
if type(e) is pxssh.ExceptionPxssh:
print(e)
elif type(e) is pxssh.EOF:
print('Could not acquire connection.')
else:
print('hint: use gpssh-exkeys to setup public-key authentication between hosts')
print('[ERROR] unable to login to %s' % hostname)
if type(e) is pxssh.ExceptionPxssh:
print(e)
elif type(e) is pxssh.EOF:
print('Could not acquire connection.')
print(e)
else:
print('hint: use gpssh-exkeys to setup public-key authentication between hosts')

break

thread_list = []
for host in hostList:
p = gppxssh_wrapper.PxsshWrapper(delaybeforesend=delaybeforesend,
sync_retries=sync_retries,
options={"StrictHostKeyChecking": "no",
"BatchMode": "yes"})
t = threading.Thread(target=connect_host, args=(host, p))
t = threading.Thread(target=connect_host, args=[host])
t.start()
thread_list.append(t)

Expand Down
33 changes: 32 additions & 1 deletion gpMgmt/bin/gppylib/util/test/unit/test_cluster_ssh_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys, os, pwd
import unittest
from io import StringIO
from mock import patch
from mock import patch, call

try:
gphome = os.environ.get('GPHOME')
Expand Down Expand Up @@ -91,7 +91,38 @@ def test04_exceptions(self, mock_stdout):
session2 = Session()
session2.login(['localhost'], 'gpadmin', 0.05, 1.0)
self.assertIn('[ERROR] unable to login to localhost\nhint: use gpssh-exkeys to setup public-key authentication between hosts\n', mock_stdout.getvalue())

@patch('os.getenv', return_value="term")
@patch('os.putenv')
@patch('sys.stdout', new_callable=StringIO)
def test05_login_retry_when_term_variable_is_set(self, mock_stdout, mock_putenv, mock_getenv):
'''
Test that Session.login() retries once when pxssh.login() raises and the
TERM env variable is set (os.getenv is mocked to return "term").
'''

# pxssh.login always raises EOF here, so both the initial attempt and the
# retry fail, and the error path must be reported on stdout.
with mock.patch.object(pxssh.pxssh, 'login', side_effect=pxssh.EOF('foo')) as mock_login:
session = Session()
session.login(['localhost'], 'gpadmin', 0.05, 1.0)
self.assertIn('[ERROR] unable to login to localhost\nCould not acquire connection.\n', mock_stdout.getvalue())
mock_stdout.truncate(0)
# Expected TERM manipulation: clear it, restore it for the retry, restore
# it again at the end.
# NOTE(review): only the retry-time putenv('TERM', origTERM) is visible in
# ssh_utils.py here; the initial call('TERM', '') and the final restore
# happen elsewhere in Session.login — confirm against the full source.
assert mock_putenv.call_count == 3
mock_putenv.assert_has_calls([call('TERM', ''), call('TERM', 'term'), call('TERM', 'term')])

@patch('os.getenv', return_value=None)
@patch('os.putenv')
@patch('sys.stdout', new_callable=StringIO)
def test06_login_does_not_retry_when_term_variable_is_not_set(self, mock_stdout, mock_putenv, mock_getenv):
'''
Test that Session.login() does NOT retry when pxssh.login() raises and the
TERM env variable is unset (os.getenv is mocked to return None).
'''

# With no original TERM to restore, the retry branch in connect_host must
# be skipped and the failure reported immediately.
with mock.patch.object(pxssh.pxssh, 'login', side_effect=pxssh.EOF('foo')) as mock_login:
session = Session()
session.login(['localhost'], 'gpadmin', 0.05, 1.0)
self.assertIn('[ERROR] unable to login to localhost\nCould not acquire connection.\n', mock_stdout.getvalue())
self.assertNotIn('Retrying by restoring the TERM env variable.\n', mock_stdout.getvalue())
mock_stdout.truncate(0)
# Exactly one putenv: the initial clearing of TERM, with no restore since
# origTERM is falsy.
# NOTE(review): the call('TERM', '') itself is made in Session.login
# outside the visible hunk — confirm against the full source.
mock_putenv.assert_called_once_with('TERM', '')

if __name__ == "__main__":
unittest.main()
10 changes: 8 additions & 2 deletions src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -1825,17 +1825,23 @@ RecordTransactionCommit(void)
/*
* RecordDistributedForgetCommitted
*/
void
XLogRecPtr
RecordDistributedForgetCommitted(DistributedTransactionId gxid)
{
xl_xact_distributed_forget xlrec;
XLogRecPtr recptr;

xlrec.gxid = gxid;

XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xl_xact_distributed_forget));

XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
/* only flush immediately if we want to wait for remote_apply */
if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
XLogFlush(recptr);

return recptr;
}

/*
Expand Down
Loading
Loading