Vectorized grouping by multiple columns #7754

Open
wants to merge 13 commits into main
17 changes: 13 additions & 4 deletions .github/workflows/linux-32bit-build-and-test.yaml
@@ -51,10 +51,19 @@ jobs:
       # vector_agg_text and vector_agg_groupagg use the UMASH hashing library
       # that we can't compile on i386.
       IGNORES: >-
-        append-* transparent_decompression-*
-        transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler*
-        hypercore_vacuum vectorized_aggregation vector_agg_text
-        vector_agg_groupagg hypercore_parallel hypercore_vectoragg
+        append-*
+        bgw_db_scheduler*
+        hypercore_parallel
+        hypercore_vacuum
+        hypercore_vectoragg
+        pg_dump
+        telemetry
+        transparent_decompress_chunk-*
+        transparent_decompression-*
+        vector_agg_groupagg
+        vector_agg_grouping
+        vector_agg_text
+        vectorized_aggregation
       SKIPS: chunk_adaptive histogram_test-*
       EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
       strategy:
9 changes: 8 additions & 1 deletion .github/workflows/windows-build-and-test.yaml
@@ -59,7 +59,14 @@ jobs:
       build_type: ${{ fromJson(needs.config.outputs.build_type) }}
       ignores: ["chunk_adaptive metadata telemetry"]
       tsl_ignores: ["compression_algos"]
-      tsl_skips: ["vector_agg_text vector_agg_groupagg bgw_db_scheduler bgw_db_scheduler_fixed"]
+      tsl_skips:
+        - >-
+          bgw_db_scheduler
+          bgw_db_scheduler_fixed
+          vector_agg_groupagg
+          vector_agg_grouping
+          vector_agg_text
+          vectorized_aggregation
       pg_config: ["-cfsync=off -cstatement_timeout=60s"]
       include:
         - pg: 14
1 change: 1 addition & 0 deletions .unreleased/hash-grouping-multiple
@@ -0,0 +1 @@
+Implements: #7754 Vectorized aggregation with grouping by several columns
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/grouping_policy.h
@@ -66,7 +66,8 @@ typedef enum
 	VAGT_HashSingleFixed2,
 	VAGT_HashSingleFixed4,
 	VAGT_HashSingleFixed8,
-	VAGT_HashSingleText
+	VAGT_HashSingleText,
+	VAGT_HashSerialized,
 } VectorAggGroupingType;

 extern GroupingPolicy *create_grouping_policy_batch(int num_agg_defs, VectorAggDef *agg_defs,
23 changes: 13 additions & 10 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.c
@@ -38,6 +38,7 @@ extern HashingStrategy single_fixed_4_strategy;
 extern HashingStrategy single_fixed_8_strategy;
 #ifdef TS_USE_UMASH
 extern HashingStrategy single_text_strategy;
+extern HashingStrategy serialized_strategy;
 #endif

 static const GroupingPolicy grouping_policy_hash_functions;
@@ -74,6 +75,9 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr
 	switch (grouping_type)
 	{
 #ifdef TS_USE_UMASH
+		case VAGT_HashSerialized:
+			policy->hashing = serialized_strategy;
+			break;
 		case VAGT_HashSingleText:
 			policy->hashing = single_text_strategy;
 			break;
@@ -110,8 +114,6 @@ gp_hash_reset(GroupingPolicy *obj)

 	policy->hashing.reset(&policy->hashing);

-	policy->last_used_key_index = 0;
-
 	policy->stat_input_valid_rows = 0;
 	policy->stat_input_total_rows = 0;
 	policy->stat_bulk_filtered_rows = 0;
@@ -225,7 +227,7 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 	 * Remember which aggregation states have already existed, and which we
 	 * have to initialize. State index zero is invalid.
 	 */
-	const uint32 last_initialized_key_index = policy->last_used_key_index;
+	const uint32 last_initialized_key_index = policy->hashing.last_used_key_index;
[Inline review comment from the PR author] Did some mechanical refactoring to move more hashing-related data into the hashing strategy struct.

 	Assert(last_initialized_key_index <= policy->num_allocated_per_key_agg_states);

 	/*
@@ -246,13 +248,13 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 	 * If we added new keys, initialize the aggregate function states for
 	 * them.
 	 */
-	if (policy->last_used_key_index > last_initialized_key_index)
+	if (policy->hashing.last_used_key_index > last_initialized_key_index)
 	{
 		/*
 		 * If the aggregate function states don't fit into the existing
 		 * storage, reallocate it.
 		 */
-		if (policy->last_used_key_index >= policy->num_allocated_per_key_agg_states)
+		if (policy->hashing.last_used_key_index >= policy->num_allocated_per_key_agg_states)
 		{
 			policy->per_agg_per_key_states[agg_index] =
 				repalloc(policy->per_agg_per_key_states[agg_index],
@@ -263,7 +265,8 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 			agg_def->func.state_bytes * (last_initialized_key_index + 1) +
 			(char *) policy->per_agg_per_key_states[agg_index];
 		agg_def->func.agg_init(first_uninitialized_state,
-							   policy->last_used_key_index - last_initialized_key_index);
+							   policy->hashing.last_used_key_index -
+								   last_initialized_key_index);
 	}

 	/*
@@ -281,7 +284,7 @@ add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int
 	 * Record the newly allocated number of aggregate function states in case we
 	 * had to reallocate.
 	 */
-	if (policy->last_used_key_index >= policy->num_allocated_per_key_agg_states)
+	if (policy->hashing.last_used_key_index >= policy->num_allocated_per_key_agg_states)
 	{
 		Assert(new_aggstate_rows > policy->num_allocated_per_key_agg_states);
 		policy->num_allocated_per_key_agg_states = new_aggstate_rows;
@@ -421,7 +424,7 @@ gp_hash_should_emit(GroupingPolicy *gp)
 {
 	GroupingPolicyHash *policy = (GroupingPolicyHash *) gp;

-	if (policy->last_used_key_index > UINT32_MAX - GLOBAL_MAX_ROWS_PER_COMPRESSION)
+	if (policy->hashing.last_used_key_index > UINT32_MAX - GLOBAL_MAX_ROWS_PER_COMPRESSION)
 	{
 		/*
 		 * The max valid key index is UINT32_MAX, so we have to spill if the next
@@ -450,7 +453,7 @@ gp_hash_do_emit(GroupingPolicy *gp, TupleTableSlot *aggregated_slot)
 	policy->returning_results = true;
 	policy->last_returned_key = 1;

-	const float keys = policy->last_used_key_index;
+	const float keys = policy->hashing.last_used_key_index;
 	if (keys > 0)
 	{
 		DEBUG_LOG("spill after %ld input, %ld valid, %ld bulk filtered, %ld cons, %.0f keys, "
@@ -471,7 +474,7 @@ gp_hash_do_emit(GroupingPolicy *gp, TupleTableSlot *aggregated_slot)
 	}

 	const uint32 current_key = policy->last_returned_key;
-	const uint32 keys_end = policy->last_used_key_index + 1;
+	const uint32 keys_end = policy->hashing.last_used_key_index + 1;
 	if (current_key >= keys_end)
 	{
 		policy->returning_results = false;
5 changes: 0 additions & 5 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.h
@@ -78,11 +78,6 @@ typedef struct GroupingPolicyHash
 	 */
 	HashingStrategy hashing;

-	/*
-	 * The last used index of an unique grouping key. Key index 0 is invalid.
-	 */
-	uint32 last_used_key_index;
-
 	/*
 	 * Temporary storage of unique indexes of keys corresponding to a given row
 	 * of the compressed batch that is currently being aggregated. We keep it in
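For orientation: the last_used_key_index counter now lives in HashingStrategy instead of GroupingPolicyHash. Below is a rough sketch of the strategy struct after this refactoring, assembled only from the members visible in this diff (table, last_used_key_index, null_key_index, output_keys, the tmp_key_storage scratch buffer, and the reset callback); field order and anything not shown in the diff are assumptions, not the real header.

/*
 * Sketch of HashingStrategy after this PR, reconstructed from the fields the
 * diff touches. Not copied from the actual header; treat the layout as an
 * assumption.
 */
typedef struct HashingStrategy
{
	/* Hash table instance created by the concrete strategy. */
	void *table;

	/* The last used index of a unique grouping key. Key index 0 is invalid. */
	uint32 last_used_key_index;

	/* Key index reserved for the null key; 0 until the first null is seen. */
	uint32 null_key_index;

	/* Unique grouping keys in output (Datum) form, indexed by key index. */
	Datum *output_keys;
	uint64 num_allocated_output_keys;

	/*
	 * Scratch buffer for building keys; it lives in the key body memory
	 * context, so hash_strategy_reset() must clear these together with the
	 * table (see the reset hunk below).
	 */
	void *tmp_key_storage;
	uint64 num_tmp_key_storage_bytes;

	/* Strategy-specific reset, invoked from gp_hash_reset(). */
	void (*reset)(struct HashingStrategy *hashing);
} HashingStrategy;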
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/hashing/CMakeLists.txt
@@ -5,7 +5,8 @@ set(SOURCES
     ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_common.c)

 if(USE_UMASH)
-  list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_text.c)
+  list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_text.c
+                      ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_serialized.c)
 endif()

 target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
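The newly built hash_strategy_serialized.c is not shown in this diff, so as an illustration only: a "serialized" strategy typically packs all grouping columns of one row into a single contiguous byte key, which can then be hashed (presumably with UMASH here, given the USE_UMASH guard) and compared as one unit. The sketch below shows such a packing scheme under assumed names and a fixed-width-only simplification; it is not the PR's implementation.

#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/*
 * Illustration only: pack one row of fixed-width grouping columns into a
 * contiguous "serialized" key. A per-column null flag keeps NULL distinct
 * from every real value. Names and layout are assumptions.
 */
static size_t
serialize_row_key(int num_columns, const int64_t *const *column_values,
                  const bool *const *column_nulls, int row, uint8_t *out)
{
	uint8_t *pos = out;
	for (int col = 0; col < num_columns; col++)
	{
		const bool is_null = column_nulls[col][row];
		*pos++ = (uint8_t) is_null; /* Null flag byte for this column. */
		if (!is_null)
		{
			memcpy(pos, &column_values[col][row], sizeof(int64_t));
			pos += sizeof(int64_t);
		}
	}
	return (size_t) (pos - out); /* Key length to hash and compare. */
}

With this layout, two rows belong to the same group exactly when their serialized keys have equal length and bytes, so multi-column grouping reduces to the same hash-and-memcmp scheme the single-text strategy already uses.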
16 changes: 13 additions & 3 deletions tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
@@ -18,7 +18,11 @@ typedef struct BatchHashingParams
 	const uint64 *batch_filter;
 	CompressedColumnValues single_grouping_column;

-	GroupingPolicyHash *restrict policy;
+	int num_grouping_columns;
+	const CompressedColumnValues *grouping_column_values;
+
+	GroupingPolicyHash *policy;
+	HashingStrategy *restrict hashing;

 	uint32 *restrict result_key_indexes;
 } BatchHashingParams;
@@ -29,12 +33,18 @@ build_batch_hashing_params(GroupingPolicyHash *policy, TupleTableSlot *vector_sl
 	uint16 nrows;
 	BatchHashingParams params = {
 		.policy = policy,
+		.hashing = &policy->hashing,
 		.batch_filter = vector_slot_get_qual_result(vector_slot, &nrows),
+		.num_grouping_columns = policy->num_grouping_columns,
+		.grouping_column_values = policy->current_batch_grouping_column_values,
 		.result_key_indexes = policy->key_index_for_row,
 	};

-	Assert(policy->num_grouping_columns == 1);
-	params.single_grouping_column = policy->current_batch_grouping_column_values[0];
+	Assert(policy->num_grouping_columns > 0);
+	if (policy->num_grouping_columns == 1)
+	{
+		params.single_grouping_column = policy->current_batch_grouping_column_values[0];
+	}

 	return params;
 }
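To show the intended split (an assumed consumer, not code from the PR): single-column strategies keep using params.single_grouping_column, which is now filled only in the one-column case, while a multi-column strategy such as the serialized one iterates the new grouping_column_values array.

/*
 * Hypothetical illustration of consuming the new fields; assumes the
 * surrounding headers that define BatchHashingParams and
 * CompressedColumnValues.
 */
static inline void
example_collect_grouping_columns(const BatchHashingParams *params, int row)
{
	for (int i = 0; i < params->num_grouping_columns; i++)
	{
		const CompressedColumnValues *column = &params->grouping_column_values[i];
		/* A serialized strategy would append this column's value at 'row'
		 * to the row's key here. */
		(void) column;
	}
	(void) row;
}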
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
@@ -18,7 +18,7 @@ void
 hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows)
 {
 	HashingStrategy *hashing = &policy->hashing;
-	const uint32 num_possible_keys = policy->last_used_key_index + 1 + nrows;
+	const uint32 num_possible_keys = hashing->last_used_key_index + 1 + nrows;

 	if (num_possible_keys > hashing->num_allocated_output_keys)
 	{
31 changes: 20 additions & 11 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
@@ -57,7 +57,17 @@ FUNCTION_NAME(hash_strategy_reset)(HashingStrategy *hashing)
 {
 	struct FUNCTION_NAME(hash) *table = (struct FUNCTION_NAME(hash) *) hashing->table;
 	FUNCTION_NAME(reset)(table);
+
+	hashing->last_used_key_index = 0;
+
 	hashing->null_key_index = 0;
+
+	/*
+	 * Have to reset this because it's in the key body context which is also
+	 * reset here.
+	 */
+	hashing->tmp_key_storage = NULL;
+	hashing->num_tmp_key_storage_bytes = 0;
 }

 static void
@@ -76,8 +86,7 @@ FUNCTION_NAME(hash_strategy_prepare_for_batch)(GroupingPolicyHash *policy,
 static pg_attribute_always_inline void
 FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int end_row)
 {
-	GroupingPolicyHash *policy = params.policy;
-	HashingStrategy *hashing = &policy->hashing;
+	HashingStrategy *restrict hashing = params.hashing;

 	uint32 *restrict indexes = params.result_key_indexes;

@@ -90,7 +99,7 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
 		if (!arrow_row_is_valid(params.batch_filter, row))
 		{
 			/* The row doesn't pass the filter. */
-			DEBUG_PRINT("%p: row %d doesn't pass batch filter\n", policy, row);
+			DEBUG_PRINT("%p: row %d doesn't pass batch filter\n", hashing, row);
 			continue;
 		}

@@ -109,10 +118,10 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
 			/* The key is null. */
 			if (hashing->null_key_index == 0)
 			{
-				hashing->null_key_index = ++policy->last_used_key_index;
+				hashing->null_key_index = ++hashing->last_used_key_index;
 			}
 			indexes[row] = hashing->null_key_index;
-			DEBUG_PRINT("%p: row %d null key index %d\n", policy, row, hashing->null_key_index);
+			DEBUG_PRINT("%p: row %d null key index %d\n", hashing, row, hashing->null_key_index);
 			continue;
 		}

@@ -128,9 +137,9 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
 			 */
 			indexes[row] = previous_key_index;
 #ifndef NDEBUG
-			policy->stat_consecutive_keys++;
+			params.policy->stat_consecutive_keys++;
 #endif
-			DEBUG_PRINT("%p: row %d consecutive key index %d\n", policy, row, previous_key_index);
+			DEBUG_PRINT("%p: row %d consecutive key index %d\n", hashing, row, previous_key_index);
 			continue;
 		}

@@ -144,14 +153,14 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
 			/*
 			 * New key, have to store it persistently.
 			 */
-			const uint32 index = ++policy->last_used_key_index;
+			const uint32 index = ++hashing->last_used_key_index;
 			entry->key_index = index;
-			FUNCTION_NAME(key_hashing_store_new)(policy, index, output_key);
-			DEBUG_PRINT("%p: row %d new key index %d\n", policy, row, index);
+			FUNCTION_NAME(key_hashing_store_new)(hashing, index, output_key);
+			DEBUG_PRINT("%p: row %d new key index %d\n", hashing, row, index);
 		}
 		else
 		{
-			DEBUG_PRINT("%p: row %d old key index %d\n", policy, row, entry->key_index);
+			DEBUG_PRINT("%p: row %d old key index %d\n", hashing, row, entry->key_index);
 		}
 		indexes[row] = entry->key_index;
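To summarize the control flow of fill_offsets_impl above in a runnable form: each row either fails the batch filter, maps to the shared null-key index, hits the consecutive-key fast path (equal adjacent keys skip the hash-table probe entirely, which is what the stat_consecutive_keys counter measures), or goes through a table lookup that hands out ++last_used_key_index for unseen keys. The toy below restates this with int64 keys and a linear search standing in for the hash table; it is an illustration, not the PR's code.

#include <stdint.h>

/*
 * Toy restatement of the per-row key-index assignment. Index 0 stays
 * reserved as "invalid"; a linear search replaces the real hash table.
 */
static uint32_t
assign_key_indexes(const int64_t *keys, uint32_t *indexes, int nrows)
{
	uint32_t last_used_key_index = 0; /* Real indexes start at 1. */
	for (int row = 0; row < nrows; row++)
	{
		if (row > 0 && keys[row] == keys[row - 1])
		{
			/* Consecutive-key fast path: reuse the previous index, no probe. */
			indexes[row] = indexes[row - 1];
			continue;
		}

		/* Stand-in for the hash-table probe. */
		uint32_t found = 0;
		for (int prev = 0; prev < row; prev++)
		{
			if (keys[prev] == keys[row])
			{
				found = indexes[prev];
				break;
			}
		}
		indexes[row] = (found != 0) ? found : ++last_used_key_index;
	}
	return last_used_key_index; /* Number of distinct keys assigned. */
}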
@@ -53,10 +53,10 @@ FUNCTION_NAME(key_hashing_get_key)(BatchHashingParams params, int row,
 }

 static pg_attribute_always_inline void
-FUNCTION_NAME(key_hashing_store_new)(GroupingPolicyHash *restrict policy, uint32 new_key_index,
+FUNCTION_NAME(key_hashing_store_new)(HashingStrategy *restrict hashing, uint32 new_key_index,
 									 OUTPUT_KEY_TYPE output_key)
 {
-	policy->hashing.output_keys[new_key_index] = OUTPUT_KEY_TO_DATUM(output_key);
+	hashing->output_keys[new_key_index] = OUTPUT_KEY_TO_DATUM(output_key);
 }

 static void