More regression tests
fabriziomello committed Mar 7, 2025
1 parent 82210c2 commit d865281
Showing 3 changed files with 293 additions and 40 deletions.
166 changes: 127 additions & 39 deletions tsl/src/continuous_aggs/refresh.c
@@ -972,67 +972,122 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig

debug_refresh_window(cagg, &refresh_window, "begin");

Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
const Dimension *time_dim;
time_dim = hyperspace_get_open_dimension(ht->space, 0);
const Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
const Dimension *time_dim = hyperspace_get_open_dimension(ht->space, 0);

/* If refresh window range start is NULL then get the first bucket from the original hypertable
/*
 * Cap the refresh window to the min and max time of the hypertable
 *
 * To avoid producing unnecessary batches, if the start or the end of the
 * refresh window is NULL then get the min/max slice from the original
 * hypertable.
 */
if (refresh_window.start_isnull)
{
debug_refresh_window(cagg, &refresh_window, "START IS NULL");
DimensionSlice *slice = ts_dimension_slice_nth_earliest_slice(time_dim->fd.id, 1);

/* If still there's no MIN range then produce only one range */
/* If there's still no MIN slice range start then return no batches */
if (NULL == slice || TS_TIME_IS_MIN(slice->fd.range_start, refresh_window.type) ||
TS_TIME_IS_NOBEGIN(slice->fd.range_start, refresh_window.type))
{
elog(LOG,
"no min slice range start for continuous aggregate \"%s.%s\", falling back to "
"single batch processing",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name));
return NIL;
}
refresh_window.start = slice->fd.range_start;
refresh_window.start_isnull = false;
}

int64 bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
if (cagg->bucket_function->bucket_fixed_interval == false)
{
ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
}
else
{
refresh_window =
compute_inscribed_bucketed_refresh_window(cagg, &refresh_window, bucket_width);
}

if (refresh_window.end_isnull)
{
debug_refresh_window(cagg, &refresh_window, "END IS NULL");
DimensionSlice *slice = ts_dimension_slice_nth_latest_slice(time_dim->fd.id, 1);

/* If still there's no MAX range then produce only one range */
/* If there's still no MAX slice range end then return no batches */
if (NULL == slice || TS_TIME_IS_MAX(slice->fd.range_end, refresh_window.type) ||
TS_TIME_IS_NOEND(slice->fd.range_end, refresh_window.type))
{
elog(LOG,
"no max slice range end for continuous aggregate \"%s.%s\", falling back to "
"single batch processing",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name));
return NIL;
}
refresh_window.end = slice->fd.range_end;
refresh_window.end_isnull = false;
}
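
Both capping branches above follow the same pattern: replace an open window edge with the nearest dimension-slice boundary, or bail out to single batch processing. A condensed sketch under that reading — cap_open_window() is a hypothetical helper, not part of this commit, while the slice lookups and struct fields are the ones used above (the unbounded-edge checks are elided to comments):

static bool
cap_open_window(InternalTimeRange *win, const Dimension *time_dim)
{
    if (win->start_isnull)
    {
        /* Earliest slice of the open "time" dimension */
        DimensionSlice *first = ts_dimension_slice_nth_earliest_slice(time_dim->fd.id, 1);
        if (first == NULL)
            return false; /* caller falls back to single batch processing */
        win->start = first->fd.range_start;
        win->start_isnull = false;
    }
    if (win->end_isnull)
    {
        /* Latest slice of the open "time" dimension */
        DimensionSlice *last = ts_dimension_slice_nth_latest_slice(time_dim->fd.id, 1);
        if (last == NULL)
            return false;
        win->end = last->fd.range_end;
        win->end_isnull = false;
    }
    return true;
}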

int64 refresh_size = refresh_window.end - refresh_window.start;
int64 batch_size = (bucket_width * buckets_per_batch);
/* Compute the inscribed bucketed refresh window for the capped window range */
const int64 bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
if (cagg->bucket_function->bucket_fixed_interval == false)
{
ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
}
else
{
refresh_window =
compute_inscribed_bucketed_refresh_window(cagg, &refresh_window, bucket_width);
}
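
For fixed-width buckets, the inscribed computation shrinks the window to whole buckets. A minimal standalone sketch of the alignment arithmetic, assuming non-negative integer timestamps, a zero bucket origin, and no variable-width (month or timezone) buckets — inscribe_window() is an illustrative name, not the function called above:

#include <stdint.h>

/* Round the start up and the end down to bucket boundaries so that
 * only buckets lying entirely inside the window remain. */
static void
inscribe_window(int64_t *start, int64_t *end, int64_t bucket_width)
{
    *start = ((*start + bucket_width - 1) / bucket_width) * bucket_width;
    *end = (*end / bucket_width) * bucket_width;
}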

if (refresh_size <= batch_size)
/* Check if the refresh window is large enough to produce batches; if not, return no batches */
const int64 refresh_window_size = refresh_window.end - refresh_window.start;
const int64 batch_size = (bucket_width * buckets_per_batch);

if (refresh_window_size <= batch_size)
{
Oid type = IS_TIMESTAMP_TYPE(refresh_window.type) ? INTERVALOID : refresh_window.type;
Datum refresh_size_interval = ts_internal_to_interval_value(refresh_window_size, type);
Datum batch_size_interval = ts_internal_to_interval_value(batch_size, type);
Oid typoutputfunc;
bool isvarlena;
FmgrInfo typoutputinfo;

getTypeOutputInfo(type, &typoutputfunc, &isvarlena);
fmgr_info(typoutputfunc, &typoutputinfo);

elog(LOG,
"refresh window size (%s) is smaller than or equal to batch size (%s), falling back "
"to single batch processing",
OutputFunctionCall(&typoutputinfo, refresh_size_interval),
OutputFunctionCall(&typoutputinfo, batch_size_interval));
return NIL;
}
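
A worked example of this check, mirroring the numbers in the regression test output below (assumed here only for illustration): a 1 day bucket width with 10 buckets per batch yields a 10-day batch size, so a 7-day refresh window falls back to single batch processing:

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    const int64_t usecs_per_day = 86400000000LL; /* internal time is in microseconds */
    const int64_t bucket_width = usecs_per_day;                   /* 1 day */
    const int64_t buckets_per_batch = 10;
    const int64_t batch_size = bucket_width * buckets_per_batch;  /* 10 days */
    const int64_t refresh_window_size = 7 * usecs_per_day;        /* 7 days */

    if (refresh_window_size <= batch_size)
        printf("falling back to single batch processing\n");
    return 0;
}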

debug_refresh_window(cagg, &refresh_window, "before produce ranges");
debug_refresh_window(cagg, &refresh_window, "before produce batches");

/*
 * Produce the batches to be processed
 *
 * The refresh window is split into multiple batches of size `batch_size`
 * each. The batches are produced in reverse (descending) order because the
 * most recent data should be processed first and become visible to users as
 * soon as possible.
 *
 * It takes into account the invalidation logs (hypertable and
 * materialization hypertable) to avoid producing batches that contain no
 * data to be processed.
 *
 * The logic is something like the following:
 * 1. Get the dimension slices from the original hypertable
 * 2. Get both the hypertable and materialization hypertable invalidation logs
 * 3. Produce the batches in reverse order
 * 4. Check whether each produced batch overlaps with both the dimension
 *    slices (#1) and the invalidation logs (#2)
 * 5. If the batch overlaps with both then it's a valid batch to be processed
 * 6. If the batch overlaps with only one of them, or with neither, then it's
 *    not a valid batch to be processed
 *
 * (A rough sketch of this logic appears after the query below.)
 */
const char *query_str = " \
WITH chunk_ranges AS ( \
WITH dimension_slices AS ( \
SELECT \
range_start AS start, \
range_end AS end \
@@ -1069,11 +1124,11 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
pg_catalog.generate_series($5, $6, $4) AS refresh_start \
WHERE \
EXISTS ( \
SELECT FROM chunk_ranges \
SELECT FROM dimension_slices \
WHERE \
pg_catalog.int8range(refresh_start, LEAST($6::numeric, refresh_start::numeric + $4::numeric)::bigint) \
OPERATOR(pg_catalog.&&) \
pg_catalog.int8range(chunk_ranges.start, chunk_ranges.end) \
pg_catalog.int8range(dimension_slices.start, dimension_slices.end) \
) \
AND EXISTS ( \
SELECT FROM \
@@ -1088,7 +1143,10 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
ORDER BY \
refresh_start DESC;";
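
A rough standalone sketch of the splitting logic the comment and query above describe. It assumes a window that is already capped and bucket-aligned; ranges_overlap() mimics the int8range && operator on half-open ranges, and the slice/log lookups are elided to comments — none of these names are the actual implementation, which does this work in SQL:

#include <stdbool.h>
#include <stdint.h>

/* Two half-open ranges [a_start, a_end) and [b_start, b_end) overlap
 * iff each one starts before the other ends -- the test the query's
 * int8range && operator performs. */
static bool
ranges_overlap(int64_t a_start, int64_t a_end, int64_t b_start, int64_t b_end)
{
    return a_start < b_end && b_start < a_end;
}

/* Walk the window from its end backwards so the most recent batch is
 * produced first; a batch is kept only when it overlaps both a
 * dimension slice and an invalidation log entry. */
static void
produce_batches(int64_t window_start, int64_t window_end, int64_t batch_size)
{
    for (int64_t batch_end = window_end; batch_end > window_start; batch_end -= batch_size)
    {
        int64_t batch_start = batch_end - batch_size;
        if (batch_start < window_start)
            batch_start = window_start;
        /* emit [batch_start, batch_end) if ranges_overlap() holds against
         * both the dimension slices and the invalidation logs */
    }
}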

/* List of InternalTimeRange elements to be returned */
List *refresh_window_list = NIL;

/* Prepare for SPI call */
int res;
Oid types[] = { INT4OID, INT4OID, INT4OID, INT8OID, INT8OID, INT8OID };
Datum values[] = { Int32GetDatum(ht->fd.id),
@@ -1100,9 +1158,6 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
char nulls[] = { false, false, false, false, false, false };
MemoryContext oldcontext = CurrentMemoryContext;

/*
* Query for the oldest chunk in the hypertable.
*/
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

@@ -1115,8 +1170,24 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
0 /* count */);

if (res < 0)
elog(ERROR, "%s: could not get the last bucket of the materialized data", __func__);
elog(ERROR, "%s: could not produce batches for the continuous aggregate refresh policy", __func__);

if (SPI_processed == 1)
{
elog(LOG,
"only one batch produced for continuous aggregate \"%s.%s\", falling back to single "
"batch processing",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name));

res = SPI_finish();
if (res != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

return NIL;
}

/* Build the batches list */
for (uint64 i = 0; i < SPI_processed; i++)
{
bool range_start_isnull, range_end_isnull;
@@ -1135,29 +1206,46 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
range->end_isnull = range_end_isnull;
range->type = original_refresh_window->type;

/* When dropping chunks we need to align the start of the first range to cover dropped
* chunks if they exist */
if (i == (SPI_processed - 1) && original_refresh_window->start_isnull)
/*
* To make sure that the first range is aligned with the end of the refresh window
* we need to set the end to the maximum value of the time type if the original refresh
* window end is NULL.
*/
if (i == 0 && original_refresh_window->end_isnull)
{
range->start = original_refresh_window->start;
range->start_isnull = true;
range->end = ts_time_get_noend_or_max(range->type);
range->end_isnull = true;
}

if (i == 0 && original_refresh_window->end_isnull)
/*
 * To make sure that the last range is aligned with the start of the refresh
 * window we need to set the start to the minimum value of the time type if
 * the original refresh window start is NULL.
 */
if (i == (SPI_processed - 1) && original_refresh_window->start_isnull)
{
range->end = original_refresh_window->end;
range->end_isnull = true;
range->start = ts_time_get_nobegin_or_min(range->type);
range->start_isnull = true;
}

refresh_window_list = lappend(refresh_window_list, range);
MemoryContextSwitchTo(saved_context);

debug_refresh_window(cagg, range, "range refresh");
debug_refresh_window(cagg, range, "batch produced");
}

res = SPI_finish();
if (res != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

if (refresh_window_list == NIL)
{
elog(LOG,
"no valid batches produced for continuous aggregate \"%s.%s\", falling back to single "
"batch processing",
NameStr(cagg->data.user_view_schema),
NameStr(cagg->data.user_view_name));
}

return refresh_window_list;
}
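
Finally, a hypothetical caller-side sketch of how the returned list might be consumed — refresh_one_batch() and the exact parameter list of continuous_agg_split_refresh_window() are assumptions for illustration, while foreach/lfirst/NIL are the standard PostgreSQL list API:

/* Hypothetical usage sketch, not code from this commit */
List *batches = continuous_agg_split_refresh_window(cagg, &refresh_window, buckets_per_batch);

if (batches == NIL)
{
    /* No batches were produced: refresh the whole window in one go */
    refresh_one_batch(cagg, &refresh_window);
}
else
{
    ListCell *lc;
    foreach (lc, batches)
    {
        InternalTimeRange *batch = (InternalTimeRange *) lfirst(lc);
        refresh_one_batch(cagg, batch); /* most recent batch first */
    }
}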
113 changes: 113 additions & 0 deletions tsl/test/expected/cagg_refresh_policy_incremental.out
@@ -387,3 +387,116 @@ DETAIL: buckets_per_batch: -1
HINT: The buckets per batch should be greater than or equal to zero.
\set VERBOSITY terse
\set ON_ERROR_STOP 1
-- Truncate all data from the original hypertable
TRUNCATE bgw_log, conditions;
-- advance time by 4h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '4 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------

(1 row)

-- Should fall back to single batch processing because there's no data to be refreshed on the original hypertable
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------

(1 row)

SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 14400000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 14400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | no min slice range start for continuous aggregate "public.conditions_by_day", falling back to single batch processing
1 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Nov 23 16:07:02 4714 LMT BC, Wed Mar 05 16:00:00 2025 PST ]
2 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | deleted 295 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | inserted 0 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)

-- Should return zero rows
SELECT count(*) FROM conditions_by_day;
count
-------
0
(1 row)

-- 1 day of data
INSERT INTO conditions
SELECT
t, d, 10
FROM
generate_series(
'2020-02-05 00:00:00-03',
'2020-02-06 00:00:00-03',
'1 hour'::interval) AS t,
generate_series(1,5) AS d;
TRUNCATE bgw_log;
-- advance time by 5h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '5 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------

(1 row)

-- Should fall back to single batch processing because the refresh size is too small
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------

(1 row)

SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 18000000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 18000000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | only one batch produced for continuous aggregate "public.conditions_by_day", falling back to single batch processing
1 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Tue Feb 04 16:00:00 2020 PST, Thu Feb 06 16:00:00 2020 PST ]
2 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | inserted 10 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)

-- Should return 10 rows because the bucket width is `1 day` and buckets per batch is `10`
SELECT count(*) FROM conditions_by_day;
count
-------
10
(1 row)

TRUNCATE conditions_by_day, conditions, bgw_log;
-- Less than 1 day of data (smaller than the bucket width)
INSERT INTO conditions
VALUES ('2020-02-05 00:00:00-03', 1, 10);
-- advance time by 6h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '6 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------

(1 row)

-- Should fall back to single batch processing because the refresh size is too small
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------

(1 row)

SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 21600000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 21600000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | refresh window size (7 days) is smaller than or equal to batch size (10 days), falling back to single batch processing
1 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Nov 23 16:07:02 4714 LMT BC, Wed Mar 05 16:00:00 2025 PST ]
2 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)

-- Should return 1 row
SELECT count(*) FROM conditions_by_day;
count
-------
1
(1 row)

