From d86528172dace98adcd782d35be921bd9204b4cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
Date: Fri, 7 Mar 2025 19:20:38 -0300
Subject: [PATCH] More regression tests

---
 tsl/src/continuous_aggs/refresh.c             | 166 ++++++++++++++----
 .../cagg_refresh_policy_incremental.out       | 113 ++++++++++++
 .../sql/cagg_refresh_policy_incremental.sql   |  54 +++++-
 3 files changed, 293 insertions(+), 40 deletions(-)

diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c
index f78f01a51ae..d9a6d286afc 100644
--- a/tsl/src/continuous_aggs/refresh.c
+++ b/tsl/src/continuous_aggs/refresh.c
@@ -972,67 +972,122 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
 
     debug_refresh_window(cagg, &refresh_window, "begin");
 
-    Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
-    const Dimension *time_dim;
-    time_dim = hyperspace_get_open_dimension(ht->space, 0);
+    const Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
+    const Dimension *time_dim = hyperspace_get_open_dimension(ht->space, 0);
 
-    /* If refresh window range start is NULL then get the first bucket from the original hypertable
+    /*
+     * Cap the refresh window to the min and max time of the hypertable.
+     *
+     * To avoid producing unnecessary batches, check whether the start or end of the refresh
+     * window is NULL and, if so, use the min/max slice range from the original hypertable.
     */
    if (refresh_window.start_isnull)
    {
        debug_refresh_window(cagg, &refresh_window, "START IS NULL");
        DimensionSlice *slice = ts_dimension_slice_nth_earliest_slice(time_dim->fd.id, 1);
 
-        /* If still there's no MIN range then produce only one range */
+        /* If there's still no MIN slice range start then return no batches */
        if (NULL == slice || TS_TIME_IS_MIN(slice->fd.range_start, refresh_window.type) ||
            TS_TIME_IS_NOBEGIN(slice->fd.range_start, refresh_window.type))
        {
+            elog(LOG,
+                 "no min slice range start for continuous aggregate \"%s.%s\", falling back to "
+                 "single batch processing",
+                 NameStr(cagg->data.user_view_schema),
+                 NameStr(cagg->data.user_view_name));
            return NIL;
        }
        refresh_window.start = slice->fd.range_start;
        refresh_window.start_isnull = false;
    }
 
-    int64 bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
-    if (cagg->bucket_function->bucket_fixed_interval == false)
-    {
-        ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
-                                                              &refresh_window.end,
-                                                              cagg->bucket_function);
-    }
-    else
-    {
-        refresh_window =
-            compute_inscribed_bucketed_refresh_window(cagg, &refresh_window, bucket_width);
-    }
-
    if (refresh_window.end_isnull)
    {
        debug_refresh_window(cagg, &refresh_window, "END IS NULL");
        DimensionSlice *slice = ts_dimension_slice_nth_latest_slice(time_dim->fd.id, 1);
 
-        /* If still there's no MAX range then produce only one range */
+        /* If there's still no MAX slice range end then return no batches */
        if (NULL == slice || TS_TIME_IS_MAX(slice->fd.range_end, refresh_window.type) ||
            TS_TIME_IS_NOEND(slice->fd.range_end, refresh_window.type))
        {
+            elog(LOG,
+                 "no max slice range end for continuous aggregate \"%s.%s\", falling back to "
+                 "single batch processing",
+                 NameStr(cagg->data.user_view_schema),
+                 NameStr(cagg->data.user_view_name));
            return NIL;
        }
        refresh_window.end = slice->fd.range_end;
        refresh_window.end_isnull = false;
    }
 
-    int64 refresh_size = refresh_window.end - refresh_window.start;
-    int64 batch_size = (bucket_width * buckets_per_batch);
+    /* Compute the inscribed bucket for the capped refresh window range */
+    const int64 bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
+    if (cagg->bucket_function->bucket_fixed_interval == false)
+    {
+        ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
+                                                              &refresh_window.end,
+                                                              cagg->bucket_function);
+    }
+    else
+    {
+        refresh_window =
+            compute_inscribed_bucketed_refresh_window(cagg, &refresh_window, bucket_width);
+    }
 
-    if (refresh_size <= batch_size)
+    /* Check if the refresh window is large enough to produce batches; if not, return no batches */
+    const int64 refresh_window_size = refresh_window.end - refresh_window.start;
+    const int64 batch_size = (bucket_width * buckets_per_batch);
+
+    if (refresh_window_size <= batch_size)
    {
+        Oid type = IS_TIMESTAMP_TYPE(refresh_window.type) ? INTERVALOID : refresh_window.type;
+        Datum refresh_size_interval = ts_internal_to_interval_value(refresh_window_size, type);
+        Datum batch_size_interval = ts_internal_to_interval_value(batch_size, type);
+        Oid typoutputfunc;
+        bool isvarlena;
+        FmgrInfo typoutputinfo;
+
+        getTypeOutputInfo(type, &typoutputfunc, &isvarlena);
+        fmgr_info(typoutputfunc, &typoutputinfo);
+
+        elog(LOG,
+             "refresh window size (%s) is smaller than or equal to batch size (%s), falling back "
+             "to single batch processing",
+             OutputFunctionCall(&typoutputinfo, refresh_size_interval),
+             OutputFunctionCall(&typoutputinfo, batch_size_interval));
        return NIL;
    }
 
-    debug_refresh_window(cagg, &refresh_window, "before produce ranges");
+    debug_refresh_window(cagg, &refresh_window, "before produce batches");
 
+    /*
+     * Produce the batches to be processed
+     *
+     * The refresh window is split into multiple batches of `batch_size` each. The batches are
+     * produced in reverse chronological order, so the most recent data is processed first and
+     * becomes visible to users sooner.
+     *
+     * The dimension slices and the invalidation logs (hypertable and materialization hypertable)
+     * are taken into account to avoid producing batches that have no data to be processed.
+     *
+     * The logic is the following:
+     * 1. Get the dimension slices from the original hypertable
+     * 2. Get the hypertable and materialization hypertable invalidation logs
+     * 3. Produce the batches in reverse order
+     * 4. Check if the produced batch overlaps with the dimension slices (#1) and the
+     *    invalidation logs (#2)
+     * 5. If the batch overlaps with both then it's a valid batch to be processed
+     * 6. If the batch overlaps with only one of them then it's not a valid batch to be processed
+     * 7. If the batch does not overlap with any of them then it's not a valid batch to be
+     *    processed
+     */
    const char *query_str = " \
-        WITH chunk_ranges AS ( \
+        WITH dimension_slices AS ( \
             SELECT \
                 range_start AS start, \
                 range_end AS end \
@@ -1069,11 +1124,11 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
             pg_catalog.generate_series($5, $6, $4) AS refresh_start \
         WHERE \
             EXISTS ( \
-                SELECT FROM chunk_ranges \
+                SELECT FROM dimension_slices \
                 WHERE \
                     pg_catalog.int8range(refresh_start, LEAST($6::numeric, refresh_start::numeric + $4::numeric)::bigint) \
                     OPERATOR(pg_catalog.&&) \
-                    pg_catalog.int8range(chunk_ranges.start, chunk_ranges.end) \
+                    pg_catalog.int8range(dimension_slices.start, dimension_slices.end) \
             ) \
             AND EXISTS ( \
                 SELECT FROM \
@@ -1088,7 +1143,10 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
         ORDER BY \
             refresh_start DESC;";
 
+    /* List of InternalTimeRange elements to be returned */
    List *refresh_window_list = NIL;
+
+    /* Prepare for SPI call */
    int res;
    Oid types[] = { INT4OID, INT4OID, INT4OID, INT8OID, INT8OID, INT8OID };
    Datum values[] = { Int32GetDatum(ht->fd.id),
@@ -1100,9 +1158,6 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
    char nulls[] = { false, false, false, false, false, false };
    MemoryContext oldcontext = CurrentMemoryContext;
 
-    /*
-     * Query for the oldest chunk in the hypertable.
-     */
    if (SPI_connect() != SPI_OK_CONNECT)
        elog(ERROR, "could not connect to SPI");
 
@@ -1115,8 +1170,24 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
                              0 /* count */);
 
    if (res < 0)
-        elog(ERROR, "%s: could not get the last bucket of the materialized data", __func__);
+        elog(ERROR, "%s: could not produce batches for the continuous aggregate refresh policy", __func__);
 
+    if (SPI_processed == 1)
+    {
+        elog(LOG,
+             "only one batch produced for continuous aggregate \"%s.%s\", falling back to single "
+             "batch processing",
+             NameStr(cagg->data.user_view_schema),
+             NameStr(cagg->data.user_view_name));
+
+        res = SPI_finish();
+        if (res != SPI_OK_FINISH)
+            elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));
+
+        return NIL;
+    }
+
+    /* Build the batches list */
    for (uint64 i = 0; i < SPI_processed; i++)
    {
        bool range_start_isnull, range_end_isnull;
@@ -1135,29 +1206,46 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
        range->end_isnull = range_end_isnull;
        range->type = original_refresh_window->type;
 
-        /* When dropping chunks we need to align the start of the first range to cover dropped
-         * chunks if they exist */
-        if (i == (SPI_processed - 1) && original_refresh_window->start_isnull)
+        /*
+         * To make sure that the first range is aligned with the end of the refresh window
+         * we need to set the end to the maximum value of the time type if the original refresh
+         * window end is NULL.
+         */
+        if (i == 0 && original_refresh_window->end_isnull)
        {
-            range->start = original_refresh_window->start;
-            range->start_isnull = true;
+            range->end = ts_time_get_noend_or_max(range->type);
+            range->end_isnull = true;
        }
 
-        if (i == 0 && original_refresh_window->end_isnull)
+        /*
+         * To make sure that the last range is aligned with the start of the refresh window
+         * we need to set the start to the minimum value of the time type if the original refresh
+         * window start is NULL.
+ */ + if (i == (SPI_processed - 1) && original_refresh_window->start_isnull) { - range->end = original_refresh_window->end; - range->end_isnull = true; + range->start = ts_time_get_nobegin_or_min(range->type); + range->start_isnull = true; } refresh_window_list = lappend(refresh_window_list, range); MemoryContextSwitchTo(saved_context); - debug_refresh_window(cagg, range, "range refresh"); + debug_refresh_window(cagg, range, "batch produced"); } res = SPI_finish(); if (res != SPI_OK_FINISH) elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res)); + if (refresh_window_list == NIL) + { + elog(LOG, + "no valid batches produced for continuous aggregate \"%s.%s\", falling back to single " + "batch processing", + NameStr(cagg->data.user_view_schema), + NameStr(cagg->data.user_view_name)); + } + return refresh_window_list; } diff --git a/tsl/test/expected/cagg_refresh_policy_incremental.out b/tsl/test/expected/cagg_refresh_policy_incremental.out index 094ec25ea33..f14e0e5cf27 100644 --- a/tsl/test/expected/cagg_refresh_policy_incremental.out +++ b/tsl/test/expected/cagg_refresh_policy_incremental.out @@ -387,3 +387,116 @@ DETAIL: buckets_per_batch: -1 HINT: The buckets per batch should be greater than or equal to zero. \set VERBOSITY terse \set ON_ERROR_STOP 1 +-- Truncate all data from the original hypertable +TRUNCATE bgw_log, conditions; +-- advance time by 4h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '4 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- Should fallback to single batch processing because there's no data to be refreshed on the original hypertable +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------- + 0 | 14400000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 14400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | no min slice range start for continuous aggregate "public.conditions_by_day", falling back to single batch processing + 1 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Nov 23 16:07:02 4714 LMT BC, Wed Mar 05 16:00:00 2025 PST ] + 2 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | deleted 295 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | inserted 0 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" +(6 rows) + +-- Should return zero rows +SELECT count(*) FROM conditions_by_day; + count +------- + 0 +(1 row) + +-- 1 day of data +INSERT INTO conditions +SELECT + t, d, 10 +FROM + generate_series( + '2020-02-05 00:00:00-03', + '2020-02-06 00:00:00-03', + '1 hour'::interval) AS t, + generate_series(1,5) AS d; +TRUNCATE bgw_log; +-- advance time by 5h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '5 
hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- Should fallback to single batch processing because the refresh size is too small +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------- + 0 | 18000000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 18000000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | only one batch produced for continuous aggregate "public.conditions_by_day", falling back to single batch processing + 1 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Tue Feb 04 16:00:00 2020 PST, Thu Feb 06 16:00:00 2020 PST ] + 2 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | inserted 10 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" +(6 rows) + +-- Should return 10 rows because the bucket width is `1 day` and buckets per batch is `10` +SELECT count(*) FROM conditions_by_day; + count +------- + 10 +(1 row) + +TRUNCATE conditions_by_day, conditions, bgw_log; +-- Less than 1 day of data (smaller than the bucket width) +INSERT INTO conditions +VALUES ('2020-02-05 00:00:00-03', 1, 10); +-- advance time by 6h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '6 hour')::bigint * 1000000, true); + ts_bgw_params_reset_time +-------------------------- + +(1 row) + +-- Should fallback to single batch processing because the refresh size is too small +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +SELECT * FROM sorted_bgw_log; + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------- + 0 | 21600000000 | DB Scheduler | [TESTING] Registered new background worker + 1 | 21600000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM) + 0 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | refresh window size (7 days) is smaller than or equal to batch size (10 days), falling back to single batch processing + 1 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Nov 23 16:07:02 4714 LMT BC, Wed Mar 05 16:00:00 2025 PST ] + 2 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2" + 3 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 1 
row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2" +(6 rows) + +-- Should return 1 row +SELECT count(*) FROM conditions_by_day; + count +------- + 1 +(1 row) + diff --git a/tsl/test/sql/cagg_refresh_policy_incremental.sql b/tsl/test/sql/cagg_refresh_policy_incremental.sql index 3c427d3b223..08255b6e802 100644 --- a/tsl/test/sql/cagg_refresh_policy_incremental.sql +++ b/tsl/test/sql/cagg_refresh_policy_incremental.sql @@ -228,4 +228,56 @@ FROM config => jsonb_set(:'config', '{buckets_per_batch}', '-1') ); \set VERBOSITY terse -\set ON_ERROR_STOP 1 \ No newline at end of file +\set ON_ERROR_STOP 1 + +-- Truncate all data from the original hypertable +TRUNCATE bgw_log, conditions; + +-- advance time by 4h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '4 hour')::bigint * 1000000, true); + +-- Should fallback to single batch processing because there's no data to be refreshed on the original hypertable +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); +SELECT * FROM sorted_bgw_log; + +-- Should return zero rows +SELECT count(*) FROM conditions_by_day; + +-- 1 day of data +INSERT INTO conditions +SELECT + t, d, 10 +FROM + generate_series( + '2020-02-05 00:00:00-03', + '2020-02-06 00:00:00-03', + '1 hour'::interval) AS t, + generate_series(1,5) AS d; + +TRUNCATE bgw_log; + +-- advance time by 5h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '5 hour')::bigint * 1000000, true); + +-- Should fallback to single batch processing because the refresh size is too small +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); +SELECT * FROM sorted_bgw_log; + +-- Should return 10 rows because the bucket width is `1 day` and buckets per batch is `10` +SELECT count(*) FROM conditions_by_day; + +TRUNCATE conditions_by_day, conditions, bgw_log; + +-- Less than 1 day of data (smaller than the bucket width) +INSERT INTO conditions +VALUES ('2020-02-05 00:00:00-03', 1, 10); + +-- advance time by 6h so that job runs one more time +SELECT ts_bgw_params_reset_time(extract(epoch from interval '6 hour')::bigint * 1000000, true); + +-- Should fallback to single batch processing because the refresh size is too small +SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); +SELECT * FROM sorted_bgw_log; + +-- Should return 1 row +SELECT count(*) FROM conditions_by_day;
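
For illustration only (not part of the patch): a minimal standalone sketch of how the
batch-splitting query divides a capped refresh window into fixed-size batches and keeps only
those that overlap an existing dimension slice. The window [0, 1000), the batch size 300 and
the single slice [250, 700) are hypothetical values; the real query additionally filters by the
hypertable and materialization hypertable invalidation logs.

    -- Hypothetical dimension slice covering [250, 700)
    WITH dimension_slices(slice_start, slice_end) AS (
        VALUES (250::bigint, 700::bigint)
    )
    SELECT
        refresh_start AS batch_start,
        LEAST(1000::numeric, refresh_start::numeric + 300)::bigint AS batch_end
    FROM
        -- Candidate batch starts: 0, 300, 600, 900
        generate_series(0::bigint, 999::bigint, 300::bigint) AS refresh_start
    WHERE
        -- Keep only batches that overlap the dimension slice
        EXISTS (
            SELECT FROM dimension_slices
            WHERE int8range(refresh_start,
                            LEAST(1000::numeric, refresh_start::numeric + 300)::bigint)
                  && int8range(slice_start, slice_end)
        )
    ORDER BY refresh_start DESC;

This returns the batches [600, 900), [300, 600) and [0, 300), newest first; [900, 1000) is
skipped because it does not overlap the slice.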