From c05d7306d4e860f238251034beaceb05eb9a4667 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Mon, 18 Mar 2024 17:03:26 +0100 Subject: [PATCH] Refactor parameter of invalidation_state_init So far, the invalidation_state_init function used a loop to find the proper CAgg bucket function. This PR refactors the parameter of the function and passes the needed information directly. --- tsl/src/continuous_aggs/invalidation.c | 42 +++++++------------------- tsl/src/continuous_aggs/invalidation.h | 15 ++++----- tsl/src/continuous_aggs/refresh.c | 8 ++--- 3 files changed, 21 insertions(+), 44 deletions(-) diff --git a/tsl/src/continuous_aggs/invalidation.c b/tsl/src/continuous_aggs/invalidation.c index 0a826325124..e09478ff8b0 100644 --- a/tsl/src/continuous_aggs/invalidation.c +++ b/tsl/src/continuous_aggs/invalidation.c @@ -141,9 +141,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder( const Invalidation *mergedentry, const Invalidation *current_remainder); static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state, const InternalTimeRange *refresh_window); -static void invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id, - int32 raw_hypertable_id, Oid dimtype, - const CaggsInfo *all_caggs); +static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg, + Oid dimtype, const CaggsInfo *all_caggs); static void invalidation_state_cleanup(const CaggInvalidationState *state); static Relation @@ -944,14 +943,12 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state, } static void -invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id, - int32 raw_hypertable_id, Oid dimtype, const CaggsInfo *all_caggs) +invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg, Oid dimtype, + const CaggsInfo *all_caggs) { - ListCell *lc1, *lc2; - bool PG_USED_FOR_ASSERTS_ONLY found = false; - - state->mat_hypertable_id = mat_hypertable_id; - state->raw_hypertable_id = raw_hypertable_id; + state->bucket_function = cagg->bucket_function; + state->mat_hypertable_id = cagg->data.mat_hypertable_id; + state->raw_hypertable_id = cagg->data.raw_hypertable_id; state->dimtype = dimtype; state->all_caggs = all_caggs; state->cagg_log_rel = open_invalidation_log(LOG_CAGG, RowExclusiveLock); @@ -959,18 +956,6 @@ invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id, "Continuous aggregate invalidations", ALLOCSET_DEFAULT_SIZES); state->snapshot = RegisterSnapshot(GetTransactionSnapshot()); - forboth (lc1, all_caggs->mat_hypertable_ids, lc2, all_caggs->bucket_functions) - { - int32 cagg_hyper_id = lfirst_int(lc1); - - if (cagg_hyper_id == mat_hypertable_id) - { - state->bucket_function = lfirst(lc2); - found = true; - break; - } - } - Assert(found); } static void @@ -982,19 +967,18 @@ invalidation_state_cleanup(const CaggInvalidationState *state) } void -invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hypertable_id, Oid dimtype, +invalidation_process_hypertable_log(const ContinuousAgg *cagg, Oid dimtype, const CaggsInfo *all_caggs) { CaggInvalidationState state; - invalidation_state_init(&state, mat_hypertable_id, raw_hypertable_id, dimtype, all_caggs); + invalidation_state_init(&state, cagg, dimtype, all_caggs); move_invalidations_from_hyper_to_cagg_log(&state); invalidation_state_cleanup(&state); } InvalidationStore * -invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id, - const InternalTimeRange *refresh_window, +invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, const CaggsInfo *all_caggs_info, const long max_materializations, bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window, const CaggRefreshCallContext callctx) @@ -1005,11 +989,7 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id, *do_merged_refresh = false; - invalidation_state_init(&state, - mat_hypertable_id, - raw_hypertable_id, - refresh_window->type, - all_caggs_info); + invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info); state.invalidations = tuplestore_begin_heap(false, false, work_mem); clear_cagg_invalidations_for_refresh(&state, refresh_window); count = tuplestore_tuple_count(state.invalidations); diff --git a/tsl/src/continuous_aggs/invalidation.h b/tsl/src/continuous_aggs/invalidation.h index 300975a85c6..0b3a785170f 100644 --- a/tsl/src/continuous_aggs/invalidation.h +++ b/tsl/src/continuous_aggs/invalidation.h @@ -42,12 +42,13 @@ extern void continuous_agg_invalidate_raw_ht(const Hypertable *raw_ht, int64 sta extern void continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hypertable *mat_ht, int64 start, int64 end); -extern void invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hypertable_id, - Oid dimtype, const CaggsInfo *all_caggs_info); - -extern InvalidationStore *invalidation_process_cagg_log( - int32 mat_hypertable_id, int32 raw_hypertable_id, const InternalTimeRange *refresh_window, - const CaggsInfo *all_caggs_info, const long max_materializations, bool *do_merged_refresh, - InternalTimeRange *ret_merged_refresh_window, const CaggRefreshCallContext callctx); +extern void invalidation_process_hypertable_log(const ContinuousAgg *cagg, Oid dimtype, + const CaggsInfo *all_caggs_info); + +extern InvalidationStore * +invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, + const CaggsInfo *all_caggs_info, const long max_materializations, + bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window, + const CaggRefreshCallContext callctx); extern void invalidation_store_free(InvalidationStore *store); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 8469a8cf0af..37c92a688db 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -635,8 +635,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, LockRelationOid(hyper_relid, ExclusiveLock); const CaggsInfo all_caggs_info = ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id); - invalidations = invalidation_process_cagg_log(cagg->data.mat_hypertable_id, - cagg->data.raw_hypertable_id, + invalidations = invalidation_process_cagg_log(cagg, refresh_window, &all_caggs_info, ts_guc_cagg_max_individual_materializations, @@ -787,10 +786,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, /* Process invalidations in the hypertable invalidation log */ const CaggsInfo all_caggs_info = ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id); - invalidation_process_hypertable_log(cagg->data.mat_hypertable_id, - cagg->data.raw_hypertable_id, - refresh_window.type, - &all_caggs_info); + invalidation_process_hypertable_log(cagg, refresh_window.type, &all_caggs_info); /* Commit and Start a new transaction */ SPI_commit_and_chain();