Skip to content

Commit

Permalink
Support vectorized aggregation on Hypercore TAM
Browse files Browse the repository at this point in the history
Add support for vectorized aggregation over Hypercore TAM. This
includes some refactoring of the VectorAgg node in order to plan
vectorized aggregation on top of ColumnarScans.

Currently, only ColumnarScan can run below VectorAgg, because it is
doing qual filtering. In theory, a SeqScan reading from Hypercore TAM
should also work because it would produce Arrow slots. However, a
SeqScan doesn't do vectorized filtering, which is currently assumed to
be done before the VectorAgg node.

In ColumnarScan, it is necessary to turn off projection when VectorAgg is
used. Otherwise, it would project the arrow slot into a virtual slot,
thus losing the vector data. Ideally, a projection should never be
planned to begin with, but this isn't possible since VectorAgg relies
on replacing existing non-vectorized Agg plans added by PostgreSQL.
  • Loading branch information
erimatnor committed Feb 6, 2025
1 parent 3e5c2b6 commit 4c26867
Show file tree
Hide file tree
Showing 13 changed files with 648 additions and 58 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7655
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7655 Support vectorized aggregation on Hypercore TAM
8 changes: 7 additions & 1 deletion tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow_cache.h"
#include "compression/arrow_c_data_interface.h"
#include "debug_assert.h"
#include "nodes/decompress_chunk/compressed_batch.h"

#include <limits.h>

Expand Down Expand Up @@ -88,6 +89,10 @@ typedef struct ArrowTupleTableSlot
const uint64 *arrow_qual_result; /* Bitmap with result of qual
* filtering over arrow_array. NULL if
* no filtering has been applied. */

/* Struct to hold values for one column. Necessary for compatibility with
* vector aggs. */
struct CompressedColumnValues ccvalues;
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;
Expand Down Expand Up @@ -402,8 +407,9 @@ arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
return aslot->per_segment_mcxt;
}

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
extern void arrow_slot_set_index_attrs(TupleTableSlot *slot, Bitmapset *attrs);

Expand Down
131 changes: 121 additions & 10 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <optimizer/optimizer.h>

#include "nodes/vector_agg/exec.h"

#include "compression/arrow_c_data_interface.h"
#include "hypercore/arrow_tts.h"
#include "hypercore/vector_quals.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"
#include "nodes/vector_agg/vector_slot.h"

static int
get_input_offset(const CustomScanState *state, const Var *var)
get_input_offset_decompress_chunk(const DecompressChunkState *decompress_state, const Var *var)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;

/*
Expand Down Expand Up @@ -58,16 +62,57 @@ get_input_offset(const CustomScanState *state, const Var *var)
}

static void
get_column_storage_properties(const CustomScanState *state, int input_offset,
GroupingColumn *result)
get_column_storage_properties_decompress_chunk(const DecompressChunkState *state, int input_offset,
GroupingColumn *result)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;
const DecompressContext *dcontext = &state->decompress_context;
const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset];
result->value_bytes = desc->value_bytes;
result->by_value = desc->by_value;
}

/*
* Given a Var reference, get the offset of the corresponding attribute in the
* input tuple.
*
* For a node returning arrow slots, this is just the attribute number in the
* Var. But if the node is DecompressChunk, it is necessary to translate
* between the compressed and non-compressed columns.
*/
static int
get_input_offset(const CustomScanState *state, const Var *var)
{
if (TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
return AttrNumberGetAttrOffset(var->varattno);

return get_input_offset_decompress_chunk((const DecompressChunkState *) state, var);
}

/*
 * Get the type length and "byval" properties for the grouping column given by
 * the input offset.
 *
 * For a node returning arrow slots, the properties can be read directly from
 * the scanned relation's tuple descriptor. For DecompressChunk, the input
 * offset references the compressed relation instead.
 */
static void
get_column_storage_properties(const CustomScanState *state, int input_offset,
							  GroupingColumn *result)
{
	if (!TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
	{
		/* DecompressChunk child: the offset refers to the compressed relation */
		get_column_storage_properties_decompress_chunk((const DecompressChunkState *) state,
													   input_offset,
													   result);
		return;
	}

	/* Arrow slot child: read attribute properties straight from the scan
	 * relation's tuple descriptor */
	const TupleDesc tupdesc = RelationGetDescr(state->ss.ss_currentRelation);

	result->value_bytes = TupleDescAttr(tupdesc, input_offset)->attlen;
	result->by_value = TupleDescAttr(tupdesc, input_offset)->attbyval;
}

static void
vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
{
Expand Down Expand Up @@ -312,6 +357,33 @@ compressed_batch_get_next_slot(VectorAggState *vector_agg_state)
return &batch_state->decompressed_scan_slot_data.base;
}

/*
 * Get the next slot to aggregate for an arrow tuple table slot.
 *
 * Implements "get next slot" on top of ColumnarScan (or any node producing
 * ArrowTupleTableSlots). It simply reads the next slot from the child node.
 */
static TupleTableSlot *
arrow_get_next_slot(VectorAggState *vector_agg_state)
{
	PlanState *childstate = linitial(vector_agg_state->custom.custom_ps);
	TupleTableSlot *slot = ExecProcNode(childstate);

	if (!TupIsNull(slot))
	{
		Assert(TTS_IS_ARROWTUPLE(slot));

		/* Qual filtering should already have happened in the scan node
		 * below, so the slot must not arrive here already consumed. */
		Assert(!arrow_slot_is_consumed(slot));
		return slot;
	}

	/* The input has ended. */
	vector_agg_state->input_ended = true;
	return NULL;
}

/*
* Initialize vector quals for a compressed batch.
*
Expand Down Expand Up @@ -341,6 +413,18 @@ compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_
return &agg_state->vqual_state.vqstate;
}

/*
 * Initialize FILTER vector quals for an arrow tuple slot.
 *
 * Used to implement vectorized aggregate function filter clauses.
 */
static VectorQualState *
arrow_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def, TupleTableSlot *slot)
{
	VectorQualState *vqstate = &agg_state->vqual_state.vqstate;

	vector_qual_state_init(vqstate, agg_def->filter_clauses, slot);

	return vqstate;
}

static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
Expand Down Expand Up @@ -425,6 +509,11 @@ vector_agg_exec(CustomScanState *node)
* Finally, pass the compressed batch to the grouping policy.
*/
grouping->gp_add_batch(grouping, slot);

/* The entire arrow array should be aggregated, so mark it as consumed
 * so that we get the next array (or end) in the next iteration of the
 * loop. */
vector_slot_mark_consumed(slot);
}

/*
Expand Down Expand Up @@ -481,20 +570,42 @@ Node *
vector_agg_state_create(CustomScan *cscan)
{
VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState);
CustomScan *childscan = castNode(CustomScan, linitial(cscan->custom_plans));

state->custom.methods = &exec_methods;

/*
* Initialize VectorAggState to process vector slots from different
* subnodes. Currently, only compressed batches are supported, but arrow
* slots will be supported as well.
* subnodes.
*
* VectorAgg supports two child nodes: ColumnarScan (producing arrow tuple
* table slots) and DecompressChunk (producing compressed batches).
*
* When the child is ColumnarScan, VectorAgg expects Arrow slots that
* carry arrow arrays. ColumnarScan performs standard qual filtering and
* vectorized qual filtering prior to handing the slot up to VectorAgg.
*
* When the child is DecompressChunk, VectorAgg doesn't read the slot from
* the child node. Instead, it bypasses DecompressChunk and reads
* compressed tuples directly from the grandchild. It therefore needs to
* handle batch decompression and vectorized qual filtering itself, in its
* own "get next slot" implementation.
*
* The vector qual init functions are needed to implement vectorized
* aggregate function FILTER clauses for arrow tuple table slots and
* compressed batches, respectively.
*/
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
if (is_columnar_scan(childscan))
{
state->get_next_slot = arrow_get_next_slot;
state->init_vector_quals = arrow_init_vector_quals;
}
else
{
Assert(strcmp(childscan->methods->CustomName, "DecompressChunk") == 0);
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
}

return (Node *) state;
}
Loading

0 comments on commit 4c26867

Please sign in to comment.