Skip to content

Commit

Permalink
Fix pathtarget adjustment for MergeAppend paths
Browse files Browse the repository at this point in the history
The chunk-wise aggregation pushdown code copies the existing paths to
create new ones with pushed-down aggregates. However, the
copy_merge_append_path function behaves differently from other copy
functions (e.g., copy_append_path): it resets the pathtarget on the
copy. This leads to a wrong pathlist and crashes. This PR fixes the
wrong pathtarget by setting it after the path is copied.
  • Loading branch information
jnidzwetzki committed Jan 29, 2024
1 parent 50c757c commit d878b9e
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 12 deletions.
1 change: 1 addition & 0 deletions .unreleased/fix_6571
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #6571 Fix pathtarget adjustment for MergeAppend paths in aggregation pushdown code
3 changes: 2 additions & 1 deletion src/nodes/chunk_append/chunk_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ create_group_subpath(PlannerInfo *root, RelOptInfo *rel, List *group, List *path
}

ChunkAppendPath *
ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths)
ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths, PathTarget *pathtarget)
{
ListCell *lc;
double total_cost = 0, rows = 0;
Expand All @@ -85,6 +85,7 @@ ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths)
}
new->cpath.path.total_cost = total_cost;
new->cpath.path.rows = rows;
new->cpath.path.pathtarget = copy_pathtarget(pathtarget);

return new;
}
Expand Down
3 changes: 2 additions & 1 deletion src/nodes/chunk_append/chunk_append.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ typedef struct ChunkAppendPath
int first_partial_path;
} ChunkAppendPath;

extern TSDLLEXPORT ChunkAppendPath *ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths);
extern TSDLLEXPORT ChunkAppendPath *ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths,
PathTarget *pathtarget);
extern Path *ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
Path *subpath, bool parallel_aware, bool ordered,
List *nested_oids);
Expand Down
24 changes: 15 additions & 9 deletions src/planner/partialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,13 @@ get_subpaths_from_append_path(Path *path, bool handle_gather_path)
* Copy an AppendPath and set new subpaths.
*/
static AppendPath *
copy_append_path(AppendPath *path, List *subpaths)
copy_append_path(AppendPath *path, List *subpaths, PathTarget *pathtarget)
{
AppendPath *newPath = makeNode(AppendPath);
memcpy(newPath, path, sizeof(AppendPath));
newPath->subpaths = subpaths;
newPath->path.pathtarget = copy_pathtarget(pathtarget);

cost_append(newPath);

return newPath;
Expand All @@ -278,7 +280,8 @@ copy_append_path(AppendPath *path, List *subpaths)
* Copy a MergeAppendPath and set new subpaths.
*/
static MergeAppendPath *
copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths)
copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths,
PathTarget *pathtarget)
{
MergeAppendPath *newPath = create_merge_append_path_compat(root,
path->path.parent,
Expand All @@ -292,6 +295,7 @@ copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths)
#endif

newPath->path.param_info = path->path.param_info;
newPath->path.pathtarget = copy_pathtarget(pathtarget);

return newPath;
}
Expand All @@ -305,21 +309,23 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths, PathTar
if (IsA(path, AppendPath))
{
AppendPath *append_path = castNode(AppendPath, path);
append_path->path.pathtarget = pathtarget;
return (Path *) copy_append_path(append_path, new_subpaths);
AppendPath *new_append_path = copy_append_path(append_path, new_subpaths, pathtarget);
return &new_append_path->path;
}
else if (IsA(path, MergeAppendPath))
{
MergeAppendPath *merge_append_path = castNode(MergeAppendPath, path);
merge_append_path->path.pathtarget = pathtarget;
return (Path *) copy_merge_append_path(root, merge_append_path, new_subpaths);
MergeAppendPath *new_merge_append_path =
copy_merge_append_path(root, merge_append_path, new_subpaths, pathtarget);
return &new_merge_append_path->path;
}
else if (ts_is_chunk_append_path(path))
{
CustomPath *custom_path = castNode(CustomPath, path);
ChunkAppendPath *chunk_append_path = (ChunkAppendPath *) custom_path;
chunk_append_path->cpath.path.pathtarget = pathtarget;
return (Path *) ts_chunk_append_path_copy(chunk_append_path, new_subpaths);
ChunkAppendPath *new_chunk_append_path =
ts_chunk_append_path_copy(chunk_append_path, new_subpaths, pathtarget);
return &new_chunk_append_path->cpath.path;
}

/* Should never happen, already checked by caller */
Expand Down Expand Up @@ -767,7 +773,7 @@ ts_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_rel
if (!parse->hasAggs)
return;

/* Groupting sets are not supported by the partial aggregation pushdown */
/* Grouping sets are not supported by the partial aggregation pushdown */
if (parse->groupingSets)
return;

Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/skip_scan/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ tsl_skip_scan_paths_add(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ou
* information used for creating the original one and we don't want to
* duplicate all the checks done when creating the original one.
*/
subpath = (Path *) ts_chunk_append_path_copy(ca, new_paths);
subpath = (Path *) ts_chunk_append_path_copy(ca, new_paths, ca->cpath.path.pathtarget);
}
else
{
Expand Down
85 changes: 85 additions & 0 deletions tsl/test/expected/agg_partials_pushdown.out
Original file line number Diff line number Diff line change
Expand Up @@ -510,3 +510,88 @@ SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t
Worker 0: actual rows=1 loops=1
(33 rows)

RESET timescaledb.enable_chunkwise_aggregation;
RESET enable_hashagg;
-- Test aggregation pushdown with MergeAppend node
CREATE TABLE merge_append_test (start_time timestamptz, sensor_id int, cluster varchar (253), cost_recommendation_memory numeric);
SELECT * FROM create_hypertable('merge_append_test', 'start_time');
WARNING: column type "character varying" used for "cluster" does not follow best practices
NOTICE: adding not-null constraint to column "start_time"
hypertable_id | schema_name | table_name | created
---------------+-------------+-------------------+---------
4 | public | merge_append_test | t
(1 row)

CREATE INDEX merge_append_test_sensorid ON merge_append_test USING btree (start_time, sensor_id);
INSERT INTO merge_append_test
SELECT
date_series,
1,
'production-1',
random() * 100
FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series
;
INSERT INTO merge_append_test
SELECT
date_series,
sensor_id,
'production-2',
random() * 100
FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series,
generate_series(1, 100, 1) AS sensor_id
;
ANALYZE merge_append_test;
SET enable_seqscan = off;
SET random_page_cost = 0;
SET cpu_operator_cost = 0;
SET enable_hashagg = off;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SELECT set_config(CASE WHEN current_setting('server_version_num')::int < 160000 THEN 'force_parallel_mode' ELSE 'debug_parallel_query' END, 'off', false);
set_config
------------
off
(1 row)

:PREFIX
SELECT
start_time, sensor_id,
SUM(cost_recommendation_memory)
FROM
merge_append_test
WHERE
start_time >= '2023-11-27 00:00:00Z'
AND start_time <= '2023-12-01 00:00:00Z'
AND sensor_id < 10
AND CLUSTER = 'production-2'
GROUP BY
1, 2;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate (actual rows=873 loops=1)
Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, sum(_hyper_4_17_chunk.cost_recommendation_memory)
Group Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id
-> Merge Append (actual rows=873 loops=1)
Sort Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id
-> Partial GroupAggregate (actual rows=648 loops=1)
Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, PARTIAL sum(_hyper_4_17_chunk.cost_recommendation_memory)
Group Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id
-> Index Scan using _hyper_4_17_chunk_merge_append_test_sensorid on _timescaledb_internal._hyper_4_17_chunk (actual rows=648 loops=1)
Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, _hyper_4_17_chunk.cost_recommendation_memory
Index Cond: ((_hyper_4_17_chunk.start_time >= 'Sun Nov 26 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_17_chunk.start_time <= 'Thu Nov 30 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_17_chunk.sensor_id < 10))
Filter: ((_hyper_4_17_chunk.cluster)::text = 'production-2'::text)
Rows Removed by Filter: 72
-> Partial GroupAggregate (actual rows=225 loops=1)
Output: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id, PARTIAL sum(_hyper_4_18_chunk.cost_recommendation_memory)
Group Key: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id
-> Index Scan using _hyper_4_18_chunk_merge_append_test_sensorid on _timescaledb_internal._hyper_4_18_chunk (actual rows=225 loops=1)
Output: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id, _hyper_4_18_chunk.cost_recommendation_memory
Index Cond: ((_hyper_4_18_chunk.start_time >= 'Sun Nov 26 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_18_chunk.start_time <= 'Thu Nov 30 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_18_chunk.sensor_id < 10))
Filter: ((_hyper_4_18_chunk.cluster)::text = 'production-2'::text)
Rows Removed by Filter: 25
(21 rows)

RESET enable_seqscan;
RESET random_page_cost;
RESET cpu_operator_cost;
RESET enable_hashagg;
56 changes: 56 additions & 0 deletions tsl/test/sql/agg_partials_pushdown.sql
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,59 @@ SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t

:PREFIX
SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t DESC NULLS LAST limit 2;

RESET timescaledb.enable_chunkwise_aggregation;
RESET enable_hashagg;

-- Test aggregation pushdown with MergeAppend node
CREATE TABLE merge_append_test (start_time timestamptz, sensor_id int, cluster varchar (253), cost_recommendation_memory numeric);
SELECT * FROM create_hypertable('merge_append_test', 'start_time');
CREATE INDEX merge_append_test_sensorid ON merge_append_test USING btree (start_time, sensor_id);

INSERT INTO merge_append_test
SELECT
date_series,
1,
'production-1',
random() * 100
FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series
;

INSERT INTO merge_append_test
SELECT
date_series,
sensor_id,
'production-2',
random() * 100
FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series,
generate_series(1, 100, 1) AS sensor_id
;

ANALYZE merge_append_test;

SET enable_seqscan = off;
SET random_page_cost = 0;
SET cpu_operator_cost = 0;
SET enable_hashagg = off;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SELECT set_config(CASE WHEN current_setting('server_version_num')::int < 160000 THEN 'force_parallel_mode' ELSE 'debug_parallel_query' END, 'off', false);

:PREFIX
SELECT
start_time, sensor_id,
SUM(cost_recommendation_memory)
FROM
merge_append_test
WHERE
start_time >= '2023-11-27 00:00:00Z'
AND start_time <= '2023-12-01 00:00:00Z'
AND sensor_id < 10
AND CLUSTER = 'production-2'
GROUP BY
1, 2;

RESET enable_seqscan;
RESET random_page_cost;
RESET cpu_operator_cost;
RESET enable_hashagg;

0 comments on commit d878b9e

Please sign in to comment.