Skip to content

Commit

Permalink
Vectorized hash grouping by a single text column (timescale#7586)
Browse files Browse the repository at this point in the history
Use the UMASH hashes that have a guaranteed lower bound on collisions as
the hash table keys.
  • Loading branch information
akuzm authored Feb 13, 2025
1 parent b85fee5 commit eea2895
Show file tree
Hide file tree
Showing 15 changed files with 639 additions and 10 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ jobs:
DEBIAN_FRONTEND: noninteractive
# vectorized_aggregation has different output on i386 because int8 is by
# reference and currently it cannot be used for vectorized hash grouping.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum vectorized_aggregation"
# vector_agg_text and vector_agg_groupagg use the UMASH hashing library
# that we can't compile on i386.
IGNORES: >-
append-* transparent_decompression-*
transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler*
hypercore_vacuum vectorized_aggregation vector_agg_text
vector_agg_groupagg
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
build_type: ${{ fromJson(needs.config.outputs.build_type) }}
ignores: ["chunk_adaptive metadata telemetry"]
tsl_ignores: ["compression_algos"]
tsl_skips: ["bgw_db_scheduler bgw_db_scheduler_fixed"]
tsl_skips: ["vector_agg_text vector_agg_groupagg bgw_db_scheduler bgw_db_scheduler_fixed"]
pg_config: ["-cfsync=off -cstatement_timeout=60s"]
include:
- pg: 14
Expand Down
1 change: 1 addition & 0 deletions .unreleased/vectorized-text-grouping
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7586 Vectorized aggregation with grouping by a single text column.
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/grouping_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ typedef enum
VAGT_Batch,
VAGT_HashSingleFixed2,
VAGT_HashSingleFixed4,
VAGT_HashSingleFixed8
VAGT_HashSingleFixed8,
VAGT_HashSingleText
} VectorAggGroupingType;

extern GroupingPolicy *create_grouping_policy_batch(int num_agg_defs, VectorAggDef *agg_defs,
Expand Down
10 changes: 10 additions & 0 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
extern HashingStrategy single_fixed_2_strategy;
extern HashingStrategy single_fixed_4_strategy;
extern HashingStrategy single_fixed_8_strategy;
#ifdef TS_USE_UMASH
extern HashingStrategy single_text_strategy;
#endif

static const GroupingPolicy grouping_policy_hash_functions;

Expand Down Expand Up @@ -70,6 +73,11 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr

switch (grouping_type)
{
#ifdef TS_USE_UMASH
case VAGT_HashSingleText:
policy->hashing = single_text_strategy;
break;
#endif
case VAGT_HashSingleFixed8:
policy->hashing = single_fixed_8_strategy;
break;
Expand All @@ -84,6 +92,8 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr
break;
}

policy->hashing.key_body_mctx = policy->agg_extra_mctx;

policy->hashing.init(&policy->hashing, policy);

return &policy->funcs;
Expand Down
5 changes: 5 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_fixed_4.c
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_fixed_8.c
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_common.c)

if(USE_UMASH)
list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_text.c)
endif()

target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
135 changes: 135 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_single_text.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

/*
* Implementation of column hashing for a single text column.
*/

#include <postgres.h>

#include <common/hashfn.h>

#include "compression/arrow_c_data_interface.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/vector_agg/exec.h"
#include "nodes/vector_agg/grouping_policy_hash.h"
#include "template_helper.h"

#include "batch_hashing_params.h"

#include "umash_fingerprint_key.h"

#define EXPLAIN_NAME "single text"
#define KEY_VARIANT single_text
#define OUTPUT_KEY_TYPE BytesView

/*
 * Initialize the hashing strategy for a single text grouping column by
 * deriving the UMASH fingerprinting parameters that are used to produce
 * the hash table keys.
 */
static void
single_text_key_hashing_init(HashingStrategy *hashing)
{
	hashing->umash_params = umash_key_hashing_init();
}

/*
 * A non-owning view of a contiguous byte string: pointer plus length.
 * Used as the decompressed ("output") key type for the text grouping column.
 */
typedef struct BytesView
{
	const uint8 *data;
	uint32 len;
} BytesView;

/*
 * Get a view of the variable-length value at the given Arrow row.
 * Per the Arrow variable-size binary layout, buffers[1] is the offsets
 * array (entry i and i+1 delimit value i) and buffers[2] holds the
 * concatenated value bytes.
 */
static BytesView
get_bytes_view(CompressedColumnValues *column_values, int arrow_row)
{
	const uint32 start = ((uint32 *) column_values->buffers[1])[arrow_row];
	const int32 value_bytes = ((uint32 *) column_values->buffers[1])[arrow_row + 1] - start;
	/* Offsets must be monotonically increasing, so the length is nonnegative. */
	Assert(value_bytes >= 0);

	return (BytesView){ .len = value_bytes, .data = &((uint8 *) column_values->buffers[2])[start] };
}

/*
 * Compute the keys for one row of the batch. The "output" key is a view of
 * the text value itself (stored in full by the store_new function below),
 * and the hash table key is its UMASH fingerprint.
 *
 * valid is set to false when the grouping value is null for this row; in
 * that case the output key is an empty view.
 */
static pg_attribute_always_inline void
single_text_key_hashing_get_key(BatchHashingParams params, int row, void *restrict output_key_ptr,
	void *restrict hash_table_key_ptr, bool *restrict valid)
{
	Assert(params.policy->num_grouping_columns == 1);

	BytesView *restrict output_key = (BytesView *) output_key_ptr;
	HASH_TABLE_KEY_TYPE *restrict hash_table_key = (HASH_TABLE_KEY_TYPE *) hash_table_key_ptr;

	if (unlikely(params.single_grouping_column.decompression_type == DT_Scalar))
	{
		/*
		 * A single scalar value for the entire batch, passed as a datum
		 * through output_value/output_isnull.
		 */
		*valid = !*params.single_grouping_column.output_isnull;
		if (*valid)
		{
			output_key->len = VARSIZE_ANY_EXHDR(*params.single_grouping_column.output_value);
			output_key->data =
				(const uint8 *) VARDATA_ANY(*params.single_grouping_column.output_value);
		}
		else
		{
			/* Null value: use an empty view so the fingerprint below is defined. */
			output_key->len = 0;
			output_key->data = NULL;
		}
	}
	else if (params.single_grouping_column.decompression_type == DT_ArrowText)
	{
		/* Plain Arrow text: buffers[0] is the validity bitmap. */
		*output_key = get_bytes_view(&params.single_grouping_column, row);
		*valid = arrow_row_is_valid(params.single_grouping_column.buffers[0], row);
	}
	else if (params.single_grouping_column.decompression_type == DT_ArrowTextDict)
	{
		/* Dictionary-encoded text: buffers[3] maps rows to dictionary indexes. */
		const int16 index = ((int16 *) params.single_grouping_column.buffers[3])[row];
		*output_key = get_bytes_view(&params.single_grouping_column, index);
		*valid = arrow_row_is_valid(params.single_grouping_column.buffers[0], row);
	}
	else
	{
		pg_unreachable();
	}

	DEBUG_PRINT("%p consider key row %d key index %d is %d bytes: ",
		params.policy,
		row,
		params.policy->last_used_key_index + 1,
		output_key->len);
	for (size_t i = 0; i < output_key->len; i++)
	{
		DEBUG_PRINT("%.2x.", output_key->data[i]);
	}
	DEBUG_PRINT("\n");

	/* Fingerprint the value bytes; this is the actual hash table key. */
	const struct umash_fp fp = umash_fprint(params.policy->hashing.umash_params,
		/* seed = */ ~0ULL,
		output_key->data,
		output_key->len);
	*hash_table_key = umash_fingerprint_get_key(fp);
}

/*
 * Store a newly encountered grouping key: copy the value bytes into a text
 * varlena allocated in the dense key body memory context, and record the
 * resulting datum in the output keys array under the new key index.
 */
static pg_attribute_always_inline void
single_text_key_hashing_store_new(GroupingPolicyHash *restrict policy, uint32 new_key_index,
	BytesView output_key)
{
	const int varlena_bytes = VARHDRSZ + output_key.len;
	text *restrict key_copy =
		(text *) MemoryContextAlloc(policy->hashing.key_body_mctx, varlena_bytes);
	SET_VARSIZE(key_copy, varlena_bytes);
	memcpy(VARDATA(key_copy), output_key.data, output_key.len);
	policy->hashing.output_keys[new_key_index] = PointerGetDatum(key_copy);
}

/*
 * Emit the key value for the given key index into the aggregated slot.
 * We use the standard single-key key output function.
 */
static void
single_text_emit_key(GroupingPolicyHash *policy, uint32 current_key,
	TupleTableSlot *aggregated_slot)
{
	/*
	 * Call and return separately: "return <void expression>;" is a
	 * constraint violation in ISO C (C11 6.8.6.4p1), even though some
	 * compilers accept it as an extension.
	 */
	hash_strategy_output_key_single_emit(policy, current_key, aggregated_slot);
}

/*
 * Per-batch preparation hook required by the hashing strategy interface.
 * The text strategy keeps no per-batch state, so this is intentionally a
 * no-op.
 */
static void
single_text_key_hashing_prepare_for_batch(GroupingPolicyHash *policy, TupleTableSlot *vector_slot)
{
}

#include "hash_strategy_impl.c"
11 changes: 10 additions & 1 deletion tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ typedef struct HashingStrategy
* This is stored separately from hash table keys, because they might not
* have the full column values, and also storing them contiguously here
* leads to better memory access patterns when emitting the results.
* The details of the key storage are managed by the hashing strategy.
* The details of the key storage are managed by the hashing strategy. The
* by-reference keys can use a separate memory context for dense storage.
*/
Datum *restrict output_keys;
uint64 num_allocated_output_keys;
MemoryContext key_body_mctx;

/*
* In single-column grouping, we store the null key outside of the hash
Expand All @@ -54,6 +56,13 @@ typedef struct HashingStrategy
* to reduce the hash table size.
*/
uint32 null_key_index;

#ifdef TS_USE_UMASH
/*
* UMASH fingerprinting parameters.
*/
struct umash_params *umash_params;
#endif
} HashingStrategy;

void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows);
Expand Down
45 changes: 45 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/umash_fingerprint_key.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once

/*
* Helpers to use the umash fingerprint as a hash table key in our hashing
* strategies for vectorized grouping.
*/

#include "import/umash.h"

/*
 * The hash table key: a 128-bit UMASH fingerprint split into a 32-bit part
 * used for hash table bucketing and the remaining 64 bits used for equality
 * comparison. The struct is packed so that the hash table entry fits into 16
 * bytes with the uint32 key index that goes before.
 */
struct umash_fingerprint_key
{
	uint32 hash;
	uint64 rest;
} pg_attribute_packed();

#define HASH_TABLE_KEY_TYPE struct umash_fingerprint_key

/*
 * Parenthesize the macro arguments so that non-primary expressions such as
 * "*p" expand correctly ("(*p).hash", not "(*p.hash)").
 */
#define KEY_HASH(X) ((X).hash)
#define KEY_EQUAL(a, b) ((a).hash == (b).hash && (a).rest == (b).rest)

/*
 * Collapse a full 128-bit UMASH fingerprint into the compact hash table key:
 * the low 32 bits of the first hash word plus the entire second word.
 */
static inline struct umash_fingerprint_key
umash_fingerprint_get_key(struct umash_fp fp)
{
	struct umash_fingerprint_key key;
	key.hash = fp.hash[0] & (~(uint32) 0);
	key.rest = fp.hash[1];
	return key;
}

/*
 * Allocate and derive the UMASH parameters used to fingerprint the grouping
 * keys. The seed is an arbitrary fixed constant so that the derived
 * parameters are deterministic.
 *
 * Note the explicit (void): an empty parameter list () declares a function
 * with unspecified parameters in C before C23, which defeats argument
 * checking at call sites.
 */
static inline struct umash_params *
umash_key_hashing_init(void)
{
	struct umash_params *params = palloc0(sizeof(struct umash_params));
	umash_params_derive(params, 0xabcdef1234567890ull, NULL);
	return params;
}
9 changes: 9 additions & 0 deletions tsl/src/nodes/vector_agg/plan.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ get_vectorized_grouping_type(const VectorQualInfo *vqinfo, Agg *agg, List *resol
break;
}
}
#ifdef TS_USE_UMASH
else
{
Ensure(single_grouping_var->vartype == TEXTOID,
"invalid vector type %d for grouping",
single_grouping_var->vartype);
return VAGT_HashSingleText;
}
#endif
}

return VAGT_Invalid;
Expand Down
11 changes: 8 additions & 3 deletions tsl/test/expected/vector_agg_groupagg.out
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ select s, sum(value) from groupagg group by s order by s nulls first limit 10;
(10 rows)

reset timescaledb.debug_require_vector_agg;
-- More tests for dictionary encoding. This is not vectorized at the moment.
set timescaledb.debug_require_vector_agg to 'forbid';
-- More tests for dictionary encoding.
create table text_table(ts int);
select create_hypertable('text_table', 'ts', chunk_time_interval => 3);
NOTICE: adding not-null constraint to column "ts"
Expand Down Expand Up @@ -137,7 +136,7 @@ select count(compress_chunk(x)) from show_chunks('text_table') x;
(1 row)

vacuum analyze text_table;
-- Test the GroupAggregate by a text column. Not vectorized for now.
set timescaledb.debug_require_vector_agg to 'require';
select a, count(*) from text_table group by a order by a limit 10;
a | count
-------------------------+-------
Expand All @@ -155,6 +154,7 @@ select a, count(*) from text_table group by a order by a limit 10;

-- The hash grouping policies do not support the GroupAggregate mode in the
-- reverse order.
set timescaledb.debug_require_vector_agg to 'forbid';
select a, count(*) from text_table group by a order by a desc limit 10;
a | count
-----------------+-------
Expand All @@ -170,6 +170,7 @@ select a, count(*) from text_table group by a order by a desc limit 10;
different993 | 1
(10 rows)

reset timescaledb.debug_require_vector_agg;
-- with NULLS FIRST
select count(decompress_chunk(x)) from show_chunks('text_table') x;
count
Expand All @@ -185,6 +186,7 @@ select count(compress_chunk(x)) from show_chunks('text_table') x;
2
(1 row)

set timescaledb.debug_require_vector_agg to 'require';
select a, count(*) from text_table group by a order by a nulls first limit 10;
a | count
-------------------------+-------
Expand All @@ -200,7 +202,9 @@ select a, count(*) from text_table group by a order by a nulls first limit 10;
different-with-nulls11 | 1
(10 rows)

reset timescaledb.debug_require_vector_agg;
-- TODO verify that this works with the serialized hash grouping strategy
set timescaledb.debug_require_vector_agg to 'forbid';
select ts, a, count(*) from text_table group by ts, a order by ts, a limit 10;
ts | a | count
----+---------------+-------
Expand Down Expand Up @@ -231,6 +235,7 @@ select a, ts, count(*) from text_table group by a, ts order by a desc, ts desc l
different994 | 3 | 1
(10 rows)

reset timescaledb.debug_require_vector_agg;
reset max_parallel_workers_per_gather;
reset timescaledb.debug_require_vector_agg;
reset enable_hashagg;
Loading

0 comments on commit eea2895

Please sign in to comment.