API changes for add_continuous_aggregate_policy

fabriziomello committed Mar 4, 2025
1 parent a6f82d2 commit d8a3abb
Showing 12 changed files with 156 additions and 35 deletions.
10 changes: 7 additions & 3 deletions sql/policy_api.sql
@@ -81,12 +81,16 @@ CREATE OR REPLACE PROCEDURE @extschema@.add_columnstore_policy(

/* continuous aggregates policy */
CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy(
-continuous_aggregate REGCLASS, start_offset "any",
-end_offset "any", schedule_interval INTERVAL,
+continuous_aggregate REGCLASS,
+start_offset "any",
+end_offset "any",
+schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
-include_tiered_data BOOL = NULL
+include_tiered_data BOOL = NULL,
+nbuckets_per_batch INTEGER = NULL,
+max_batches_per_job_execution INTEGER = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
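For reference, a minimal usage sketch of the extended API (the `conditions_summary` continuous aggregate and all values are illustrative, not part of this commit; the comments reflect the apparent semantics of the new settings). Both new parameters default to NULL, so existing calls are unaffected:

SELECT add_continuous_aggregate_policy('conditions_summary',
    start_offset => INTERVAL '30 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour',
    nbuckets_per_batch => 10,             -- refresh in batches of 10 buckets
    max_batches_per_job_execution => 5);  -- process at most 5 batches per run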
29 changes: 29 additions & 0 deletions sql/updates/latest-dev.sql
@@ -66,3 +66,32 @@ CREATE INDEX compression_settings_compress_relid_idx ON _timescaledb_catalog.com
DROP TABLE _timescaledb_catalog.tempsettings CASCADE;
GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC;
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', '');


-- New add_continuous_aggregate_policy API for incremental refresh policy
DROP FUNCTION @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS,
start_offset "any",
end_offset "any",
schedule_interval INTERVAL,
if_not_exists BOOL,
initial_start TIMESTAMPTZ,
timezone TEXT,
include_tiered_data BOOL
);

CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS,
start_offset "any",
end_offset "any",
schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
include_tiered_data BOOL = NULL,
nbuckets_per_batch INTEGER = NULL,
max_batches_per_job_execution INTEGER = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
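The update script drops and recreates the function rather than using CREATE OR REPLACE because PostgreSQL does not allow CREATE OR REPLACE FUNCTION to change an argument list; with the two extra arguments it would instead create a second overload, and since both signatures rely on defaults, short calls would become ambiguous. A minimal sketch of that failure mode, with generic names not taken from this commit:

CREATE FUNCTION f(a INT, b INT DEFAULT 0) RETURNS INT
    LANGUAGE SQL AS $$ SELECT a + b $$;
CREATE FUNCTION f(a INT, b INT DEFAULT 0, c INT DEFAULT 0) RETURNS INT
    LANGUAGE SQL AS $$ SELECT a + b + c $$;

SELECT f(1);
-- ERROR:  function f(integer) is not unique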
28 changes: 28 additions & 0 deletions sql/updates/reverse-dev.sql
@@ -39,3 +39,31 @@ FROM
DROP TABLE _timescaledb_catalog.tempsettings CASCADE;
GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC;
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', '');

-- Revert add_continuous_aggregate_policy API for incremental refresh policy
DROP FUNCTION IF EXISTS @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS,
start_offset "any",
end_offset "any",
schedule_interval INTERVAL,
if_not_exists BOOL,
initial_start TIMESTAMPTZ,
timezone TEXT,
include_tiered_data BOOL,
nbuckets_per_batch INTEGER,
max_batches_per_job_execution INTEGER
);

CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
continuous_aggregate REGCLASS,
start_offset "any",
end_offset "any",
schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
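After this downgrade script runs, only the original eight-argument signature exists, so a call naming one of the new parameters should fail to resolve. A hypothetical sanity check (cagg name and exact error text are illustrative):

SELECT add_continuous_aggregate_policy('conditions_summary',
    start_offset => INTERVAL '30 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour',
    nbuckets_per_batch => 10);
-- ERROR:  function add_continuous_aggregate_policy(unknown, ..., nbuckets_per_batch => integer) does not exist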
53 changes: 51 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
@@ -146,6 +146,31 @@ policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull)
return res;
}

int32
policy_refresh_cagg_get_nbuckets_per_batch(const Jsonb *config, bool *isnull)
{
bool found;
int32 res = ts_jsonb_get_int32_field(config, POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH, &found);

*isnull = !found;
return res;
}

int32
policy_refresh_cagg_get_max_batches_per_job_execution(const Jsonb *config, bool *isnull)
{
bool found;
int32 res = ts_jsonb_get_int32_field(config,
POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION,
&found);

if (!found)
res = 10; /* default value */

*isnull = !found;
return res;
}

/* returns false if a policy could not be found */
bool
policy_refresh_cagg_exists(int32 materialization_id)
@@ -498,7 +523,9 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
Oid end_offset_type, NullableDatum end_offset,
Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
TimestampTz initial_start, const char *timezone,
-NullableDatum include_tiered_data)
+NullableDatum include_tiered_data,
+NullableDatum nbuckets_per_batch,
+NullableDatum max_batches_per_job_execution)
{
NameData application_name;
NameData proc_name, proc_schema, check_name, check_schema, owner;
@@ -595,24 +622,38 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
ts_jsonb_add_int32(parse_state,
POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID,
cagg->data.mat_hypertable_id);

if (!policyconf.offset_start.isnull)
json_add_dim_interval_value(parse_state,
POL_REFRESH_CONF_KEY_START_OFFSET,
policyconf.offset_start.type,
policyconf.offset_start.value);
else
ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_START_OFFSET);

if (!policyconf.offset_end.isnull)
json_add_dim_interval_value(parse_state,
POL_REFRESH_CONF_KEY_END_OFFSET,
policyconf.offset_end.type,
policyconf.offset_end.value);
else
ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET);

if (!include_tiered_data.isnull)
ts_jsonb_add_bool(parse_state,
POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA,
include_tiered_data.value);

if (!nbuckets_per_batch.isnull)
ts_jsonb_add_int32(parse_state,
POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH,
nbuckets_per_batch.value);

if (!max_batches_per_job_execution.isnull)
ts_jsonb_add_int32(parse_state,
POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION,
max_batches_per_job_execution.value);

JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
Jsonb *config = JsonbValueToJsonb(result);

@@ -644,6 +685,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
bool if_not_exists;
NullableDatum start_offset, end_offset;
NullableDatum include_tiered_data;
NullableDatum nbuckets_per_batch;
NullableDatum max_batches_per_job_execution;

ts_feature_flag_check(FEATURE_POLICY);

@@ -668,6 +711,10 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
char *valid_timezone = NULL;
include_tiered_data.value = PG_GETARG_DATUM(7);
include_tiered_data.isnull = PG_ARGISNULL(7);
nbuckets_per_batch.value = PG_GETARG_DATUM(8);
nbuckets_per_batch.isnull = PG_ARGISNULL(8);
max_batches_per_job_execution.value = PG_GETARG_DATUM(9);
max_batches_per_job_execution.isnull = PG_ARGISNULL(9);

Datum retval;
/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
@@ -691,7 +738,9 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
fixed_schedule,
initial_start,
valid_timezone,
-include_tiered_data);
+include_tiered_data,
+nbuckets_per_batch,
+max_batches_per_job_execution);
if (!TIMESTAMP_NOT_FINITE(initial_start))
{
int32 job_id = DatumGetInt32(retval);
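Once a policy is created through the new API, the two settings are stored as ordinary keys in the job's JSONB config, which is what the `policy_refresh_cagg_get_*` getters above read back (note that `policy_refresh_cagg_get_max_batches_per_job_execution` falls back to 10 when the key is absent). A sketch of how to inspect this; the job id and config values shown are illustrative:

SELECT job_id, config
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

-- Illustrative output:
--  job_id | config
-- --------+---------------------------------------------------------
--    1001 | {"end_offset": "01:00:00", "start_offset": "30 days",
--         |  "mat_hypertable_id": 2, "nbuckets_per_batch": 10,
--         |  "max_batches_per_job_execution": 5}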
13 changes: 7 additions & 6 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
@@ -21,14 +21,15 @@ int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dim
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull);
int32 policy_refresh_cagg_get_nbuckets_per_batch(const Jsonb *config, bool *isnull);
int32 policy_refresh_cagg_get_max_batches_per_job_execution(const Jsonb *config, bool *isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);

-Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type,
-NullableDatum start_offset, Oid end_offset_type,
-NullableDatum end_offset, Interval refresh_interval,
-bool if_not_exists, bool fixed_schedule,
-TimestampTz initial_start, const char *timezone,
-NullableDatum include_tiered_data);
+Datum policy_refresh_cagg_add_internal(
+Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset, Oid end_offset_type,
+NullableDatum end_offset, Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
+TimestampTz initial_start, const char *timezone, NullableDatum include_tiered_data,
+NullableDatum nbuckets_per_batch, NullableDatum max_batches_per_job_execution);
Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists);
12 changes: 11 additions & 1 deletion tsl/src/bgw_policy/job.c
@@ -392,7 +392,7 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
/* Try to split window range into a list of ranges */
List *refresh_window_list = continuous_agg_split_refresh_window(policy_data.cagg,
&policy_data.refresh_window,
-0 /* disabled */);
+policy_data.nbuckets_per_batch);
if (refresh_window_list == NIL)
{
refresh_window_list = lappend(refresh_window_list, &policy_data.refresh_window);
@@ -435,8 +435,10 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
const Dimension *open_dim;
Oid dim_type;
int64 refresh_start, refresh_end;
int32 nbuckets_per_batch, max_batches_per_job_execution;
bool start_isnull, end_isnull;
bool include_tiered_data, include_tiered_data_isnull;
bool nbuckets_per_batch_isnull, max_batches_per_job_execution_isnull;

materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
mat_ht = ts_hypertable_get_by_id(materialization_id);
@@ -466,6 +468,12 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
include_tiered_data =
policy_refresh_cagg_get_include_tiered_data(config, &include_tiered_data_isnull);

nbuckets_per_batch =
policy_refresh_cagg_get_nbuckets_per_batch(config, &nbuckets_per_batch_isnull);

max_batches_per_job_execution = policy_refresh_cagg_get_max_batches_per_job_execution(
config, &max_batches_per_job_execution_isnull);

if (policy_data)
{
policy_data->refresh_window.type = dim_type;
@@ -476,6 +484,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->cagg = cagg;
policy_data->include_tiered_data = include_tiered_data;
policy_data->include_tiered_data_isnull = include_tiered_data_isnull;
policy_data->nbuckets_per_batch = nbuckets_per_batch;
policy_data->max_batches_per_job_execution = max_batches_per_job_execution;
}
}

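The practical effect of the job.c change: the policy's refresh window is now split by `continuous_agg_split_refresh_window` using the configured `nbuckets_per_batch` (previously hard-coded to 0, i.e. splitting disabled), and `max_batches_per_job_execution` appears to cap how many of the resulting batches a single job run processes. An illustrative policy on a cagg whose bucket width is 1 hour (names and values are made up):

SELECT add_continuous_aggregate_policy('conditions_summary',
    start_offset => INTERVAL '7 days',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour',
    nbuckets_per_batch => 24,             -- each batch covers 24 one-hour buckets, i.e. one day
    max_batches_per_job_execution => 3);  -- one run refreshes at most ~3 days of backlog

-- The ~7-day window is then refreshed as up to 7 day-sized batches rather than
-- one monolithic refresh, shrinking per-transaction work for large backlogs.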
2 changes: 2 additions & 0 deletions tsl/src/bgw_policy/job.h
@@ -38,6 +38,8 @@ typedef struct PolicyContinuousAggData
ContinuousAgg *cagg;
bool include_tiered_data;
bool include_tiered_data_isnull;
int32 nbuckets_per_batch;
int32 max_batches_per_job_execution;
} PolicyContinuousAggData;

typedef struct PolicyCompressionData
6 changes: 5 additions & 1 deletion tsl/src/bgw_policy/policies_v2.c
@@ -207,6 +207,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
if (all_policies.refresh && all_policies.refresh->create_policy)
{
NullableDatum include_tiered_data = { .isnull = true };
NullableDatum nbuckets_per_refresh = { .isnull = true };
NullableDatum max_batches_per_job_execution = { .isnull = true };

if (all_policies.is_alter_policy)
policy_refresh_cagg_remove_internal(all_policies.rel_oid, if_exists);
@@ -220,7 +222,9 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
false,
DT_NOBEGIN,
NULL,
-include_tiered_data);
+include_tiered_data,
+nbuckets_per_refresh,
+max_batches_per_job_execution);
}
if (all_policies.compress && all_policies.compress->create_policy)
{
2 changes: 2 additions & 0 deletions tsl/src/bgw_policy/policies_v2.h
@@ -20,6 +20,8 @@
#define POL_REFRESH_CONF_KEY_START_OFFSET "start_offset"
#define POL_REFRESH_CONF_KEY_END_OFFSET "end_offset"
#define POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA "include_tiered_data"
#define POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH "nbuckets_per_batch"
#define POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION "max_batches_per_job_execution"

#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_COMPRESSION_CHECK_NAME "policy_compression_check"