Try even lazier decompression #7782

Draft
Wants to merge 2 commits into base: main
125 changes: 104 additions & 21 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -158,17 +158,18 @@
}

static void
decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state,
TupleTableSlot *compressed_slot, int i)
compressed_column_fetch_compressed_data(DecompressContext *dcontext,
DecompressBatchState *batch_state,
TupleTableSlot *compressed_slot, int i)
{
CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->arrow = NULL;
const AttrNumber attr = AttrNumberGetAttrOffset(column_description->custom_scan_attno);
column_values->output_value = &compressed_batch_current_tuple(batch_state)->tts_values[attr];
column_values->output_isnull = &compressed_batch_current_tuple(batch_state)->tts_isnull[attr];
const int value_bytes = get_typlen(column_description->typid);
Assert(value_bytes != 0);

/*
* Shouldn't be called on a column that already has some data.
*/
Assert(column_values->arrow == NULL);
Assert(column_values->buffers[0] == NULL);

bool isnull;
Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull);
@@ -193,12 +194,43 @@
}

/* Detoast the compressed datum. */
value = PointerGetDatum(detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value),
&dcontext->detoaster,
batch_state->per_batch_context));
column_values->buffers[0] =
detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value),
&dcontext->detoaster,
batch_state->per_batch_context);

column_values->buffers[1] = dcontext;
column_values->buffers[2] = batch_state;
column_values->buffers[3] = (void *) (intptr_t) i;

column_values->decompression_type = DT_Pending;
}

void
compressed_batch_decompress_column(CompressedColumnValues *column_values)
{
Assert(column_values->decompression_type != DT_NoData);
if (column_values->decompression_type != DT_Pending)
{
return;

}

Assert(column_values->arrow == NULL);

DecompressContext *dcontext = (DecompressContext *) column_values->buffers[1];
DecompressBatchState *batch_state = (DecompressBatchState *) column_values->buffers[2];
const int i = (intptr_t) column_values->buffers[3];

CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
const int value_bytes = get_typlen(column_description->typid);
Assert(value_bytes != 0);

CompressedDataHeader *header = (CompressedDataHeader *) column_values->buffers[0];

/* The NULL compressed value should have been handled earlier. */
Assert(header != NULL);

/* Decompress the entire batch if it is supported. */
CompressedDataHeader *header = (CompressedDataHeader *) value;
ArrowArray *arrow = NULL;
if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
{
@@ -351,15 +383,29 @@

CompressedColumnValues *column_values = &batch_state->compressed_columns[column_index];

if (column_values->decompression_type == DT_Invalid)
if (column_values->decompression_type == DT_NoData)
{
/*
* We decompress the compressed columns on demand, so that we can
* skip decompressing some columns if the entire batch doesn't pass
* the quals.
*/
decompress_column(dcontext, batch_state, compressed_slot, column_index);
Assert(column_values->decompression_type != DT_Invalid);
compressed_column_fetch_compressed_data(dcontext,
batch_state,
compressed_slot,
column_index);
Assert(column_values->decompression_type != DT_NoData);
}

if (column_values->decompression_type == DT_Pending)
{
/*
 * We might see a pending column here when this is called for a
 * vectorized aggregate FILTER clause, hence this block is separate from
 * the one above.
 */
compressed_batch_decompress_column(column_values);
Assert(column_values->decompression_type != DT_Pending);
}

Assert(column_values->decompression_type != DT_Iterator);
@@ -885,8 +931,19 @@
*/
Assert(i < dcontext->num_data_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->decompression_type = DT_Invalid;

column_values->decompression_type = DT_NoData;
column_values->buffers[0] = NULL;
column_values->buffers[1] = NULL;
column_values->buffers[2] = NULL;
column_values->buffers[3] = NULL;
column_values->arrow = NULL;

const AttrNumber attr =
AttrNumberGetAttrOffset(column_description->custom_scan_attno);
column_values->output_value = &decompressed_tuple->tts_values[attr];
column_values->output_isnull = &decompressed_tuple->tts_isnull[attr];

break;
}
case SEGMENTBY_COLUMN:
@@ -1008,16 +1065,22 @@
{
/*
* We have some rows in the batch that pass the vectorized filters, so
* we have to decompress the rest of the compressed columns.
* we have to fetch the compressed data for the rest of the columns. The
* subsequent decompression might be done later e.g. during the
* vectorized aggregation.
*/
const int num_data_columns = dcontext->num_data_columns;
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Invalid)
Assert(column_values->decompression_type != DT_Pending);
if (column_values->decompression_type == DT_NoData)
{
decompress_column(dcontext, batch_state, compressed_slot, i);
Assert(column_values->decompression_type != DT_Invalid);
compressed_column_fetch_compressed_data(dcontext, batch_state, compressed_slot, i);
Assert(column_values->decompression_type != DT_NoData);

/* FIXME */
// compressed_batch_decompress_column(column_values);
}
}

@@ -1176,6 +1239,22 @@
return ExecQual(dcontext->ps->qual, econtext);
}

static void
compressed_column_decompress_pending(DecompressContext *dcontext, DecompressBatchState *batch_state)
{
const int num_data_columns = dcontext->num_data_columns;
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Pending)
{
compressed_batch_decompress_column(column_values);
Assert(column_values->decompression_type != DT_NoData);
Assert(column_values->decompression_type != DT_Pending);
}
}
}

/*
* Decompress the next tuple from the batch indicated by batch state. The result is stored
* in batch_state->decompressed_scan_slot. The slot will be empty if the batch
@@ -1184,6 +1263,9 @@
void
compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batch_state)
{
/* FIXME */
compressed_column_decompress_pending(dcontext, batch_state);

Assert(batch_state->total_batch_rows > 0);

TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base;
@@ -1283,11 +1365,12 @@
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
Assert(column_values->decompression_type != DT_Invalid);
Assert(column_values->decompression_type != DT_NoData);
}
#endif

/* Make the first tuple and save it. */
compressed_column_decompress_pending(dcontext, batch_state);
Assert(batch_state->next_batch_row == 0);
const uint16 arrow_row = dcontext->reverse ? batch_state->total_batch_rows - 1 : 0;
make_next_tuple(batch_state, arrow_row, dcontext->num_data_columns);
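For reference, here is a minimal standalone sketch of the deferred-decompression state machine this file now implements: fetching detoasts the compressed datum and parks it in buffers[0] with the column marked DT_Pending, and the actual decompression only happens once the column's values are first needed. The types, the DT_Decompressed placeholder and the function bodies below are simplified stand-ins for illustration, not the actual TimescaleDB structures.

#include <assert.h>
#include <stdio.h>

/* Simplified stand-ins for the column states used in the patch; the real
 * code keeps these in CompressedColumnValues. */
typedef enum
{
	DT_NoData = 0,      /* nothing fetched for this column yet */
	DT_Pending = -1,    /* compressed data fetched, decompression deferred */
	DT_Decompressed = 1 /* stand-in for any "ready" state (arrow, iterator, ...) */
} DecompressionType;

typedef struct
{
	DecompressionType decompression_type;
	void *buffers[4]; /* buffers[0] holds the detoasted compressed datum */
} ColumnState;

/* Phase 1: detoast and remember the compressed datum, but do not decompress
 * yet (in the spirit of compressed_column_fetch_compressed_data). */
static void
fetch_compressed_data(ColumnState *col, void *detoasted_datum)
{
	assert(col->decompression_type == DT_NoData);
	col->buffers[0] = detoasted_datum;
	col->decompression_type = DT_Pending;
}

/* Phase 2: decompress on first use; a no-op if already done (in the spirit
 * of compressed_batch_decompress_column). */
static void
decompress_column(ColumnState *col)
{
	if (col->decompression_type != DT_Pending)
		return;
	/* ... bulk or row-by-row decompression of col->buffers[0] goes here ... */
	col->decompression_type = DT_Decompressed;
}

int
main(void)
{
	char fake_compressed_datum[8] = { 0 };
	ColumnState col = { DT_NoData, { 0 } };

	fetch_compressed_data(&col, fake_compressed_datum);
	/* The column stays pending until something actually needs its values. */
	decompress_column(&col);
	printf("final state: %d\n", (int) col.decompression_type);
	return 0;
}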
14 changes: 9 additions & 5 deletions tsl/src/nodes/decompress_chunk/compressed_batch.h
@@ -15,19 +15,21 @@ typedef struct ArrowArray ArrowArray;
/* How to obtain the decompressed datum for individual row. */
typedef enum
{
DT_ArrowTextDict = -4,
DT_ArrowTextDict = -5,

DT_ArrowText = -3,
DT_ArrowText = -4,

/*
* The decompressed value is already in the decompressed slot. This is used
* for segmentby and compressed columns with default value in batch.
*/
DT_Scalar = -2,
DT_Scalar = -3,

DT_Iterator = -1,
DT_Iterator = -2,

DT_Invalid = 0,
DT_Pending = -1,

DT_NoData = 0,

/*
* Any positive number is also valid for the decompression type. It means
Expand All @@ -48,6 +50,7 @@ typedef struct CompressedColumnValues
/*
* The flattened source buffers for getting the decompressed datum.
* Depending on decompression type, they are as follows:
* pending: compressed data (this is used for lazier decompression).
* iterator: iterator
* arrow fixed: validity, value
* arrow text: validity, uint32* offsets, void* bodies
@@ -125,6 +128,7 @@ extern void compressed_batch_save_first_tuple(DecompressContext *dcontext,
DecompressBatchState *batch_state,
TupleTableSlot *first_tuple_slot);

extern void compressed_batch_decompress_column(CompressedColumnValues *column_values);
/*
* Initialize the batch memory context and bulk decompression context.
*
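As a companion to the renumbered enum above, a small self-contained sketch of how a reader can interpret decompression_type, assuming the convention that any positive value encodes the fixed width in bytes of the decompressed values; the helper below is illustrative and not part of the actual API.

#include <stdbool.h>
#include <stdio.h>

/* Mirrors the ordering introduced by the patch: negative values select a
 * special access method, 0 means no data has been fetched, and positive
 * values are taken to be the fixed width in bytes of the decompressed
 * arrow values. */
typedef enum
{
	DT_ArrowTextDict = -5,
	DT_ArrowText = -4,
	DT_Scalar = -3,
	DT_Iterator = -2,
	DT_Pending = -1,
	DT_NoData = 0,
} DecompressionType;

static bool
is_fixed_width_arrow(int decompression_type, int *width_out)
{
	if (decompression_type > 0)
	{
		*width_out = decompression_type;
		return true;
	}
	return false;
}

int
main(void)
{
	int width;
	/* e.g. a bulk-decompressed bigint column would store 8 here */
	if (is_fixed_width_arrow(8, &width))
		printf("fixed-width arrow values, %d bytes each\n", width);
	return 0;
}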
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/grouping_policy_batch.c
@@ -134,7 +134,8 @@ compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slo
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);

Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_NoData);
Assert(values->decompression_type != DT_Pending);
Assert(values->decompression_type != DT_Iterator);

if (values->arrow != NULL)
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/grouping_policy_hash.c
@@ -141,7 +141,7 @@ compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);

Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_NoData);
Assert(values->decompression_type != DT_Iterator);

if (values->arrow != NULL)
8 changes: 6 additions & 2 deletions tsl/src/nodes/vector_agg/vector_slot.h
@@ -92,7 +92,11 @@ vector_slot_get_compressed_column_values(TupleTableSlot *slot, const AttrNumber
return values;
}

const DecompressBatchState *batch_state = (const DecompressBatchState *) slot;
const CompressedColumnValues *values = &batch_state->compressed_columns[offset];
DecompressBatchState *batch_state = (DecompressBatchState *) slot;
CompressedColumnValues *values = &batch_state->compressed_columns[offset];
if (values->decompression_type == DT_Pending)
{
compressed_batch_decompress_column(values);
}
return values;
}
Loading
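To round out the picture, a standalone sketch of the consumer-side pattern the vector_slot.h change enables: the accessor finishes any deferred decompression before handing the column to a vectorized aggregate, so callers need not care whether decompression happened eagerly or on demand. All names below are simplified stand-ins for the real accessor and column structures.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef enum { DT_NoData = 0, DT_Pending = -1, DT_Decompressed = 1 } DecompressionType;

typedef struct
{
	DecompressionType decompression_type;
	const int64_t *values; /* valid once decompression_type is DT_Decompressed */
	uint16_t n;
} ColumnValues;

/* Stand-in for compressed_batch_decompress_column(). */
static void
decompress_column(ColumnValues *col)
{
	static const int64_t decompressed[3] = { 1, 2, 3 };
	col->values = decompressed;
	col->n = 3;
	col->decompression_type = DT_Decompressed;
}

/* Accessor in the spirit of vector_slot_get_compressed_column_values():
 * finish any deferred decompression before returning the column. */
static const ColumnValues *
get_column_values(ColumnValues *col)
{
	if (col->decompression_type == DT_Pending)
		decompress_column(col);
	assert(col->decompression_type != DT_NoData);
	assert(col->decompression_type != DT_Pending);
	return col;
}

int
main(void)
{
	ColumnValues col = { DT_Pending, NULL, 0 };
	const ColumnValues *v = get_column_values(&col);

	int64_t sum = 0;
	for (uint16_t i = 0; i < v->n; i++)
		sum += v->values[i];
	printf("sum = %lld\n", (long long) sum);
	return 0;
}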