diff --git a/sql/policy_api.sql b/sql/policy_api.sql index be8cc33cce1..57bb8ac5554 100644 --- a/sql/policy_api.sql +++ b/sql/policy_api.sql @@ -81,12 +81,16 @@ CREATE OR REPLACE PROCEDURE @extschema@.remove_columnstore_policy( /* continuous aggregates policy */ CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy( - continuous_aggregate REGCLASS, start_offset "any", - end_offset "any", schedule_interval INTERVAL, + continuous_aggregate REGCLASS, + start_offset "any", + end_offset "any", + schedule_interval INTERVAL, if_not_exists BOOL = false, initial_start TIMESTAMPTZ = NULL, timezone TEXT = NULL, - include_tiered_data BOOL = NULL + include_tiered_data BOOL = NULL, + nbuckets_per_batch INTEGER = NULL, + max_batches_per_job_execution INTEGER = NULL ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add' diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index f87547a7539..968c4fb7d42 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -66,3 +66,32 @@ CREATE INDEX compression_settings_compress_relid_idx ON _timescaledb_catalog.com DROP TABLE _timescaledb_catalog.tempsettings CASCADE; GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC; SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', ''); + + +-- New add_continuous_aggregate_policy API for incremental refresh policy +DROP FUNCTION @extschema@.add_continuous_aggregate_policy( + continuous_aggregate REGCLASS, + start_offset "any", + end_offset "any", + schedule_interval INTERVAL, + if_not_exists BOOL, + initial_start TIMESTAMPTZ, + timezone TEXT, + include_tiered_data BOOL +); + +CREATE FUNCTION @extschema@.add_continuous_aggregate_policy( + continuous_aggregate REGCLASS, + start_offset "any", + end_offset "any", + schedule_interval INTERVAL, + if_not_exists BOOL = false, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL, + include_tiered_data BOOL = NULL, + nbuckets_per_batch INTEGER = NULL, + max_batches_per_job_execution INTEGER = NULL +) +RETURNS INTEGER +AS '@MODULE_PATHNAME@', 'ts_update_placeholder' +LANGUAGE C VOLATILE; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 1a5ee284824..db4846a1cf7 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -39,3 +39,31 @@ FROM DROP TABLE _timescaledb_catalog.tempsettings CASCADE; GRANT SELECT ON _timescaledb_catalog.compression_settings TO PUBLIC; SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_settings', ''); + +-- Revert add_continuous_aggregate_policy API for incremental refresh policy +DROP FUNCTION IF EXISTS @extschema@.add_continuous_aggregate_policy( + continuous_aggregate REGCLASS, + start_offset "any", + end_offset "any", + schedule_interval INTERVAL, + if_not_exists BOOL = false, + initial_start TIMESTAMPTZ = NULL, + timezone TEXT = NULL, + include_tiered_data BOOL = NULL, + nbuckets_per_batch INTEGER = NULL, + max_batches_per_job_execution INTEGER = NULL +); + +-- CREATE FUNCTION @extschema@.add_continuous_aggregate_policy( +-- continuous_aggregate REGCLASS, +-- start_offset "any", +-- end_offset "any", +-- schedule_interval INTERVAL, +-- if_not_exists BOOL = false, +-- initial_start TIMESTAMPTZ = NULL, +-- timezone TEXT = NULL, +-- include_tiered_data BOOL = NULL +-- ) +-- RETURNS INTEGER +-- AS '@MODULE_PATHNAME@', 'ts_update_placeholder' +-- LANGUAGE C VOLATILE; diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index 179cf6c8297..bcd25a6b0cc 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -146,6 +146,31 @@ policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull) return res; } +int32 +policy_refresh_cagg_get_nbuckets_per_batch(const Jsonb *config, bool *isnull) +{ + bool found; + int32 res = ts_jsonb_get_int32_field(config, POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH, &found); + + *isnull = !found; + return res; +} + +int32 +policy_refresh_cagg_get_max_batches_per_job_execution(const Jsonb *config, bool *isnull) +{ + bool found; + int32 res = ts_jsonb_get_int32_field(config, + POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION, + &found); + + if (!found) + res = 10; /* default value */ + + *isnull = !found; + return res; +} + /* returns false if a policy could not be found */ bool policy_refresh_cagg_exists(int32 materialization_id) @@ -498,7 +523,9 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa Oid end_offset_type, NullableDatum end_offset, Interval refresh_interval, bool if_not_exists, bool fixed_schedule, TimestampTz initial_start, const char *timezone, - NullableDatum include_tiered_data) + NullableDatum include_tiered_data, + NullableDatum nbuckets_per_batch, + NullableDatum max_batches_per_job_execution) { NameData application_name; NameData proc_name, proc_schema, check_name, check_schema, owner; @@ -595,6 +622,7 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa ts_jsonb_add_int32(parse_state, POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID, cagg->data.mat_hypertable_id); + if (!policyconf.offset_start.isnull) json_add_dim_interval_value(parse_state, POL_REFRESH_CONF_KEY_START_OFFSET, @@ -602,6 +630,7 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa policyconf.offset_start.value); else ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_START_OFFSET); + if (!policyconf.offset_end.isnull) json_add_dim_interval_value(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET, @@ -609,10 +638,22 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa policyconf.offset_end.value); else ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET); + if (!include_tiered_data.isnull) ts_jsonb_add_bool(parse_state, POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA, include_tiered_data.value); + + if (!nbuckets_per_batch.isnull) + ts_jsonb_add_int32(parse_state, + POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH, + nbuckets_per_batch.value); + + if (!max_batches_per_job_execution.isnull) + ts_jsonb_add_int32(parse_state, + POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION, + max_batches_per_job_execution.value); + JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL); Jsonb *config = JsonbValueToJsonb(result); @@ -644,6 +685,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) bool if_not_exists; NullableDatum start_offset, end_offset; NullableDatum include_tiered_data; + NullableDatum nbuckets_per_batch; + NullableDatum max_batches_per_job_execution; ts_feature_flag_check(FEATURE_POLICY); @@ -668,6 +711,10 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) char *valid_timezone = NULL; include_tiered_data.value = PG_GETARG_DATUM(7); include_tiered_data.isnull = PG_ARGISNULL(7); + nbuckets_per_batch.value = PG_GETARG_DATUM(8); + nbuckets_per_batch.isnull = PG_ARGISNULL(8); + max_batches_per_job_execution.value = PG_GETARG_DATUM(9); + max_batches_per_job_execution.isnull = PG_ARGISNULL(9); Datum retval; /* if users pass in -infinity for initial_start, then use the current_timestamp instead */ @@ -691,7 +738,9 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) fixed_schedule, initial_start, valid_timezone, - include_tiered_data); + include_tiered_data, + nbuckets_per_batch, + max_batches_per_job_execution); if (!TIMESTAMP_NOT_FINITE(initial_start)) { int32 job_id = DatumGetInt32(retval); diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.h b/tsl/src/bgw_policy/continuous_aggregate_api.h index 50588b424d8..8fbd858d9b9 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.h +++ b/tsl/src/bgw_policy/continuous_aggregate_api.h @@ -21,14 +21,15 @@ int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dim int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, bool *end_isnull); bool policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull); +int32 policy_refresh_cagg_get_nbuckets_per_batch(const Jsonb *config, bool *isnull); +int32 policy_refresh_cagg_get_max_batches_per_job_execution(const Jsonb *config, bool *isnull); bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type, Datum cmp_interval); bool policy_refresh_cagg_exists(int32 materialization_id); -Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, - NullableDatum start_offset, Oid end_offset_type, - NullableDatum end_offset, Interval refresh_interval, - bool if_not_exists, bool fixed_schedule, - TimestampTz initial_start, const char *timezone, - NullableDatum include_tiered_data); +Datum policy_refresh_cagg_add_internal( + Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset, Oid end_offset_type, + NullableDatum end_offset, Interval refresh_interval, bool if_not_exists, bool fixed_schedule, + TimestampTz initial_start, const char *timezone, NullableDatum include_tiered_data, + NullableDatum nbuckets_per_batch, NullableDatum max_batches_per_job_execution); Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists); diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index 4a6a5aa9f1b..fe4b9173399 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -392,7 +392,7 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config) /* Try to split window range into a list of ranges */ List *refresh_window_list = continuous_agg_split_refresh_window(policy_data.cagg, &policy_data.refresh_window, - 0 /* disabled */); + policy_data.nbuckets_per_batch); if (refresh_window_list == NIL) { refresh_window_list = lappend(refresh_window_list, &policy_data.refresh_window); @@ -435,8 +435,10 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD const Dimension *open_dim; Oid dim_type; int64 refresh_start, refresh_end; + int32 nbuckets_per_batch, max_batches_per_job_execution; bool start_isnull, end_isnull; bool include_tiered_data, include_tiered_data_isnull; + bool nbuckets_per_batch_isnull, max_batches_per_job_execution_isnull; materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config); mat_ht = ts_hypertable_get_by_id(materialization_id); @@ -466,6 +468,12 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD include_tiered_data = policy_refresh_cagg_get_include_tiered_data(config, &include_tiered_data_isnull); + nbuckets_per_batch = + policy_refresh_cagg_get_nbuckets_per_batch(config, &nbuckets_per_batch_isnull); + + max_batches_per_job_execution = policy_refresh_cagg_get_max_batches_per_job_execution( + config, &max_batches_per_job_execution_isnull); + if (policy_data) { policy_data->refresh_window.type = dim_type; @@ -476,6 +484,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD policy_data->cagg = cagg; policy_data->include_tiered_data = include_tiered_data; policy_data->include_tiered_data_isnull = include_tiered_data_isnull; + policy_data->nbuckets_per_batch = nbuckets_per_batch; + policy_data->max_batches_per_job_execution = max_batches_per_job_execution; } } diff --git a/tsl/src/bgw_policy/job.h b/tsl/src/bgw_policy/job.h index 93a8738b4c4..84a8db2b988 100644 --- a/tsl/src/bgw_policy/job.h +++ b/tsl/src/bgw_policy/job.h @@ -38,6 +38,8 @@ typedef struct PolicyContinuousAggData ContinuousAgg *cagg; bool include_tiered_data; bool include_tiered_data_isnull; + int32 nbuckets_per_batch; + int32 max_batches_per_job_execution; } PolicyContinuousAggData; typedef struct PolicyCompressionData diff --git a/tsl/src/bgw_policy/policies_v2.c b/tsl/src/bgw_policy/policies_v2.c index eeb1643abcf..366c1f5353b 100644 --- a/tsl/src/bgw_policy/policies_v2.c +++ b/tsl/src/bgw_policy/policies_v2.c @@ -207,6 +207,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists) if (all_policies.refresh && all_policies.refresh->create_policy) { NullableDatum include_tiered_data = { .isnull = true }; + NullableDatum nbuckets_per_refresh = { .isnull = true }; + NullableDatum max_batches_per_job_execution = { .isnull = true }; if (all_policies.is_alter_policy) policy_refresh_cagg_remove_internal(all_policies.rel_oid, if_exists); @@ -220,7 +222,9 @@ validate_and_create_policies(policies_info all_policies, bool if_exists) false, DT_NOBEGIN, NULL, - include_tiered_data); + include_tiered_data, + nbuckets_per_refresh, + max_batches_per_job_execution); } if (all_policies.compress && all_policies.compress->create_policy) { diff --git a/tsl/src/bgw_policy/policies_v2.h b/tsl/src/bgw_policy/policies_v2.h index 03535adb4e4..885b6171b30 100644 --- a/tsl/src/bgw_policy/policies_v2.h +++ b/tsl/src/bgw_policy/policies_v2.h @@ -20,6 +20,8 @@ #define POL_REFRESH_CONF_KEY_START_OFFSET "start_offset" #define POL_REFRESH_CONF_KEY_END_OFFSET "end_offset" #define POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA "include_tiered_data" +#define POL_REFRESH_CONF_KEY_NBUCKETS_PER_BATCH "nbuckets_per_batch" +#define POL_REFRESH_CONF_KEY_MAX_BATCHES_PER_JOB_EXECUTION "max_batches_per_job_execution" #define POLICY_COMPRESSION_PROC_NAME "policy_compression" #define POLICY_COMPRESSION_CHECK_NAME "policy_compression_check" diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index fbdd5f49cfa..d0f637872cf 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -909,7 +909,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, } static void -debug_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, +debug_refresh_window(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, const char *msg) { return; @@ -923,7 +923,7 @@ debug_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRa getTypeOutputInfo(refresh_window->type, &outfuncid, &isvarlena); Assert(!isvarlena); - elog(elevel, + elog(DEBUG1, "%s \"%s\" in window [ %s, %s ] internal [ " INT64_FORMAT ", " INT64_FORMAT " ] minimum [ %s ]", msg, @@ -939,13 +939,11 @@ debug_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRa List * continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *original_refresh_window, - int32 range_factor) + int32 nbuckets_per_batch) { - /* Do not produce batches when the range_range factor = 0 (disabled) */ - if (range_factor == 0) + /* Do not produce batches when the number of buckets per batch is zero (disabled) */ + if (nbuckets_per_batch == 0) { - // refresh_window_list = lappend(refresh_window_list, &original_refresh_window); - // return refresh_window_list; return NIL; } @@ -957,7 +955,7 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig .end_isnull = original_refresh_window->end_isnull, }; - debug_refresh_window(INFO, cagg, &refresh_window, "begin"); + debug_refresh_window(cagg, &refresh_window, "begin"); Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id); @@ -965,7 +963,7 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig */ if (refresh_window.start_isnull) { - debug_refresh_window(INFO, cagg, &refresh_window, "START IS NULL"); + debug_refresh_window(cagg, &refresh_window, "START IS NULL"); refresh_window.start = ts_hypertable_get_min_dimension_slice(ht, 0, &refresh_window.start_isnull); @@ -974,15 +972,13 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig TS_TIME_IS_MIN(refresh_window.start, refresh_window.type) || TS_TIME_IS_NOBEGIN(refresh_window.start, refresh_window.type)) { - // MemoryContextSwitchTo(oldcontext); - // refresh_window_list = lappend(refresh_window_list, &original_refresh_window); return NIL; } } if (refresh_window.end_isnull) { - debug_refresh_window(INFO, cagg, &refresh_window, "END IS NULL"); + debug_refresh_window(cagg, &refresh_window, "END IS NULL"); refresh_window.end = ts_hypertable_get_max_dimension_slice(ht, 0, &refresh_window.end_isnull); @@ -990,9 +986,6 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig if (refresh_window.end_isnull || TS_TIME_IS_MAX(refresh_window.end, refresh_window.type) || TS_TIME_IS_NOEND(refresh_window.end, refresh_window.type)) { - // MemoryContextSwitchTo(oldcontext); - // refresh_window_list = lappend(refresh_window_list, &original_refresh_window); - // return refresh_window_list; return NIL; } } @@ -1001,17 +994,16 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig * executions */ int64 bucket_width = ts_continuous_agg_bucket_width(cagg->bucket_function); int64 refresh_size = refresh_window.end - refresh_window.start; - int64 batch_size = (bucket_width * range_factor); + int64 batch_size = (bucket_width * nbuckets_per_batch); int64 estimated_batches = refresh_size / batch_size; + if (estimated_batches > ts_guc_cagg_max_individual_materializations || refresh_size <= batch_size) { - // refresh_window_list = lappend(refresh_window_list, &original_refresh_window); - // return refresh_window_list; return NIL; } - debug_refresh_window(INFO, cagg, &refresh_window, "before produce ranges"); + debug_refresh_window(cagg, &refresh_window, "before produce ranges"); const Dimension *time_dim; time_dim = hyperspace_get_open_dimension(ht->space, 0); @@ -1106,7 +1098,7 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig refresh_window_list = lappend(refresh_window_list, range); MemoryContextSwitchTo(saved_context); - debug_refresh_window(INFO, cagg, range, "range refresh"); + debug_refresh_window(cagg, range, "range refresh"); } res = SPI_finish(); diff --git a/tsl/src/continuous_aggs/refresh.h b/tsl/src/continuous_aggs/refresh.h index 3518131b206..b7919535f81 100644 --- a/tsl/src/continuous_aggs/refresh.h +++ b/tsl/src/continuous_aggs/refresh.h @@ -23,4 +23,4 @@ extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg, bool force); extern List *continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *original_refresh_window, - int32 range_factor); + int32 nbuckets_per_batch); diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index 24ac7500e8f..891aef9c5fa 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -214,7 +214,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text ts_now_mock() add_columnstore_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval,boolean) add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval,boolean) - add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text,boolean) + add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text,boolean,integer,integer) add_dimension(regclass,_timescaledb_internal.dimension_info,boolean) add_dimension(regclass,name,integer,anyelement,regproc,boolean) add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text)