Incremental CAgg Refresh Policy #7790

Open · wants to merge 16 commits into base: main
1 change: 1 addition & 0 deletions .unreleased/pr_7790
@@ -0,0 +1 @@
Implements: #7790 Introduce configurable Incremental CAgg Refresh Policy
10 changes: 7 additions & 3 deletions sql/policy_api.sql
@@ -81,12 +81,16 @@ CREATE OR REPLACE PROCEDURE @extschema@.add_columnstore_policy(

/* continuous aggregates policy */
CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy(
-    continuous_aggregate REGCLASS, start_offset "any",
-    end_offset "any", schedule_interval INTERVAL,
+    continuous_aggregate REGCLASS,
+    start_offset "any",
+    end_offset "any",
+    schedule_interval INTERVAL,
    if_not_exists BOOL = false,
    initial_start TIMESTAMPTZ = NULL,
    timezone TEXT = NULL,
-    include_tiered_data BOOL = NULL
+    include_tiered_data BOOL = NULL,
+    buckets_per_batch INTEGER = NULL,
+    max_batches_per_execution INTEGER = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
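For context, a minimal usage sketch of the extended API. The continuous aggregate name and the offset values below are hypothetical, not taken from this PR:

-- Refresh in batches of 10 buckets, at most 5 batches per policy run.
SELECT add_continuous_aggregate_policy('conditions_hourly',
    start_offset => INTERVAL '30 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour',
    buckets_per_batch => 10,
    max_batches_per_execution => 5);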
29 changes: 29 additions & 0 deletions sql/updates/latest-dev.sql
@@ -66,3 +66,32 @@ CREATE INDEX compression_settings_compress_relid_idx ON _timescaledb_catalog.com
DROP TABLE _timescaledb_catalog.tempsettings CASCADE;
GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC;
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', '');


-- New add_continuous_aggregate_policy API for incremental refresh policy
DROP FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS,
    start_offset "any",
    end_offset "any",
    schedule_interval INTERVAL,
    if_not_exists BOOL,
    initial_start TIMESTAMPTZ,
    timezone TEXT,
    include_tiered_data BOOL
);

CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS,
    start_offset "any",
    end_offset "any",
    schedule_interval INTERVAL,
    if_not_exists BOOL = false,
    initial_start TIMESTAMPTZ = NULL,
    timezone TEXT = NULL,
    include_tiered_data BOOL = NULL,
    buckets_per_batch INTEGER = NULL,
    max_batches_per_execution INTEGER = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
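For orientation, a hedged note: these statements execute during an extension upgrade, e.g.

ALTER EXTENSION timescaledb UPDATE;

and 'ts_update_placeholder' appears to be the stub symbol these update scripts conventionally bind until the real entry point is restored later in the update chain; that reading is an assumption based on the naming convention, not something this PR states.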
28 changes: 28 additions & 0 deletions sql/updates/reverse-dev.sql
@@ -39,3 +39,31 @@ FROM
DROP TABLE _timescaledb_catalog.tempsettings CASCADE;
GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC;
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', '');

-- Revert add_continuous_aggregate_policy API for incremental refresh policy
DROP FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS,
    start_offset "any",
    end_offset "any",
    schedule_interval INTERVAL,
    if_not_exists BOOL,
    initial_start TIMESTAMPTZ,
    timezone TEXT,
    include_tiered_data BOOL,
    buckets_per_batch INTEGER,
    max_batches_per_execution INTEGER
);

CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS,
    start_offset "any",
    end_offset "any",
    schedule_interval INTERVAL,
    if_not_exists BOOL = false,
    initial_start TIMESTAMPTZ = NULL,
    timezone TEXT = NULL,
    include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
30 changes: 30 additions & 0 deletions src/dimension_slice.c
@@ -1216,6 +1216,36 @@ ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n)
	return ret;
}

DimensionSlice *
ts_dimension_slice_nth_earliest_slice(int32 dimension_id, int n)
{
	ScanKeyData scankey[1];
	int num_tuples;
	DimensionSlice *ret = NULL;

	ScanKeyInit(&scankey[0],
				Anum_dimension_slice_dimension_id_range_start_range_end_idx_dimension_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(dimension_id));

	num_tuples = dimension_slice_scan_limit_direction_internal(
		DIMENSION_SLICE_DIMENSION_ID_RANGE_START_RANGE_END_IDX,
		scankey,
		1,
		dimension_slice_nth_tuple_found,
		(void *) &ret,
		n,
		ForwardScanDirection,
		AccessShareLock,
		NULL,
		CurrentMemoryContext);
	if (num_tuples < n)
		return NULL;

	return ret;
}

int32
ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
												  StrategyNumber start_strategy, int64 start_value,
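A hedged sketch of how the new helper might be called; the surrounding variable (dim) and the batching use case are illustrative assumptions, not code from this PR:

/* Slices are counted from the start of the dimension; NULL is returned when
 * fewer than n slices exist, mirroring ts_dimension_slice_nth_latest_slice. */
DimensionSlice *earliest = ts_dimension_slice_nth_earliest_slice(dim->fd.id, 1);
if (earliest != NULL)
{
	int64 batch_start = earliest->fd.range_start; /* anchor for the first refresh batch */
	/* ... */
}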
1 change: 1 addition & 0 deletions src/dimension_slice.h
@@ -88,6 +88,7 @@ extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSli
extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);

extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_earliest_slice(int32 dimension_id, int n);
extern TSDLLEXPORT int32 ts_dimension_slice_oldest_valid_chunk_for_reorder(
int32 job_id, int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value);
44 changes: 44 additions & 0 deletions src/ts_catalog/continuous_agg.c
@@ -1680,3 +1680,47 @@ ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_
			return bucket_function->bucket_integer_width;
	}
}

/*
 * Get the width of a bucket.
 */
int64
ts_continuous_agg_bucket_width(const ContinuousAggsBucketFunction *bucket_function)
{
	int64 bucket_width;

	if (bucket_function->bucket_fixed_interval == false)
	{
		/*
		 * There are several cases of variable-sized buckets:
		 * 1. Monthly buckets
		 * 2. Buckets with timezones
		 * 3. Cases 1 and 2 at the same time
		 *
		 * For months we simply take 30 days, as in interval_to_int64, and
		 * multiply this number by the number of months in the bucket. This
		 * reduces the task to the days/hours/minutes scenario.
		 *
		 * The days/hours/minutes case is handled the same way as for
		 * fixed-sized buckets. A refresh window at least two buckets in size
		 * is adequate for corner cases such as DST.
		 */

		/* bucket_function should always be specified for variable-sized buckets */
		Assert(bucket_function != NULL);
		/* ... and bucket_function->bucket_time_width too */
		Assert(bucket_function->bucket_time_width != NULL);

		/* Make a temporary copy of bucket_width */
		Interval interval = *bucket_function->bucket_time_width;
		interval.day += 30 * interval.month;
		interval.month = 0;
		bucket_width = ts_interval_value_to_internal(IntervalPGetDatum(&interval), INTERVALOID);
	}
	else
	{
		bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
	}

	return bucket_width;
}
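As a worked example of the month conversion (a sketch; USECS_PER_DAY is PostgreSQL's 86400000000, i.e. microseconds per day):

/* '1 month'        -> 30 days -> 30 * USECS_PER_DAY = 2592000000000
 * '1 month 2 days' -> 32 days -> 32 * USECS_PER_DAY = 2764800000000 */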
2 changes: 2 additions & 0 deletions src/ts_catalog/continuous_agg.h
@@ -215,3 +215,5 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

extern TSDLLEXPORT int64
ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);
extern TSDLLEXPORT int64
ts_continuous_agg_bucket_width(const ContinuousAggsBucketFunction *bucket_function);
85 changes: 50 additions & 35 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
@@ -146,6 +146,30 @@
	return res;
}

int32
policy_refresh_cagg_get_buckets_per_batch(const Jsonb *config, bool *isnull)
{
	bool found;
	int32 res = ts_jsonb_get_int32_field(config, POL_REFRESH_CONF_KEY_BUCKETS_PER_BATCH, &found);

	*isnull = !found;
	return res;
}

int32
policy_refresh_cagg_get_max_batches_per_execution(const Jsonb *config, bool *isnull)
{
	bool found;
	int32 res =
		ts_jsonb_get_int32_field(config, POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_EXECUTION, &found);

	if (!found)
		res = 10; /* default value */

	*isnull = !found;
	return res;
}

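A minimal sketch of how a refresh job might consume the new getters; the surrounding job code and the zero-means-unbatched convention are assumptions, not part of this diff:

bool isnull;
int32 buckets_per_batch = policy_refresh_cagg_get_buckets_per_batch(config, &isnull);
if (isnull)
	buckets_per_batch = 0; /* assumed: treat as "refresh the whole window in one go" */

int32 max_batches = policy_refresh_cagg_get_max_batches_per_execution(config, &isnull);
/* no fallback needed here: per the code above, this getter already defaults to 10 */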
/* returns false if a policy could not be found */
bool
policy_refresh_cagg_exists(int32 materialization_id)
@@ -445,39 +469,7 @@
	else
		end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);

-	if (cagg->bucket_function->bucket_fixed_interval == false)
-	{
-		/*
-		 * There are several cases of variable-sized buckets:
-		 * 1. Monthly buckets
-		 * 2. Buckets with timezones
-		 * 3. Cases 1 and 2 at the same time
-		 *
-		 * For months we simply take 30 days like on interval_to_int64 and
-		 * multiply this number by the number of months in the bucket. This
-		 * reduces the task to days/hours/minutes scenario.
-		 *
-		 * Days/hours/minutes case is handled the same way as for fixed-sized
-		 * buckets. The refresh window at least two buckets in size is adequate
-		 * for such corner cases as DST.
-		 */
-
-		/* bucket_function should always be specified for variable-sized buckets */
-		Assert(cagg->bucket_function != NULL);
-		/* ... and bucket_function->bucket_time_width too */
-		Assert(cagg->bucket_function->bucket_time_width != NULL);
-
-		/* Make a temporary copy of bucket_width */
-		Interval interval = *cagg->bucket_function->bucket_time_width;
-		interval.day += 30 * interval.month;
-		interval.month = 0;
-		bucket_width = ts_interval_value_to_internal(IntervalPGetDatum(&interval), INTERVALOID);
-	}
-	else
-	{
-		bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
-	}
-
+	bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function);
	Assert(bucket_width > 0);

	if (ts_time_saturating_add(end_offset, bucket_width * 2, INT8OID) > start_offset)
@@ -530,7 +522,8 @@
							   Oid end_offset_type, NullableDatum end_offset,
							   Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
							   TimestampTz initial_start, const char *timezone,
-							   NullableDatum include_tiered_data)
+							   NullableDatum include_tiered_data, NullableDatum buckets_per_batch,
+							   NullableDatum max_batches_per_execution)
{
	NameData application_name;
	NameData proc_name, proc_schema, check_name, check_schema, owner;
@@ -627,24 +620,38 @@
	ts_jsonb_add_int32(parse_state,
					   POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID,
					   cagg->data.mat_hypertable_id);

	if (!policyconf.offset_start.isnull)
		json_add_dim_interval_value(parse_state,
									POL_REFRESH_CONF_KEY_START_OFFSET,
									policyconf.offset_start.type,
									policyconf.offset_start.value);
	else
		ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_START_OFFSET);

	if (!policyconf.offset_end.isnull)
		json_add_dim_interval_value(parse_state,
									POL_REFRESH_CONF_KEY_END_OFFSET,
									policyconf.offset_end.type,
									policyconf.offset_end.value);
	else
		ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET);

	if (!include_tiered_data.isnull)
		ts_jsonb_add_bool(parse_state,
						  POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA,
						  include_tiered_data.value);

+	if (!buckets_per_batch.isnull)
+		ts_jsonb_add_int32(parse_state,
+						   POL_REFRESH_CONF_KEY_BUCKETS_PER_BATCH,
+						   buckets_per_batch.value);
+
+	if (!max_batches_per_execution.isnull)
+		ts_jsonb_add_int32(parse_state,
+						   POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_EXECUTION,
+						   max_batches_per_execution.value);

	JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
	Jsonb *config = JsonbValueToJsonb(result);

@@ -676,6 +683,8 @@
	bool if_not_exists;
	NullableDatum start_offset, end_offset;
	NullableDatum include_tiered_data;
+	NullableDatum buckets_per_batch;
+	NullableDatum max_batches_per_execution;

	ts_feature_flag_check(FEATURE_POLICY);

@@ -700,6 +709,10 @@
	char *valid_timezone = NULL;
	include_tiered_data.value = PG_GETARG_DATUM(7);
	include_tiered_data.isnull = PG_ARGISNULL(7);
+	buckets_per_batch.value = PG_GETARG_DATUM(8);
+	buckets_per_batch.isnull = PG_ARGISNULL(8);
+	max_batches_per_execution.value = PG_GETARG_DATUM(9);
+	max_batches_per_execution.isnull = PG_ARGISNULL(9);

	Datum retval;
	/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
Expand All @@ -723,7 +736,9 @@
fixed_schedule,
initial_start,
valid_timezone,
include_tiered_data);
include_tiered_data,
buckets_per_batch,
max_batches_per_execution);
if (!TIMESTAMP_NOT_FINITE(initial_start))
{
int32 job_id = DatumGetInt32(retval);
Expand Down
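Once a policy is created, the stored config can be inspected; the commented JSON below is illustrative output assuming the hypothetical values from the usage sketch earlier, not captured from a real run:

SELECT config
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';
-- e.g. {"end_offset": "01:00:00", "start_offset": "30 days",
--       "mat_hypertable_id": 2, "buckets_per_batch": 10,
--       "max_batches_per_execution": 5}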
13 changes: 7 additions & 6 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
@@ -21,14 +21,15 @@ int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dim
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
										  bool *end_isnull);
bool policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull);
+int32 policy_refresh_cagg_get_buckets_per_batch(const Jsonb *config, bool *isnull);
+int32 policy_refresh_cagg_get_max_batches_per_execution(const Jsonb *config, bool *isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
										  Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);

-Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type,
-									   NullableDatum start_offset, Oid end_offset_type,
-									   NullableDatum end_offset, Interval refresh_interval,
-									   bool if_not_exists, bool fixed_schedule,
-									   TimestampTz initial_start, const char *timezone,
-									   NullableDatum include_tiered_data);
+Datum policy_refresh_cagg_add_internal(
+	Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset, Oid end_offset_type,
+	NullableDatum end_offset, Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
+	TimestampTz initial_start, const char *timezone, NullableDatum include_tiered_data,
+	NullableDatum buckets_per_batch, NullableDatum max_batches_per_execution);
Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists);