From 64e5ffceb769311d7a5137f3d849fdd751cee590 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 23 Jan 2025 18:02:54 +0100 Subject: [PATCH] Refactor VectorAgg to use TupleTableSlot interface To support VectorAgg on top of Hypercore TAM, change the vector agg processing functions to pass around tuple table slots instead of compressed batches. This makes it possible to pass any "compatible" "vector slot" to the vector agg functions, which is required since TAM uses a slightly different slot implementation for arrow/vector data. In addition, add some functions to handle reading vector data from compatible vector slot implementations. This commit only adds the code to read from compressed batches. Arrow slots will be supported as part of a later change. --- tsl/src/nodes/vector_agg/exec.c | 2 +- tsl/src/nodes/vector_agg/grouping_policy.h | 5 +- .../nodes/vector_agg/grouping_policy_batch.c | 32 +++++++----- .../nodes/vector_agg/grouping_policy_hash.c | 49 ++++++++++--------- .../vector_agg/hashing/batch_hashing_params.h | 8 ++- .../vector_agg/hashing/hash_strategy_common.c | 6 +-- .../vector_agg/hashing/hash_strategy_impl.c | 15 +++--- .../hash_strategy_impl_single_fixed_key.c | 2 +- .../vector_agg/hashing/hashing_strategy.h | 8 +-- tsl/src/nodes/vector_agg/vector_slot.h | 42 ++++++++++++++++ 10 files changed, 116 insertions(+), 53 deletions(-) create mode 100644 tsl/src/nodes/vector_agg/vector_slot.h diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index 6d9b08fee65..b9184797998 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -368,7 +368,7 @@ vector_agg_exec(CustomScanState *node) /* * Finally, pass the compressed batch to the grouping policy. */ - grouping->gp_add_batch(grouping, batch_state); + grouping->gp_add_batch(grouping, &batch_state->decompressed_scan_slot_data.base); } /* diff --git a/tsl/src/nodes/vector_agg/grouping_policy.h b/tsl/src/nodes/vector_agg/grouping_policy.h index 30a6dfd4690..9c7a7a30095 100644 --- a/tsl/src/nodes/vector_agg/grouping_policy.h +++ b/tsl/src/nodes/vector_agg/grouping_policy.h @@ -5,7 +5,8 @@ */ #pragma once -typedef struct DecompressBatchState DecompressBatchState; +#include +#include typedef struct GroupingPolicy GroupingPolicy; @@ -31,7 +32,7 @@ typedef struct GroupingPolicy /* * Aggregate a single compressed batch. */ - void (*gp_add_batch)(GroupingPolicy *gp, DecompressBatchState *batch_state); + void (*gp_add_batch)(GroupingPolicy *gp, TupleTableSlot *vector_slot); /* * Is a partial aggregation result ready? diff --git a/tsl/src/nodes/vector_agg/grouping_policy_batch.c b/tsl/src/nodes/vector_agg/grouping_policy_batch.c index c9b4249c3f4..ff847891c37 100644 --- a/tsl/src/nodes/vector_agg/grouping_policy_batch.c +++ b/tsl/src/nodes/vector_agg/grouping_policy_batch.c @@ -12,13 +12,14 @@ #include +#include #include #include #include "grouping_policy.h" -#include "nodes/decompress_chunk/compressed_batch.h" #include "nodes/vector_agg/exec.h" +#include "nodes/vector_agg/vector_slot.h" typedef struct { @@ -113,13 +114,15 @@ gp_batch_reset(GroupingPolicy *obj) } static void -compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batch_state, +compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slot, VectorAggDef *agg_def, void *agg_state, MemoryContext agg_extra_mctx) { - ArrowArray *arg_arrow = NULL; + const ArrowArray *arg_arrow = NULL; const uint64 *arg_validity_bitmap = NULL; Datum arg_datum = 0; bool arg_isnull = true; + uint16 total_batch_rows = 0; + const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows); /* * We have functions with one argument, and one function with no arguments @@ -127,7 +130,10 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc */ if (agg_def->input_offset >= 0) { - CompressedColumnValues *values = &batch_state->compressed_columns[agg_def->input_offset]; + const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset); + const CompressedColumnValues *values = + vector_slot_get_compressed_column_values(vector_slot, attnum); + Assert(values->decompression_type != DT_Invalid); Assert(values->decompression_type != DT_Iterator); @@ -147,10 +153,10 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc /* * Compute the unified validity bitmap. */ - const size_t num_words = (batch_state->total_batch_rows + 63) / 64; + const size_t num_words = (total_batch_rows + 63) / 64; const uint64 *filter = arrow_combine_validity(num_words, policy->tmp_filter, - batch_state->vector_qual_result, + vector_qual_result, agg_def->filter_result, arg_validity_bitmap); @@ -172,7 +178,7 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc * have been skipped by the caller, but we also have to check for the * case when no rows match the aggregate FILTER clause. */ - const int n = arrow_num_valid(filter, batch_state->total_batch_rows); + const int n = arrow_num_valid(filter, total_batch_rows); if (n > 0) { agg_def->func.agg_scalar(agg_state, arg_datum, arg_isnull, n, agg_extra_mctx); @@ -181,15 +187,17 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc } static void -gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state) +gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot) { GroupingPolicyBatch *policy = (GroupingPolicyBatch *) gp; + uint16 total_batch_rows = 0; + vector_slot_get_qual_result(vector_slot, &total_batch_rows); /* * Allocate the temporary filter array for computing the combined results of * batch filter, aggregate filter and column validity. */ - const size_t num_words = (batch_state->total_batch_rows + 63) / 64; + const size_t num_words = (total_batch_rows + 63) / 64; if (num_words > policy->num_tmp_filter_words) { const size_t new_words = (num_words * 2) + 1; @@ -210,7 +218,7 @@ gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state) { VectorAggDef *agg_def = &policy->agg_defs[i]; void *agg_state = policy->agg_states[i]; - compute_single_aggregate(policy, batch_state, agg_def, agg_state, policy->agg_extra_mctx); + compute_single_aggregate(policy, vector_slot, agg_def, agg_state, policy->agg_extra_mctx); } /* @@ -220,10 +228,12 @@ gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state) for (int i = 0; i < ngrp; i++) { GroupingColumn *col = &policy->grouping_columns[i]; + const AttrNumber attnum = AttrOffsetGetAttrNumber(col->input_offset); Assert(col->input_offset >= 0); Assert(col->output_offset >= 0); - CompressedColumnValues *values = &batch_state->compressed_columns[col->input_offset]; + const CompressedColumnValues *values = + vector_slot_get_compressed_column_values(vector_slot, attnum); Assert(values->decompression_type == DT_Scalar); /* diff --git a/tsl/src/nodes/vector_agg/grouping_policy_hash.c b/tsl/src/nodes/vector_agg/grouping_policy_hash.c index 77acbe81ec4..f92bb973c4a 100644 --- a/tsl/src/nodes/vector_agg/grouping_policy_hash.c +++ b/tsl/src/nodes/vector_agg/grouping_policy_hash.c @@ -11,13 +11,15 @@ #include +#include +#include #include #include #include "grouping_policy.h" -#include "nodes/decompress_chunk/compressed_batch.h" #include "nodes/vector_agg/exec.h" +#include "nodes/vector_agg/vector_slot.h" #include "grouping_policy_hash.h" @@ -106,16 +108,17 @@ gp_hash_reset(GroupingPolicy *obj) } static void -compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState *batch_state, - int start_row, int end_row, const VectorAggDef *agg_def, void *agg_states) +compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row, + int end_row, const VectorAggDef *agg_def, void *agg_states) { const ArrowArray *arg_arrow = NULL; const uint64 *arg_validity_bitmap = NULL; Datum arg_datum = 0; bool arg_isnull = true; - + uint16 total_batch_rows = 0; const uint32 *offsets = policy->key_index_for_row; MemoryContext agg_extra_mctx = policy->agg_extra_mctx; + const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows); /* * We have functions with one argument, and one function with no arguments @@ -123,8 +126,10 @@ compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState */ if (agg_def->input_offset >= 0) { + const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset); const CompressedColumnValues *values = - &batch_state->compressed_columns[agg_def->input_offset]; + vector_slot_get_compressed_column_values(vector_slot, attnum); + Assert(values->decompression_type != DT_Invalid); Assert(values->decompression_type != DT_Iterator); @@ -144,11 +149,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState /* * Compute the unified validity bitmap. */ - const size_t num_words = (batch_state->total_batch_rows + 63) / 64; + const size_t num_words = (total_batch_rows + 63) / 64; const uint64 *filter = arrow_combine_validity(num_words, policy->tmp_filter, agg_def->filter_result, - batch_state->vector_qual_result, + vector_qual_result, arg_validity_bitmap); /* @@ -199,13 +204,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState } static void -add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, const int start_row, +add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int start_row, const int end_row) { const int num_fns = policy->num_agg_defs; - Assert(start_row < end_row); - Assert(end_row <= batch_state->total_batch_rows); /* * Remember which aggregation states have already existed, and which we @@ -218,7 +221,7 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con * Match rows to aggregation states using a hash table. */ Assert((size_t) end_row <= policy->num_key_index_for_row); - policy->hashing.fill_offsets(policy, batch_state, start_row, end_row); + policy->hashing.fill_offsets(policy, vector_slot, start_row, end_row); /* * Process the aggregate function states. We are processing single aggregate @@ -256,7 +259,7 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con * Add this batch to the states of this aggregate function. */ compute_single_aggregate(policy, - batch_state, + vector_slot, start_row, end_row, agg_def, @@ -275,14 +278,14 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con } static void -gp_hash_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state) +gp_hash_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot) { GroupingPolicyHash *policy = (GroupingPolicyHash *) gp; + uint16 n; + const uint64 *restrict filter = vector_slot_get_qual_result(vector_slot, &n); Assert(!policy->returning_results); - const int n = batch_state->total_batch_rows; - /* * Initialize the array for storing the aggregate state offsets corresponding * to a given batch row. We don't need the offsets for the previous batch @@ -317,24 +320,24 @@ gp_hash_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state) for (int i = 0; i < policy->num_grouping_columns; i++) { const GroupingColumn *def = &policy->grouping_columns[i]; - const CompressedColumnValues *values = &batch_state->compressed_columns[def->input_offset]; - policy->current_batch_grouping_column_values[i] = *values; - } + policy->current_batch_grouping_column_values[i] = + *vector_slot_get_compressed_column_values(vector_slot, + AttrOffsetGetAttrNumber(def->input_offset)); + } /* * Call the per-batch initialization function of the hashing strategy. */ - policy->hashing.prepare_for_batch(policy, batch_state); + policy->hashing.prepare_for_batch(policy, vector_slot); /* * Add the batch rows to aggregate function states. */ - const uint64 *restrict filter = batch_state->vector_qual_result; - add_one_range(policy, batch_state, 0, n); + add_one_range(policy, vector_slot, 0, n); - policy->stat_input_total_rows += batch_state->total_batch_rows; - policy->stat_input_valid_rows += arrow_num_valid(filter, batch_state->total_batch_rows); + policy->stat_input_total_rows += n; + policy->stat_input_valid_rows += arrow_num_valid(filter, n); } static bool diff --git a/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h b/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h index 0f18a0f5fe0..ed4c0ed12bb 100644 --- a/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h +++ b/tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h @@ -6,6 +6,9 @@ #pragma once +#include "nodes/vector_agg/grouping_policy_hash.h" +#include "nodes/vector_agg/vector_slot.h" + /* * The data required to map the rows of the given compressed batch to the unique * indexes of grouping keys, using a hash table. @@ -21,11 +24,12 @@ typedef struct BatchHashingParams } BatchHashingParams; static pg_attribute_always_inline BatchHashingParams -build_batch_hashing_params(GroupingPolicyHash *policy, DecompressBatchState *batch_state) +build_batch_hashing_params(GroupingPolicyHash *policy, TupleTableSlot *vector_slot) { + uint16 nrows; BatchHashingParams params = { .policy = policy, - .batch_filter = batch_state->vector_qual_result, + .batch_filter = vector_slot_get_qual_result(vector_slot, &nrows), .result_key_indexes = policy->key_index_for_row, }; diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c index ead986dc600..c743a3171e8 100644 --- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c +++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c @@ -15,11 +15,11 @@ * allocations in the hot loop that fills the hash table. */ void -hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state) +hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows) { HashingStrategy *hashing = &policy->hashing; - const int n = batch_state->total_batch_rows; - const uint32 num_possible_keys = policy->last_used_key_index + 1 + n; + const uint32 num_possible_keys = policy->last_used_key_index + 1 + nrows; + if (num_possible_keys > hashing->num_allocated_output_keys) { hashing->num_allocated_output_keys = num_possible_keys * 2 + 1; diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c index 319ec06c6fb..0cc6426cbae 100644 --- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c +++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c @@ -5,6 +5,7 @@ */ #include "batch_hashing_params.h" +#include "nodes/vector_agg/vector_slot.h" /* * The hash table maps the value of the grouping key to its unique index. @@ -61,10 +62,12 @@ FUNCTION_NAME(hash_strategy_reset)(HashingStrategy *hashing) static void FUNCTION_NAME(hash_strategy_prepare_for_batch)(GroupingPolicyHash *policy, - DecompressBatchState *batch_state) + TupleTableSlot *vector_slot) { - hash_strategy_output_key_alloc(policy, batch_state); - FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, batch_state); + uint16 nrows = 0; + vector_slot_get_qual_result(vector_slot, &nrows); + hash_strategy_output_key_alloc(policy, nrows); + FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, vector_slot); } /* @@ -158,12 +161,12 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e } static void -FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state, - int start_row, int end_row) +FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row, + int end_row) { Assert((size_t) end_row <= policy->num_key_index_for_row); - BatchHashingParams params = build_batch_hashing_params(policy, batch_state); + BatchHashingParams params = build_batch_hashing_params(policy, vector_slot); FUNCTION_NAME(fill_offsets_impl)(params, start_row, end_row); } diff --git a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c index 972567dcc10..1e0ddabc187 100644 --- a/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c +++ b/tsl/src/nodes/vector_agg/hashing/hash_strategy_impl_single_fixed_key.c @@ -17,7 +17,7 @@ FUNCTION_NAME(key_hashing_init)(HashingStrategy *hashing) static void FUNCTION_NAME(key_hashing_prepare_for_batch)(GroupingPolicyHash *policy, - DecompressBatchState *batch_state) + TupleTableSlot *vector_slot) { } diff --git a/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h b/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h index 73c6130ffca..b900f002bdd 100644 --- a/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h +++ b/tsl/src/nodes/vector_agg/hashing/hashing_strategy.h @@ -25,9 +25,9 @@ typedef struct HashingStrategy void (*init)(HashingStrategy *hashing, GroupingPolicyHash *policy); void (*reset)(HashingStrategy *hashing); uint64 (*get_size_bytes)(HashingStrategy *hashing); - void (*prepare_for_batch)(GroupingPolicyHash *policy, DecompressBatchState *batch_state); - void (*fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state, - int start_row, int end_row); + void (*prepare_for_batch)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot); + void (*fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row, + int end_row); void (*emit_key)(GroupingPolicyHash *policy, uint32 current_key, TupleTableSlot *aggregated_slot); @@ -56,6 +56,6 @@ typedef struct HashingStrategy uint32 null_key_index; } HashingStrategy; -void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state); +void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows); void hash_strategy_output_key_single_emit(GroupingPolicyHash *policy, uint32 current_key, TupleTableSlot *aggregated_slot); diff --git a/tsl/src/nodes/vector_agg/vector_slot.h b/tsl/src/nodes/vector_agg/vector_slot.h new file mode 100644 index 00000000000..01a1a117b56 --- /dev/null +++ b/tsl/src/nodes/vector_agg/vector_slot.h @@ -0,0 +1,42 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#pragma once + +#include +#include +#include +#include + +/* + * Vector slot functions. + * + * These functions provide a common interface for arrow slots and compressed + * batches. + * + */ +static inline const uint64 * +vector_slot_get_qual_result(const TupleTableSlot *slot, uint16 *num_rows) +{ + const DecompressBatchState *batch_state = (const DecompressBatchState *) slot; + *num_rows = batch_state->total_batch_rows; + return batch_state->vector_qual_result; +} + +/* + * Return the arrow array or the datum (in case of single scalar value) for a + * given attribute. + * + * This is essentially doing the same thing as the separate functions above, + * but with a common return type. + */ +static inline const CompressedColumnValues * +vector_slot_get_compressed_column_values(TupleTableSlot *slot, const AttrNumber attnum) +{ + const uint16 offset = AttrNumberGetAttrOffset(attnum); + const DecompressBatchState *batch_state = (const DecompressBatchState *) slot; + const CompressedColumnValues *values = &batch_state->compressed_columns[offset]; + return values; +}