Use arrow cache only for index scans #7724

Draft · wants to merge 1 commit into base: main
9 changes: 5 additions & 4 deletions src/guc.c
@@ -1101,12 +1101,13 @@ _guc_init(void)
/* short_desc= */ "max number of entries in arrow data cache",
/* long_desc= */
"The max number of decompressed arrow segments that can be "
- "cached before entries are evicted. This mainly affects the "
- "performance of index scans on the Hypercore TAM "
- "when segments are accessed in non-sequential order.",
+ "cached before entries are evicted. This only affects the "
+ "performance of index scans when using Hypercore TAM "
+ "and segments are accessed in non-sequential order. "
+ "Set to 0 to disable the use of the cache.",
/* valueAddr= */ &ts_guc_hypercore_arrow_cache_max_entries,
/* bootValue= */ 25000,
- /* minValue= */ 1,
+ /* minValue= */ 0,
/* maxValue= */ INT_MAX,
/* context= */ PGC_USERSET,
/* flags= */ 0,
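With the minimum lowered to 0, the cache can now be switched off per session. A sketch of how that would look from SQL, assuming the setting is exposed as timescaledb.hypercore_arrow_cache_max_entries (the user-facing GUC name is not shown in this diff, so treat it as an assumption):

-- Hypothetical usage; the GUC name is assumed, not confirmed by this diff.
set timescaledb.hypercore_arrow_cache_max_entries = 0;     -- disable the arrow cache
set timescaledb.hypercore_arrow_cache_max_entries = 25000; -- restore the boot value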
9 changes: 9 additions & 0 deletions tsl/src/hypercore/arrow_array.c
@@ -141,6 +141,15 @@ arrow_release_buffers(ArrowArray *array)
arrow_private_release(array);
}

+ void
+ arrow_release(ArrowArray *array)
+ {
+ if (array->release != NULL)
+ array->release(array);
+
+ pfree(array);
+ }

/*
* Variable-size primitive layout ArrowArray from decompression iterator.
*/
1 change: 1 addition & 0 deletions tsl/src/hypercore/arrow_array.h
@@ -15,3 +15,4 @@ extern NullableDatum arrow_get_datum(const ArrowArray *array, Oid typid, int16 t
uint16 index);
extern ArrowArray *arrow_from_compressed(Datum compressed, Oid typid, MemoryContext dest_mcxt,
MemoryContext tmp_mcxt);
+ extern void arrow_release(ArrowArray *array);
8 changes: 1 addition & 7 deletions tsl/src/hypercore/arrow_cache.c
@@ -174,13 +174,7 @@ arrow_cache_clear_entry(ArrowColumnCacheEntry *restrict entry)
if (entry->arrow_arrays[i])
{
ArrowArray *array = entry->arrow_arrays[i];

- if (array->release)
- {
- array->release(array);
- array->release = NULL;
- }
- pfree(array);
+ arrow_release(array);
entry->arrow_arrays[i] = NULL;
}
}
89 changes: 81 additions & 8 deletions tsl/src/hypercore/arrow_tts.c
@@ -4,6 +4,7 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
+ #include "guc.h"
#include <access/attnum.h>
#include <access/htup_details.h>
#include <access/tupdesc.h>
@@ -94,6 +94,7 @@ tts_arrow_init(TupleTableSlot *slot)
aslot->total_row_count = 0;
aslot->referenced_attrs = NULL;
aslot->arrow_qual_result = NULL;
+ aslot->arrow_arrays = NULL;

/*
* Set up child slots, one for the non-compressed relation and one for the
@@ -112,6 +114,8 @@ tts_arrow_init(TupleTableSlot *slot)
aslot->child_slot = aslot->noncompressed_slot;
aslot->valid_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->segmentby_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
+ aslot->arrow_arrays = palloc0(sizeof(ArrowArray *) * slot->tts_tupleDescriptor->natts);

/* Note that aslot->referenced_attrs is initialized on demand, and not
* here, because NULL is a valid state for referenced_attrs. */
MemoryContextSwitchTo(oldmcxt);
@@ -128,6 +132,24 @@ tts_arrow_init(TupleTableSlot *slot)
GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024);
}

+ static void
+ clear_arrow_arrays(TupleTableSlot *slot)
+ {
+ ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+
+ if (aslot->arrow_arrays)
+ {
+ for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++)
+ {
+ if (aslot->arrow_arrays[i] != NULL)
+ {
+ arrow_release(aslot->arrow_arrays[i]);
+ aslot->arrow_arrays[i] = NULL;
+ }
+ }
+ }
+ }

/*
* The release function is called by:
*
@@ -153,6 +175,7 @@ tts_arrow_release(TupleTableSlot *slot)
aslot->compressed_slot = NULL;
aslot->noncompressed_slot = NULL;
aslot->arrow_cache_entry = NULL;
+ aslot->arrow_arrays = NULL;
}

static void
@@ -269,13 +292,15 @@ tts_arrow_clear(TupleTableSlot *slot)
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->arrow_cache_entry = NULL;
aslot->arrow_qual_result = NULL;
+ clear_arrow_arrays(slot);
MemoryContextReset(aslot->per_segment_mcxt);
}

static inline void
tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 tuple_index)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+ bool clear_arrow_data = true;

Assert(!TTS_EMPTY(child_slot));
Assert(OidIsValid(slot->tts_tableOid));
@@ -315,6 +340,8 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t

if (!ItemPointerEquals(&decoded_tid, &child_slot->tts_tid))
clear_arrow_parent(slot);
+ else
+ clear_arrow_data = false;
}
}

@@ -339,6 +366,10 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
aslot->child_slot = child_slot;
aslot->tuple_index = tuple_index;
aslot->arrow_cache_entry = NULL;

+ if (clear_arrow_data)
+ clear_arrow_arrays(slot);

/* Clear valid attributes */
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
MemoryContextReset(aslot->per_segment_mcxt);
@@ -462,6 +493,52 @@ is_compressed_col(const TupleDesc tupdesc, AttrNumber attno)
return coltypid == typinfo->type_oid;
}

+ static inline ArrowArray *
+ get_arrow_array(ArrowTupleTableSlot *aslot, const int16 attoff)
+ {
+ const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
+ TupleTableSlot *slot = &aslot->base.base;
+
+ /*
+ * Only use the arrow array cache if the slot is used in an index scan and
+ * the cache hasn't been disabled by configuration.
+ */
+ if (aslot->index_attrs != NULL && ts_guc_hypercore_arrow_cache_max_entries > 0)
+ {
+ ArrowArray **arrow_arrays = arrow_column_cache_read_one(aslot, attnum);
+ return arrow_arrays[attoff];
+ }
+
+ Assert(aslot->arrow_arrays);
+
+ if (NULL == aslot->arrow_arrays[attoff])
+ {
+ const int16 *attrs_offset_map = arrow_slot_get_attribute_offset_map(&aslot->base.base);
+ const AttrNumber cattno = AttrOffsetGetAttrNumber(attrs_offset_map[attoff]);
+ const TupleDesc compressed_tupdesc = aslot->compressed_slot->tts_tupleDescriptor;
+
+ if (is_compressed_col(compressed_tupdesc, cattno))
+ {
+ bool isnull;
+ Datum value = slot_getattr(aslot->child_slot, cattno, &isnull);
+
+ /* Can this ever be NULL? */
+ if (!isnull)
+ {
+ const ArrowColumnCache *acache = &aslot->arrow_cache;
+ const TupleDesc tupdesc = slot->tts_tupleDescriptor;
+ const Form_pg_attribute attr = TupleDescAttr(tupdesc, attoff);
+ aslot->arrow_arrays[attoff] = arrow_from_compressed(value,
+ attr->atttypid,
+ slot->tts_mcxt,
+ acache->decompression_mcxt);
+ }
+ }
+ }
+
+ return aslot->arrow_arrays[attoff];
+ }

static pg_attribute_always_inline ArrowArray *
set_attr_value(TupleTableSlot *slot, const int16 attoff)
{
@@ -486,13 +563,11 @@ set_attr_value(TupleTableSlot *slot, const int16 attoff)
}
else
{
- const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
- ArrowArray **arrow_arrays = arrow_column_cache_read_one(aslot, attnum);
-
- arrow_array = arrow_arrays[attoff];
+ arrow_array = get_arrow_array(aslot, attoff);

if (arrow_array == NULL)
{
+ const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
/* Since the column is not the segment-by column, and there is no
* decompressed data, the column must be NULL. Use the default
* value. */
@@ -506,7 +581,7 @@ set_attr_value(TupleTableSlot *slot, const int16 attoff)
const Oid typid = attr->atttypid;
const int16 typlen = attr->attlen;
const NullableDatum datum =
- arrow_get_datum(arrow_arrays[attoff], typid, typlen, aslot->tuple_index - 1);
+ arrow_get_datum(arrow_array, typid, typlen, aslot->tuple_index - 1);
slot->tts_values[attoff] = datum.value;
slot->tts_isnull[attoff] = datum.isnull;
}
@@ -777,7 +852,6 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
const int attoff = AttrNumberGetAttrOffset(attno);
- ArrowArray **arrow_arrays;

TS_DEBUG_LOG("attno: %d, tuple_index: %d", attno, aslot->tuple_index);

@@ -800,8 +874,7 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
if (!aslot->valid_attrs[attoff])
return set_attr_value(slot, attoff);

- arrow_arrays = arrow_column_cache_read_one(aslot, attno);
- return arrow_arrays[attoff];
+ return get_arrow_array(aslot, attoff);
}

/*
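The new get_arrow_array above gates the column cache on two conditions: the slot is used by an index scan (aslot->index_attrs != NULL) and ts_guc_hypercore_arrow_cache_max_entries is positive; otherwise it decompresses into the per-slot arrow_arrays added in this file. A sketch of exercising the fallback path (GUC name assumed as above; table and column names are illustrative, borrowed from the tests below):

set timescaledb.hypercore_arrow_cache_max_entries = 0;
-- With the cache disabled, even an index scan should take the slot-local
-- fallback, so no cache statistics should be reported in the plan.
explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
select * from readings where device = 1;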
1 change: 1 addition & 0 deletions tsl/src/hypercore/arrow_tts.h
@@ -73,6 +73,7 @@ typedef struct ArrowTupleTableSlot
uint16 total_row_count;
ArrowColumnCache arrow_cache;
ArrowColumnCacheEntry *arrow_cache_entry;
+ ArrowArray **arrow_arrays;
bool *referenced_attrs;
bool *segmentby_attrs;
bool *valid_attrs; /* Per-column validity up to "tts_nvalid" */
24 changes: 8 additions & 16 deletions tsl/test/expected/hypercore_columnar.out
@@ -107,8 +107,7 @@
Scankey: (device < 4)
Vectorized Filter: (location = 2)
Rows Removed by Filter: 16
- Array: cache misses=N, decompress count=N calls=N
- (6 rows)
+ (5 rows)

-- Save away all data from the chunk so that we can compare.
create table saved as select * from :chunk;
@@ -139,8 +138,7 @@
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Vectorized Filter: (humidity > '110'::double precision)
Rows Removed by Filter: 204
- Array: cache misses=N, decompress count=N calls=N
- (5 rows)
+ (4 rows)

select count(*) from :chunk where humidity > 110;
count
@@ -159,8 +157,7 @@
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Vectorized Filter: (humidity > '50'::double precision)
Rows Removed by Filter: 87
- Array: cache misses=N, decompress count=N calls=N
- (5 rows)
+ (4 rows)

select lhs.count, rhs.count
from (select count(*) from :chunk where humidity > 50) lhs,
@@ -191,8 +188,7 @@
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (temp > '50'::numeric)
Rows Removed by Filter: 204
- Array: cache misses=N, decompress count=N calls=N
- (5 rows)
+ (4 rows)

select count(*) from :chunk where temp > 50;
count
@@ -210,8 +206,7 @@
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (temp > '20'::numeric)
Rows Removed by Filter: 98
- Array: cache misses=N, decompress count=N calls=N
- (5 rows)
+ (4 rows)

select lhs.count, rhs.count
from (select count(*) from :chunk where temp > 20) lhs,
@@ -242,8 +237,7 @@
Filter: (temp > '20'::numeric)
Rows Removed by Filter: 132
Vectorized Filter: (humidity > '40'::double precision)
- Array: cache misses=30, decompress count=60 calls=165
- (6 rows)
+ (5 rows)

select count(*) from :chunk where humidity > 40 and temp > 20;
count
@@ -272,8 +266,7 @@
Rows Removed by Filter: 3
Scankey: (device = 3)
Vectorized Filter: (humidity > '40'::double precision)
- Array: cache misses=N, decompress count=N calls=N
- (7 rows)
+ (6 rows)

select count(*) from :chunk where humidity > 40 and temp > 20 and device = 3;
count
@@ -303,8 +296,7 @@
-> Seq Scan on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (device < 4)
Rows Removed by Filter: 184
- Array: cache misses=N, decompress count=N calls=N
- (8 rows)
+ (7 rows)

drop table readings;
drop table saved;
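The removed "Array: cache misses=..." lines in this file all reflect the same behavior change: ColumnarScan and SeqScan no longer go through the arrow cache, so the decompress_cache_stats EXPLAIN option has nothing to report for them. A sketch of checking both sides by hand (object names are illustrative, modeled on these tests):

-- Columnar scan: the cache stats line should no longer appear.
explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
select count(*) from readings where humidity > 50;
-- Index scan: the cache is still in use, so a stats line is still expected.
explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
select * from readings where device = 3;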
3 changes: 1 addition & 2 deletions tsl/test/expected/hypercore_index_btree.out
@@ -435,8 +435,7 @@
------------------------------------------------------------------------
Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Scankey: ((location_id >= 5) AND (location_id <= 10))
- Array: cache misses=N, decompress count=N calls=N
- (3 rows)
+ (2 rows)

-- These should generate decompressions as above, but for all columns.
select explain_analyze_anonymize(format($$
15 changes: 5 additions & 10 deletions tsl/test/expected/hypercore_scans.out
@@ -377,8 +377,7 @@
-> Custom Scan (ChunkDispatch) (actual rows=8641 loops=1)
-> Subquery Scan on "*SELECT*" (actual rows=8641 loops=1)
-> Function Scan on generate_series t (actual rows=8641 loops=1)
- Array: cache misses=2, decompress count=4 calls=4
- (10 rows)
+ (9 rows)

-- This should show values for all columns
explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
@@ -446,8 +445,7 @@
-> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=88 loops=1)
Vectorized Filter: (location = '1'::text)
Rows Removed by Filter: 319
- Array: cache misses=30, decompress count=84 calls=242
- (7 rows)
+ (6 rows)

-- Save the data for comparison with seqscan
create temp table chunk_saved as
@@ -524,8 +522,7 @@
-> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=89 loops=1)
Vectorized Filter: (location = '1'::text)
Rows Removed by Filter: 320
- Array: cache misses=30, decompress count=30 calls=30
- (5 rows)
+ (4 rows)

-- Testing same thing with SeqScan. It still decompresses in the
-- count(*) case, although it shouldn't have to. So, probably an
@@ -539,8 +536,7 @@
-> Seq Scan on _hyper_1_1_chunk (actual rows=17 loops=1)
Filter: (device = 1)
Rows Removed by Filter: 392
- Array: cache misses=30, decompress count=62 calls=410
- (5 rows)
+ (4 rows)

explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
select device from :chunk where device = 1;
@@ -559,8 +555,7 @@
-> Seq Scan on _hyper_1_1_chunk (actual rows=89 loops=1)
Filter: (location = '1'::text)
Rows Removed by Filter: 320
- Array: cache misses=30, decompress count=62 calls=410
- (5 rows)
+ (4 rows)

-- ColumnarScan declares itself as projection capable. This query
-- would add a Result node on top if ColumnarScan couldn't project.