Skip to content

Commit

Permalink
Refactor parameter of invalidation_state_init
Browse files Browse the repository at this point in the history
So far, the invalidation_state_init function used a loop to find the
proper CAgg bucket function. This PR refactors the parameter of the
function and passes the needed information directly.
  • Loading branch information
jnidzwetzki committed Mar 19, 2024
1 parent 50cc8e3 commit c05d730
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 44 deletions.
42 changes: 11 additions & 31 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder(
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
static void invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
int32 raw_hypertable_id, Oid dimtype,
const CaggsInfo *all_caggs);
static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
Oid dimtype, const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);

static Relation
Expand Down Expand Up @@ -944,33 +943,19 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
}

static void
invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
int32 raw_hypertable_id, Oid dimtype, const CaggsInfo *all_caggs)
invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg, Oid dimtype,
const CaggsInfo *all_caggs)
{
ListCell *lc1, *lc2;
bool PG_USED_FOR_ASSERTS_ONLY found = false;

state->mat_hypertable_id = mat_hypertable_id;
state->raw_hypertable_id = raw_hypertable_id;
state->bucket_function = cagg->bucket_function;
state->mat_hypertable_id = cagg->data.mat_hypertable_id;
state->raw_hypertable_id = cagg->data.raw_hypertable_id;
state->dimtype = dimtype;
state->all_caggs = all_caggs;
state->cagg_log_rel = open_invalidation_log(LOG_CAGG, RowExclusiveLock);
state->per_tuple_mctx = AllocSetContextCreate(CurrentMemoryContext,
"Continuous aggregate invalidations",
ALLOCSET_DEFAULT_SIZES);
state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
forboth (lc1, all_caggs->mat_hypertable_ids, lc2, all_caggs->bucket_functions)
{
int32 cagg_hyper_id = lfirst_int(lc1);

if (cagg_hyper_id == mat_hypertable_id)
{
state->bucket_function = lfirst(lc2);
found = true;
break;
}
}
Assert(found);
}

static void
Expand All @@ -982,19 +967,18 @@ invalidation_state_cleanup(const CaggInvalidationState *state)
}

void
invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hypertable_id, Oid dimtype,
invalidation_process_hypertable_log(const ContinuousAgg *cagg, Oid dimtype,
const CaggsInfo *all_caggs)
{
CaggInvalidationState state;

invalidation_state_init(&state, mat_hypertable_id, raw_hypertable_id, dimtype, all_caggs);
invalidation_state_init(&state, cagg, dimtype, all_caggs);
move_invalidations_from_hyper_to_cagg_log(&state);
invalidation_state_cleanup(&state);
}

InvalidationStore *
invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
const InternalTimeRange *refresh_window,
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
Expand All @@ -1005,11 +989,7 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,

*do_merged_refresh = false;

invalidation_state_init(&state,
mat_hypertable_id,
raw_hypertable_id,
refresh_window->type,
all_caggs_info);
invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);
count = tuplestore_tuple_count(state.invalidations);
Expand Down
15 changes: 8 additions & 7 deletions tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ extern void continuous_agg_invalidate_raw_ht(const Hypertable *raw_ht, int64 sta
extern void continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hypertable *mat_ht,
int64 start, int64 end);

extern void invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
Oid dimtype, const CaggsInfo *all_caggs_info);

extern InvalidationStore *invalidation_process_cagg_log(
int32 mat_hypertable_id, int32 raw_hypertable_id, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations, bool *do_merged_refresh,
InternalTimeRange *ret_merged_refresh_window, const CaggRefreshCallContext callctx);
extern void invalidation_process_hypertable_log(const ContinuousAgg *cagg, Oid dimtype,
const CaggsInfo *all_caggs_info);

extern InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx);

extern void invalidation_store_free(InvalidationStore *store);
8 changes: 2 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
LockRelationOid(hyper_relid, ExclusiveLock);
const CaggsInfo all_caggs_info =
ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id);
invalidations = invalidation_process_cagg_log(cagg->data.mat_hypertable_id,
cagg->data.raw_hypertable_id,
invalidations = invalidation_process_cagg_log(cagg,
refresh_window,
&all_caggs_info,
ts_guc_cagg_max_individual_materializations,
Expand Down Expand Up @@ -787,10 +786,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
/* Process invalidations in the hypertable invalidation log */
const CaggsInfo all_caggs_info =
ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id);
invalidation_process_hypertable_log(cagg->data.mat_hypertable_id,
cagg->data.raw_hypertable_id,
refresh_window.type,
&all_caggs_info);
invalidation_process_hypertable_log(cagg, refresh_window.type, &all_caggs_info);

/* Commit and Start a new transaction */
SPI_commit_and_chain();
Expand Down

0 comments on commit c05d730

Please sign in to comment.