Skip to content

Commit

Permalink
Use arrow cache only for index scans
Browse files Browse the repository at this point in the history
Since compressed segments are accessed sequentially during
sequence/columnar scans, it is not necessary to cache the
decompressed arrow arrays.

Instead, use the arrow cache only if an index scan is detected. Also
make it possible to manually disable the cache by setting the GUC
hypercore_arrow_cache_max_entries to 0.
  • Loading branch information
erimatnor committed Feb 18, 2025
1 parent c444b1a commit f9606d0
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 47 deletions.
9 changes: 5 additions & 4 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1101,12 +1101,13 @@ _guc_init(void)
/* short_desc= */ "max number of entries in arrow data cache",
/* long_desc= */
"The max number of decompressed arrow segments that can be "
"cached before entries are evicted. This mainly affects the "
"performance of index scans on the Hypercore TAM "
"when segments are accessed in non-sequential order.",
"cached before entries are evicted. This only affects the "
"performance of index scans when using Hypercore TAM "
"and segments are accessed in non-sequential order. "
"Set to 0 to disable the use of the cache.",
/* valueAddr= */ &ts_guc_hypercore_arrow_cache_max_entries,
/* bootValue= */ 25000,
/* minValue= */ 1,
/* minValue= */ 0,
/* maxValue= */ INT_MAX,
/* context= */ PGC_USERSET,
/* flags= */ 0,
Expand Down
9 changes: 9 additions & 0 deletions tsl/src/hypercore/arrow_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ arrow_release_buffers(ArrowArray *array)
arrow_private_release(array);
}

/*
 * Free an ArrowArray and all buffers it owns.
 *
 * First invokes the producer's release callback (if one is still
 * installed) to drop the array's buffers and any private state, then
 * frees the ArrowArray struct itself.  Per the Arrow C data interface,
 * the callback is responsible for marking the array released by
 * clearing the release pointer.
 */
void
arrow_release(ArrowArray *array)
{
	ArrowArrayReleaseFunc release_cb = array->release;

	if (release_cb != NULL)
		release_cb(array);

	pfree(array);
}

Check warning on line 151 in tsl/src/hypercore/arrow_array.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_array.c#L150-L151

Added lines #L150 - L151 were not covered by tests

/*
* Variable-size primitive layout ArrowArray from decompression iterator.
*/
Expand Down
1 change: 1 addition & 0 deletions tsl/src/hypercore/arrow_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ extern NullableDatum arrow_get_datum(const ArrowArray *array, Oid typid, int16 t
uint16 index);
extern ArrowArray *arrow_from_compressed(Datum compressed, Oid typid, MemoryContext dest_mcxt,
MemoryContext tmp_mcxt);
extern void arrow_release(ArrowArray *array);
8 changes: 1 addition & 7 deletions tsl/src/hypercore/arrow_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,7 @@ arrow_cache_clear_entry(ArrowColumnCacheEntry *restrict entry)
if (entry->arrow_arrays[i])
{
ArrowArray *array = entry->arrow_arrays[i];

if (array->release)
{
array->release(array);
array->release = NULL;
}
pfree(array);
arrow_release(array);

Check warning on line 177 in tsl/src/hypercore/arrow_cache.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_cache.c#L177

Added line #L177 was not covered by tests
entry->arrow_arrays[i] = NULL;
}
}
Expand Down
89 changes: 81 additions & 8 deletions tsl/src/hypercore/arrow_tts.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include "guc.h"
#include <access/attnum.h>
#include <access/htup_details.h>
#include <access/tupdesc.h>
Expand Down Expand Up @@ -94,6 +95,7 @@ tts_arrow_init(TupleTableSlot *slot)
aslot->total_row_count = 0;
aslot->referenced_attrs = NULL;
aslot->arrow_qual_result = NULL;
aslot->arrow_arrays = NULL;

Check warning on line 98 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L98

Added line #L98 was not covered by tests

/*
* Set up child slots, one for the non-compressed relation and one for the
Expand All @@ -112,6 +114,8 @@ tts_arrow_init(TupleTableSlot *slot)
aslot->child_slot = aslot->noncompressed_slot;
aslot->valid_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->segmentby_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->arrow_arrays = palloc0(sizeof(ArrowArray *) * slot->tts_tupleDescriptor->natts);

Check warning on line 117 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L117

Added line #L117 was not covered by tests

/* Note that aslot->referenced_attrs is initialized on demand, and not
* here, because NULL is a valid state for referenced_attrs. */
MemoryContextSwitchTo(oldmcxt);
Expand All @@ -128,6 +132,24 @@ tts_arrow_init(TupleTableSlot *slot)
GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024);
}

/*
 * Release all per-column arrow arrays held directly by the slot.
 *
 * These are the arrays materialized outside the arrow cache (the
 * sequential-scan path).  Each non-NULL entry is released and the slot
 * pointer reset so the column can be re-materialized for the next
 * segment.  A slot that was never fully initialized (arrow_arrays ==
 * NULL) is a no-op.
 */
static void
clear_arrow_arrays(TupleTableSlot *slot)
{
	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
	ArrowArray **arrays = aslot->arrow_arrays;

	if (arrays == NULL)
		return;

	const int natts = slot->tts_tupleDescriptor->natts;

	for (int attoff = 0; attoff < natts; attoff++)
	{
		if (arrays[attoff] == NULL)
			continue;

		arrow_release(arrays[attoff]);
		arrays[attoff] = NULL;
	}
}

Check warning on line 151 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L151

Added line #L151 was not covered by tests

/*
* The release function is called by:
*
Expand All @@ -153,6 +175,7 @@ tts_arrow_release(TupleTableSlot *slot)
aslot->compressed_slot = NULL;
aslot->noncompressed_slot = NULL;
aslot->arrow_cache_entry = NULL;
aslot->arrow_arrays = NULL;

Check warning on line 178 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L178

Added line #L178 was not covered by tests
}

static void
Expand Down Expand Up @@ -269,13 +292,15 @@ tts_arrow_clear(TupleTableSlot *slot)
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->arrow_cache_entry = NULL;
aslot->arrow_qual_result = NULL;
clear_arrow_arrays(slot);

Check warning on line 295 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L295

Added line #L295 was not covered by tests
MemoryContextReset(aslot->per_segment_mcxt);
}

static inline void
tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 tuple_index)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
bool clear_arrow_data = true;

Check warning on line 303 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L303

Added line #L303 was not covered by tests

Assert(!TTS_EMPTY(child_slot));
Assert(OidIsValid(slot->tts_tableOid));
Expand Down Expand Up @@ -315,6 +340,8 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t

if (!ItemPointerEquals(&decoded_tid, &child_slot->tts_tid))
clear_arrow_parent(slot);
else
clear_arrow_data = false;

Check warning on line 344 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L344

Added line #L344 was not covered by tests
}
}

Expand All @@ -339,6 +366,10 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
aslot->child_slot = child_slot;
aslot->tuple_index = tuple_index;
aslot->arrow_cache_entry = NULL;

if (clear_arrow_data)
clear_arrow_arrays(slot);

Check warning on line 371 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L371

Added line #L371 was not covered by tests

/* Clear valid attributes */
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
MemoryContextReset(aslot->per_segment_mcxt);
Expand Down Expand Up @@ -462,6 +493,52 @@ is_compressed_col(const TupleDesc tupdesc, AttrNumber attno)
return coltypid == typinfo->type_oid;
}

/*
 * Get the decompressed arrow array for the given attribute offset,
 * materializing it from the compressed child slot on first access.
 *
 * Index scans (detected via index_attrs) go through the arrow column
 * cache, unless the cache has been disabled by setting the
 * hypercore_arrow_cache_max_entries GUC to 0.  All other scans access
 * segments sequentially, so the array is decompressed once into the
 * slot's own arrow_arrays slot and kept only for the current segment.
 *
 * Returns NULL when the column holds no compressed data (e.g., the
 * value is NULL or the column is not a compressed column).
 */
static inline ArrowArray *
get_arrow_array(ArrowTupleTableSlot *aslot, const int16 attoff)
{
	TupleTableSlot *slot = &aslot->base.base;
	const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);

	/*
	 * Only use the arrow array cache if the slot is used in an index scan
	 * and the cache hasn't been disabled by configuration.
	 */
	if (aslot->index_attrs != NULL && ts_guc_hypercore_arrow_cache_max_entries > 0)
	{
		ArrowArray **cached_arrays = arrow_column_cache_read_one(aslot, attnum);

		return cached_arrays[attoff];
	}

	Assert(aslot->arrow_arrays);

	if (aslot->arrow_arrays[attoff] == NULL)
	{
		/* Map the slot attribute offset to the compressed relation's
		 * attribute number. */
		const int16 *offset_map = arrow_slot_get_attribute_offset_map(slot);
		const AttrNumber cattno = AttrOffsetGetAttrNumber(offset_map[attoff]);

		if (is_compressed_col(aslot->compressed_slot->tts_tupleDescriptor, cattno))
		{
			bool isnull;
			Datum compressed_value = slot_getattr(aslot->child_slot, cattno, &isnull);

			/* Can this ever be NULL? */
			if (!isnull)
			{
				const Form_pg_attribute attr =
					TupleDescAttr(slot->tts_tupleDescriptor, attoff);

				aslot->arrow_arrays[attoff] =
					arrow_from_compressed(compressed_value,
										  attr->atttypid,
										  slot->tts_mcxt,
										  aslot->arrow_cache.decompression_mcxt);
			}
		}
	}

	return aslot->arrow_arrays[attoff];
}

static pg_attribute_always_inline ArrowArray *
set_attr_value(TupleTableSlot *slot, const int16 attoff)
{
Expand All @@ -486,13 +563,11 @@ set_attr_value(TupleTableSlot *slot, const int16 attoff)
}
else
{
const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
ArrowArray **arrow_arrays = arrow_column_cache_read_one(aslot, attnum);

arrow_array = arrow_arrays[attoff];
arrow_array = get_arrow_array(aslot, attoff);

Check warning on line 566 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L566

Added line #L566 was not covered by tests

if (arrow_array == NULL)
{
const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);

Check warning on line 570 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L570

Added line #L570 was not covered by tests
/* Since the column is not the segment-by column, and there is no
* decompressed data, the column must be NULL. Use the default
* value. */
Expand All @@ -506,7 +581,7 @@ set_attr_value(TupleTableSlot *slot, const int16 attoff)
const Oid typid = attr->atttypid;
const int16 typlen = attr->attlen;
const NullableDatum datum =
arrow_get_datum(arrow_arrays[attoff], typid, typlen, aslot->tuple_index - 1);
arrow_get_datum(arrow_array, typid, typlen, aslot->tuple_index - 1);

Check warning on line 584 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L584

Added line #L584 was not covered by tests
slot->tts_values[attoff] = datum.value;
slot->tts_isnull[attoff] = datum.isnull;
}
Expand Down Expand Up @@ -777,7 +852,6 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
const int attoff = AttrNumberGetAttrOffset(attno);
ArrowArray **arrow_arrays;

TS_DEBUG_LOG("attno: %d, tuple_index: %d", attno, aslot->tuple_index);

Expand All @@ -800,8 +874,7 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
if (!aslot->valid_attrs[attoff])
return set_attr_value(slot, attoff);

arrow_arrays = arrow_column_cache_read_one(aslot, attno);
return arrow_arrays[attoff];
return get_arrow_array(aslot, attoff);

Check warning on line 877 in tsl/src/hypercore/arrow_tts.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/arrow_tts.c#L877

Added line #L877 was not covered by tests
}

/*
Expand Down
1 change: 1 addition & 0 deletions tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typedef struct ArrowTupleTableSlot
uint16 total_row_count;
ArrowColumnCache arrow_cache;
ArrowColumnCacheEntry *arrow_cache_entry;
ArrowArray **arrow_arrays;
bool *referenced_attrs;
bool *segmentby_attrs;
bool *valid_attrs; /* Per-column validity up to "tts_nvalid" */
Expand Down
24 changes: 8 additions & 16 deletions tsl/test/expected/hypercore_columnar.out
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ $$, :'chunk'));
Scankey: (device < 4)
Vectorized Filter: (location = 2)
Rows Removed by Filter: 16
Array: cache misses=N, decompress count=N calls=N
(6 rows)
(5 rows)

-- Save away all data from the chunk so that we can compare.
create table saved as select * from :chunk;
Expand Down Expand Up @@ -139,8 +138,7 @@ $$, :'chunk'));
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Vectorized Filter: (humidity > '110'::double precision)
Rows Removed by Filter: 204
Array: cache misses=N, decompress count=N calls=N
(5 rows)
(4 rows)

select count(*) from :chunk where humidity > 110;
count
Expand All @@ -159,8 +157,7 @@ $$, :'chunk'));
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Vectorized Filter: (humidity > '50'::double precision)
Rows Removed by Filter: 87
Array: cache misses=N, decompress count=N calls=N
(5 rows)
(4 rows)

select lhs.count, rhs.count
from (select count(*) from :chunk where humidity > 50) lhs,
Expand Down Expand Up @@ -191,8 +188,7 @@ $$, :'chunk'));
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (temp > '50'::numeric)
Rows Removed by Filter: 204
Array: cache misses=N, decompress count=N calls=N
(5 rows)
(4 rows)

select count(*) from :chunk where temp > 50;
count
Expand All @@ -210,8 +206,7 @@ $$, :'chunk'));
-> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (temp > '20'::numeric)
Rows Removed by Filter: 98
Array: cache misses=N, decompress count=N calls=N
(5 rows)
(4 rows)

select lhs.count, rhs.count
from (select count(*) from :chunk where temp > 20) lhs,
Expand Down Expand Up @@ -242,8 +237,7 @@ select count(*) from :chunk where humidity > 40 and temp > 20;
Filter: (temp > '20'::numeric)
Rows Removed by Filter: 132
Vectorized Filter: (humidity > '40'::double precision)
Array: cache misses=30, decompress count=60 calls=165
(6 rows)
(5 rows)

select count(*) from :chunk where humidity > 40 and temp > 20;
count
Expand Down Expand Up @@ -272,8 +266,7 @@ $$, :'chunk'));
Rows Removed by Filter: 3
Scankey: (device = 3)
Vectorized Filter: (humidity > '40'::double precision)
Array: cache misses=N, decompress count=N calls=N
(7 rows)
(6 rows)

select count(*) from :chunk where humidity > 40 and temp > 20 and device = 3;
count
Expand Down Expand Up @@ -303,8 +296,7 @@ $$, :'chunk'));
-> Seq Scan on _hyper_I_N_chunk (actual rows=N loops=N)
Filter: (device < 4)
Rows Removed by Filter: 184
Array: cache misses=N, decompress count=N calls=N
(8 rows)
(7 rows)

drop table readings;
drop table saved;
3 changes: 1 addition & 2 deletions tsl/test/expected/hypercore_index_btree.out
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,7 @@ $$, :'chunk1'));
------------------------------------------------------------------------
Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
Scankey: ((location_id >= 5) AND (location_id <= 10))
Array: cache misses=N, decompress count=N calls=N
(3 rows)
(2 rows)

-- These should generate decompressions as above, but for all columns.
select explain_analyze_anonymize(format($$
Expand Down
15 changes: 5 additions & 10 deletions tsl/test/expected/hypercore_scans.out
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,7 @@ on conflict (location, device, time) do nothing;
-> Custom Scan (ChunkDispatch) (actual rows=8641 loops=1)
-> Subquery Scan on "*SELECT*" (actual rows=8641 loops=1)
-> Function Scan on generate_series t (actual rows=8641 loops=1)
Array: cache misses=2, decompress count=4 calls=4
(10 rows)
(9 rows)

-- This should show values for all columns
explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
Expand Down Expand Up @@ -446,8 +445,7 @@ order by time desc;
-> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=88 loops=1)
Vectorized Filter: (location = '1'::text)
Rows Removed by Filter: 319
Array: cache misses=30, decompress count=84 calls=242
(7 rows)
(6 rows)

-- Save the data for comparison with seqscan
create temp table chunk_saved as
Expand Down Expand Up @@ -524,8 +522,7 @@ select count(*) from :chunk where location = 1::text;
-> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=89 loops=1)
Vectorized Filter: (location = '1'::text)
Rows Removed by Filter: 320
Array: cache misses=30, decompress count=30 calls=30
(5 rows)
(4 rows)

-- Testing same thing with SeqScan. It still decompresses in the
-- count(*) case, although it shouldn't have to. So, probably an
Expand All @@ -539,8 +536,7 @@ select count(*) from :chunk where device = 1;
-> Seq Scan on _hyper_1_1_chunk (actual rows=17 loops=1)
Filter: (device = 1)
Rows Removed by Filter: 392
Array: cache misses=30, decompress count=62 calls=410
(5 rows)
(4 rows)

explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
select device from :chunk where device = 1;
Expand All @@ -559,8 +555,7 @@ select count(*) from :chunk where location = 1::text;
-> Seq Scan on _hyper_1_1_chunk (actual rows=89 loops=1)
Filter: (location = '1'::text)
Rows Removed by Filter: 320
Array: cache misses=30, decompress count=62 calls=410
(5 rows)
(4 rows)

-- ColumnarScan declares itself as projection capable. This query
-- would add a Result node on top if ColumnarScan couldn't project.
Expand Down

0 comments on commit f9606d0

Please sign in to comment.