diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c
index 6d9b08fee65..b9184797998 100644
--- a/tsl/src/nodes/vector_agg/exec.c
+++ b/tsl/src/nodes/vector_agg/exec.c
@@ -368,7 +368,7 @@ vector_agg_exec(CustomScanState *node)
 		/*
 		 * Finally, pass the compressed batch to the grouping policy.
 		 */
-		grouping->gp_add_batch(grouping, batch_state);
+		grouping->gp_add_batch(grouping, &batch_state->decompressed_scan_slot_data.base);
 	}
 
 	/*
diff --git a/tsl/src/nodes/vector_agg/grouping_policy.h b/tsl/src/nodes/vector_agg/grouping_policy.h
index 30a6dfd4690..9c7a7a30095 100644
--- a/tsl/src/nodes/vector_agg/grouping_policy.h
+++ b/tsl/src/nodes/vector_agg/grouping_policy.h
@@ -5,7 +5,8 @@
  */
 #pragma once
 
-typedef struct DecompressBatchState DecompressBatchState;
+#include <postgres.h>
+#include <executor/tuptable.h>
 
 typedef struct GroupingPolicy GroupingPolicy;
 
@@ -31,7 +32,7 @@ typedef struct GroupingPolicy
 	/*
 	 * Aggregate a single compressed batch.
 	 */
-	void (*gp_add_batch)(GroupingPolicy *gp, DecompressBatchState *batch_state);
+	void (*gp_add_batch)(GroupingPolicy *gp, TupleTableSlot *vector_slot);
 
 	/*
 	 * Is a partial aggregation result ready?
diff --git a/tsl/src/nodes/vector_agg/grouping_policy_batch.c b/tsl/src/nodes/vector_agg/grouping_policy_batch.c
index c9b4249c3f4..ff847891c37 100644
--- a/tsl/src/nodes/vector_agg/grouping_policy_batch.c
+++ b/tsl/src/nodes/vector_agg/grouping_policy_batch.c
@@ -12,13 +12,14 @@
 
 #include <postgres.h>
 
+#include <access/attnum.h>
 #include <executor/tuptable.h>
 #include <nodes/pg_list.h>
 
 #include "grouping_policy.h"
 
-#include "nodes/decompress_chunk/compressed_batch.h"
 #include "nodes/vector_agg/exec.h"
+#include "nodes/vector_agg/vector_slot.h"
 
 typedef struct
 {
@@ -113,13 +114,15 @@ gp_batch_reset(GroupingPolicy *obj)
 }
 
 static void
-compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batch_state,
+compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slot,
 						 VectorAggDef *agg_def, void *agg_state, MemoryContext agg_extra_mctx)
 {
-	ArrowArray *arg_arrow = NULL;
+	const ArrowArray *arg_arrow = NULL;
 	const uint64 *arg_validity_bitmap = NULL;
 	Datum arg_datum = 0;
 	bool arg_isnull = true;
+	uint16 total_batch_rows = 0;
+	const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows);
 
 	/*
 	 * We have functions with one argument, and one function with no arguments
@@ -127,7 +130,10 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
 	 */
 	if (agg_def->input_offset >= 0)
 	{
-		CompressedColumnValues *values = &batch_state->compressed_columns[agg_def->input_offset];
+		const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
+		const CompressedColumnValues *values =
+			vector_slot_get_compressed_column_values(vector_slot, attnum);
+
 		Assert(values->decompression_type != DT_Invalid);
 		Assert(values->decompression_type != DT_Iterator);
 
@@ -147,10 +153,10 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
 		/*
 		 * Compute the unified validity bitmap.
 		 */
-		const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
+		const size_t num_words = (total_batch_rows + 63) / 64;
 		const uint64 *filter = arrow_combine_validity(num_words,
 													  policy->tmp_filter,
-													  batch_state->vector_qual_result,
+													  vector_qual_result,
 													  agg_def->filter_result,
 													  arg_validity_bitmap);
 
@@ -172,7 +178,7 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
 	 * have been skipped by the caller, but we also have to check for the
 	 * case when no rows match the aggregate FILTER clause.
 	 */
-	const int n = arrow_num_valid(filter, batch_state->total_batch_rows);
+	const int n = arrow_num_valid(filter, total_batch_rows);
 	if (n > 0)
 	{
 		agg_def->func.agg_scalar(agg_state, arg_datum, arg_isnull, n, agg_extra_mctx);
@@ -181,15 +187,17 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
 }
 
 static void
-gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
+gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
 {
 	GroupingPolicyBatch *policy = (GroupingPolicyBatch *) gp;
+	uint16 total_batch_rows = 0;
+	vector_slot_get_qual_result(vector_slot, &total_batch_rows);
 
 	/*
 	 * Allocate the temporary filter array for computing the combined results of
 	 * batch filter, aggregate filter and column validity.
 	 */
-	const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
+	const size_t num_words = (total_batch_rows + 63) / 64;
 	if (num_words > policy->num_tmp_filter_words)
 	{
 		const size_t new_words = (num_words * 2) + 1;
@@ -210,7 +218,7 @@ gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
 	{
 		VectorAggDef *agg_def = &policy->agg_defs[i];
 		void *agg_state = policy->agg_states[i];
-		compute_single_aggregate(policy, batch_state, agg_def, agg_state, policy->agg_extra_mctx);
+		compute_single_aggregate(policy, vector_slot, agg_def, agg_state, policy->agg_extra_mctx);
 	}
 
 	/*
@@ -220,10 +228,12 @@ gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
 	for (int i = 0; i < ngrp; i++)
 	{
 		GroupingColumn *col = &policy->grouping_columns[i];
+		const AttrNumber attnum = AttrOffsetGetAttrNumber(col->input_offset);
 		Assert(col->input_offset >= 0);
 		Assert(col->output_offset >= 0);
 
-		CompressedColumnValues *values = &batch_state->compressed_columns[col->input_offset];
+		const CompressedColumnValues *values =
+			vector_slot_get_compressed_column_values(vector_slot, attnum);
 		Assert(values->decompression_type == DT_Scalar);
 
 		/*
diff --git a/tsl/src/nodes/vector_agg/grouping_policy_hash.c b/tsl/src/nodes/vector_agg/grouping_policy_hash.c
index 77acbe81ec4..f92bb973c4a 100644
--- a/tsl/src/nodes/vector_agg/grouping_policy_hash.c
+++ b/tsl/src/nodes/vector_agg/grouping_policy_hash.c
@@ -11,13 +11,15 @@
 
 #include <postgres.h>
 
+#include <access/attnum.h>
+#include <executor/tuptable.h>
 #include <nodes/bitmapset.h>
 #include <nodes/pg_list.h>
 
 #include "grouping_policy.h"
 
-#include "nodes/decompress_chunk/compressed_batch.h"
 #include "nodes/vector_agg/exec.h"
+#include "nodes/vector_agg/vector_slot.h"
 
 #include "grouping_policy_hash.h"
 
@@ -106,16 +108,17 @@ gp_hash_reset(GroupingPolicy *obj)
 }
 
 static void
-compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState *batch_state,
-						 int start_row, int end_row, const VectorAggDef *agg_def, void *agg_states)
+compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
+						 int end_row, const VectorAggDef *agg_def, void *agg_states)
 {
 	const ArrowArray *arg_arrow = NULL;
 	const uint64 *arg_validity_bitmap = NULL;
 	Datum arg_datum = 0;
 	bool arg_isnull = true;
-
+	uint16 total_batch_rows = 0;
 	const uint32 *offsets = policy->key_index_for_row;
 	MemoryContext agg_extra_mctx = policy->agg_extra_mctx;
+	const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows);
 
 	/*
 	 * We have functions with one argument, and one function with no arguments
@@ -123,8 +126,10 @@ compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot
 	 */
 	if (agg_def->input_offset >= 0)
 	{
+		const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
 		const CompressedColumnValues *values =
-			&batch_state->compressed_columns[agg_def->input_offset];
+			vector_slot_get_compressed_column_values(vector_slot, attnum);
+
 		Assert(values->decompression_type != DT_Invalid);
 		Assert(values->decompression_type != DT_Iterator);
 
@@ -144,11 +149,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot
 		/*
 		 * Compute the unified validity bitmap.
 		 */
-		const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
+		const size_t num_words = (total_batch_rows + 63) / 64;
 		const uint64 *filter = arrow_combine_validity(num_words,
 													  policy->tmp_filter,
 													  agg_def->filter_result,
-													  batch_state->vector_qual_result,
+													  vector_qual_result,
 													  arg_validity_bitmap);
 
 		/*
@@ -199,13 +204,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot
 }
 
 static void
-add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, const int start_row,
+add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int start_row,
 			  const int end_row)
 {
 	const int num_fns = policy->num_agg_defs;
 
-	Assert(start_row < end_row);
-	Assert(end_row <= batch_state->total_batch_rows);
 
 	/*
 	 * Remember which aggregation states have already existed, and which we
@@ -218,7 +221,7 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 	 * Match rows to aggregation states using a hash table.
 	 */
 	Assert((size_t) end_row <= policy->num_key_index_for_row);
-	policy->hashing.fill_offsets(policy, batch_state, start_row, end_row);
+	policy->hashing.fill_offsets(policy, vector_slot, start_row, end_row);
 
 	/*
 	 * Process the aggregate function states. We are processing single aggregate
@@ -256,7 +259,7 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 			 * Add this batch to the states of this aggregate function.
 			 */
 			compute_single_aggregate(policy,
-									 batch_state,
+									 vector_slot,
 									 start_row,
 									 end_row,
 									 agg_def,
@@ -275,14 +278,14 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 }
 
 static void
-gp_hash_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
+gp_hash_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
 {
 	GroupingPolicyHash *policy = (GroupingPolicyHash *) gp;
+	uint16 n;
+	const uint64 *restrict filter = vector_slot_get_qual_result(vector_slot, &n);
 
 	Assert(!policy->returning_results);
 
-	const int n = batch_state->total_batch_rows;
-
 	/*
 	 * Initialize the array for storing the aggregate state offsets corresponding
 	 * to a given batch row. We don't need the offsets for the previous batch
@@ -317,24 +320,24 @@ gp_hash_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
 	for (int i = 0; i < policy->num_grouping_columns; i++)
 	{
 		const GroupingColumn *def = &policy->grouping_columns[i];
-		const CompressedColumnValues *values = &batch_state->compressed_columns[def->input_offset];
-		policy->current_batch_grouping_column_values[i] = *values;
-	}
+		policy->current_batch_grouping_column_values[i] =
+			*vector_slot_get_compressed_column_values(vector_slot,
+													  AttrOffsetGetAttrNumber(def->input_offset));
+	}
 
 	/*
 	 * Call the per-batch initialization function of the hashing strategy.
 	 */
-	policy->hashing.prepare_for_batch(policy, batch_state);
+	policy->hashing.prepare_for_batch(policy, vector_slot);
 
 	/*
 	 * Add the batch rows to aggregate function states.
 	 */
-	const uint64 *restrict filter = batch_state->vector_qual_result;
-	add_one_range(policy, batch_state, 0, n);
+	add_one_range(policy, vector_slot, 0, n);
 
-	policy->stat_input_total_rows += batch_state->total_batch_rows;
-	policy->stat_input_valid_rows += arrow_num_valid(filter, batch_state->total_batch_rows);
+	policy->stat_input_total_rows += n;
+	policy->stat_input_valid_rows += arrow_num_valid(filter, n);
 }
 
 static bool
diff --git a/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h b/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
index 0f18a0f5fe0..ed4c0ed12bb 100644
--- a/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
+++ b/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
@@ -6,6 +6,9 @@
 
 #pragma once
 
+#include "nodes/vector_agg/grouping_policy_hash.h"
+#include "nodes/vector_agg/vector_slot.h"
+
 /*
  * The data required to map the rows of the given compressed batch to the unique
  * indexes of grouping keys, using a hash table.
@@ -21,11 +24,12 @@ typedef struct BatchHashingParams
 } BatchHashingParams;
 
 static pg_attribute_always_inline BatchHashingParams
-build_batch_hashing_params(GroupingPolicyHash *policy, DecompressBatchState *batch_state)
+build_batch_hashing_params(GroupingPolicyHash *policy, TupleTableSlot *vector_slot)
 {
+	uint16 nrows;
 	BatchHashingParams params = {
 		.policy = policy,
-		.batch_filter = batch_state->vector_qual_result,
+		.batch_filter = vector_slot_get_qual_result(vector_slot, &nrows),
 		.result_key_indexes = policy->key_index_for_row,
 	};
diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
index ead986dc600..c743a3171e8 100644
--- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
+++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
@@ -15,11 +15,11 @@
  * allocations in the hot loop that fills the hash table.
  */
 void
-hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state)
+hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows)
 {
 	HashingStrategy *hashing = &policy->hashing;
-	const int n = batch_state->total_batch_rows;
-	const uint32 num_possible_keys = policy->last_used_key_index + 1 + n;
+	const uint32 num_possible_keys = policy->last_used_key_index + 1 + nrows;
+
 	if (num_possible_keys > hashing->num_allocated_output_keys)
 	{
 		hashing->num_allocated_output_keys = num_possible_keys * 2 + 1;
diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
index 319ec06c6fb..0cc6426cbae 100644
--- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
+++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
@@ -5,6 +5,7 @@
  */
 
 #include "batch_hashing_params.h"
+#include "nodes/vector_agg/vector_slot.h"
 
 /*
  * The hash table maps the value of the grouping key to its unique index.
@@ -61,10 +62,12 @@ FUNCTION_NAME(hash_strategy_reset)(HashingStrategy *hashing)
 
 static void
 FUNCTION_NAME(hash_strategy_prepare_for_batch)(GroupingPolicyHash *policy,
-											   DecompressBatchState *batch_state)
+											   TupleTableSlot *vector_slot)
 {
-	hash_strategy_output_key_alloc(policy, batch_state);
-	FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, batch_state);
+	uint16 nrows = 0;
+	vector_slot_get_qual_result(vector_slot, &nrows);
+	hash_strategy_output_key_alloc(policy, nrows);
+	FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, vector_slot);
 }
 
 /*
@@ -158,12 +161,12 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
 }
 
 static void
-FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state,
-							int start_row, int end_row)
+FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
+							int end_row)
 {
 	Assert((size_t) end_row <= policy->num_key_index_for_row);
 
-	BatchHashingParams params = build_batch_hashing_params(policy, batch_state);
+	BatchHashingParams params = build_batch_hashing_params(policy, vector_slot);
 
 	FUNCTION_NAME(fill_offsets_impl)(params, start_row, end_row);
 }
diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c
index 972567dcc10..1e0ddabc187 100644
--- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c
+++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c
@@ -17,7 +17,7 @@ FUNCTION_NAME(key_hashing_init)(HashingStrategy *hashing)
 
 static void
 FUNCTION_NAME(key_hashing_prepare_for_batch)(GroupingPolicyHash *policy,
-											 DecompressBatchState *batch_state)
+											 TupleTableSlot *vector_slot)
 {
 }
 
diff --git a/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h b/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
index 73c6130ffca..b900f002bdd 100644
--- a/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
+++ b/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
@@ -25,9 +25,9 @@ typedef struct HashingStrategy
 	void (*init)(HashingStrategy *hashing, GroupingPolicyHash *policy);
 	void (*reset)(HashingStrategy *hashing);
 	uint64 (*get_size_bytes)(HashingStrategy *hashing);
-	void (*prepare_for_batch)(GroupingPolicyHash *policy, DecompressBatchState *batch_state);
-	void (*fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state,
-						 int start_row, int end_row);
+	void (*prepare_for_batch)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot);
+	void (*fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
+						 int end_row);
 	void (*emit_key)(GroupingPolicyHash *policy, uint32 current_key,
 					 TupleTableSlot *aggregated_slot);
 
@@ -56,6 +56,6 @@ typedef struct HashingStrategy
 	uint32 null_key_index;
 } HashingStrategy;
 
-void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state);
+void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows);
 void hash_strategy_output_key_single_emit(GroupingPolicyHash *policy, uint32 current_key,
 										  TupleTableSlot *aggregated_slot);
diff --git a/tsl/src/nodes/vector_agg/vector_slot.h b/tsl/src/nodes/vector_agg/vector_slot.h
new file mode 100644
index 00000000000..01a1a117b56
--- /dev/null
+++ b/tsl/src/nodes/vector_agg/vector_slot.h
@@ -0,0 +1,42 @@
+/*
+ * This file and its contents are licensed under the Timescale License.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-TIMESCALE for a copy of the license.
+ */
+#pragma once
+
+#include <postgres.h>
+#include <access/attnum.h>
+#include <executor/tuptable.h>
+#include <nodes/decompress_chunk/compressed_batch.h>
+
+/*
+ * Vector slot functions.
+ *
+ * These functions provide a common interface for arrow slots and compressed
+ * batches.
+ *
+ */
+static inline const uint64 *
+vector_slot_get_qual_result(const TupleTableSlot *slot, uint16 *num_rows)
+{
+	const DecompressBatchState *batch_state = (const DecompressBatchState *) slot;
+	*num_rows = batch_state->total_batch_rows;
+	return batch_state->vector_qual_result;
+}
+
+/*
+ * Return the arrow array or the datum (in case of a single scalar value) for a
+ * given attribute.
+ *
+ * This is essentially doing the same thing as the separate functions above,
+ * but with a common return type.
+ */
+static inline const CompressedColumnValues *
+vector_slot_get_compressed_column_values(TupleTableSlot *slot, const AttrNumber attnum)
+{
+	const uint16 offset = AttrNumberGetAttrOffset(attnum);
+	const DecompressBatchState *batch_state = (const DecompressBatchState *) slot;
+	const CompressedColumnValues *values = &batch_state->compressed_columns[offset];
+	return values;
+}
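
Reviewer note, not part of the patch: the sketch below shows the call pattern the new vector_slot.h interface enables in a grouping policy. The function example_policy_add_batch and the zero input offset are hypothetical stand-ins for gp_batch_add_batch()/gp_hash_add_batch(); only the vector_slot_* helpers, the AttrOffsetGetAttrNumber() conversion, and the (nrows + 63) / 64 bitmap sizing are taken from the patch itself.

/*
 * Sketch only: a hypothetical policy entry point consuming the slot-based
 * interface introduced above.
 */
#include <postgres.h>

#include <access/attnum.h>
#include <executor/tuptable.h>

#include "nodes/vector_agg/vector_slot.h"

static void
example_policy_add_batch(TupleTableSlot *vector_slot)
{
	uint16 nrows = 0;

	/* One call yields both the batch row count and the vectorized qual bitmap. */
	const uint64 *qual_result = vector_slot_get_qual_result(vector_slot, &nrows);

	/* Validity bitmaps are arrays of 64-bit words, hence the rounding up. */
	const size_t num_words = (nrows + 63) / 64;
	(void) num_words;
	(void) qual_result;

	/*
	 * Columns are now addressed by attribute number, so a policy converts its
	 * zero-based input offset before the lookup (offset 0 is hypothetical).
	 */
	const CompressedColumnValues *values =
		vector_slot_get_compressed_column_values(vector_slot, AttrOffsetGetAttrNumber(0));
	Assert(values->decompression_type != DT_Invalid);
	(void) values;
}

The cast inside the vector_slot_* helpers is sound because vector_agg_exec() now hands the policy &batch_state->decompressed_scan_slot_data.base, i.e. the TupleTableSlot embedded in DecompressBatchState, so the helpers can cast the slot pointer back to the enclosing batch state. A future arrow slot implementation can supply the same two accessors for its own layout, which is the point of routing the interface through TupleTableSlot.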