diff --git a/src/guc.c b/src/guc.c
index 3629a393c62..03ce4a933f5 100644
--- a/src/guc.c
+++ b/src/guc.c
@@ -1101,12 +1101,13 @@ _guc_init(void)
 							 /* short_desc= */ "max number of entries in arrow data cache",
 							 /* long_desc= */ "The max number of decompressed arrow segments that can be "
-											  "cached before entries are evicted. This mainly affects the "
-											  "performance of index scans on the Hypercore TAM "
-											  "when segments are accessed in non-sequential order.",
+											  "cached before entries are evicted. This only affects the "
+											  "performance of index scans when using the Hypercore TAM "
+											  "and segments are accessed in non-sequential order. "
+											  "Set to 0 to disable the use of the cache.",
 							 /* valueAddr= */ &ts_guc_hypercore_arrow_cache_max_entries,
 							 /* bootValue= */ 25000,
-							 /* minValue= */ 1,
+							 /* minValue= */ 0,
 							 /* maxValue= */ INT_MAX,
 							 /* context= */ PGC_USERSET,
 							 /* flags= */ 0,
diff --git a/tsl/src/hypercore/arrow_array.c b/tsl/src/hypercore/arrow_array.c
index f4474b45c69..da94b3dda92 100644
--- a/tsl/src/hypercore/arrow_array.c
+++ b/tsl/src/hypercore/arrow_array.c
@@ -141,6 +141,15 @@ arrow_release_buffers(ArrowArray *array)
 	arrow_private_release(array);
 }
 
+void
+arrow_release(ArrowArray *array)
+{
+	if (array->release != NULL)
+		array->release(array);
+
+	pfree(array);
+}
+
 /*
  * Variable-size primitive layout ArrowArray from decompression iterator.
  */
diff --git a/tsl/src/hypercore/arrow_array.h b/tsl/src/hypercore/arrow_array.h
index 71e96e34d81..12b80cc076b 100644
--- a/tsl/src/hypercore/arrow_array.h
+++ b/tsl/src/hypercore/arrow_array.h
@@ -15,3 +15,4 @@ extern NullableDatum arrow_get_datum(const ArrowArray *array, Oid typid, int16 t
 									 uint16 index);
 extern ArrowArray *arrow_from_compressed(Datum compressed, Oid typid, MemoryContext dest_mcxt,
 										 MemoryContext tmp_mcxt);
+extern void arrow_release(ArrowArray *array);
diff --git a/tsl/src/hypercore/arrow_cache.c b/tsl/src/hypercore/arrow_cache.c
index e62440555b1..8efffb68364 100644
--- a/tsl/src/hypercore/arrow_cache.c
+++ b/tsl/src/hypercore/arrow_cache.c
@@ -174,13 +174,7 @@ arrow_cache_clear_entry(ArrowColumnCacheEntry *restrict entry)
 		if (entry->arrow_arrays[i])
 		{
 			ArrowArray *array = entry->arrow_arrays[i];
-
-			if (array->release)
-			{
-				array->release(array);
-				array->release = NULL;
-			}
-			pfree(array);
+			arrow_release(array);
 			entry->arrow_arrays[i] = NULL;
 		}
 	}
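The new arrow_release() helper centralizes the release-then-pfree() sequence that arrow_cache_clear_entry() used to open-code. It also drops the explicit array->release = NULL reset at the call site: under the Arrow C data interface, the release callback itself must null out the release member, so the reset was redundant. Below is a minimal standalone sketch of that contract, not TimescaleDB code; the struct is trimmed to the members used, stub_release() and sketch_arrow_release() are hypothetical names, and plain free() stands in for pfree().

#include <stdint.h>
#include <stdlib.h>

/* Trimmed-down ArrowArray: only the members this sketch touches; the
 * real struct carries the full Arrow C data interface layout. */
typedef struct ArrowArray
{
	int64_t length;
	void (*release)(struct ArrowArray *);
	void *private_data;
} ArrowArray;

/* A producer's release callback: frees the buffers it owns and, as the
 * spec requires, marks the array released by nulling its own pointer. */
static void
stub_release(struct ArrowArray *array)
{
	free(array->private_data);
	array->private_data = NULL;
	array->release = NULL;
}

/* Same shape as the new arrow_release(): run the callback only if the
 * array still owns data, then free the container itself. */
static void
sketch_arrow_release(struct ArrowArray *array)
{
	if (array->release != NULL)
		array->release(array);
	free(array);
}

int
main(void)
{
	ArrowArray *array = calloc(1, sizeof(ArrowArray));
	array->private_data = malloc(16);
	array->release = stub_release;
	/* The NULL check makes this safe even if the producer has already
	 * released the buffers before we get here. */
	sketch_arrow_release(array);
	return 0;
}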
diff --git a/tsl/src/hypercore/arrow_tts.c b/tsl/src/hypercore/arrow_tts.c
index c666434cdfc..2375a6d0b64 100644
--- a/tsl/src/hypercore/arrow_tts.c
+++ b/tsl/src/hypercore/arrow_tts.c
@@ -4,6 +4,7 @@
  * LICENSE-TIMESCALE for a copy of the license.
  */
 #include <postgres.h>
+#include "guc.h"
 #include
 #include
@@ -94,6 +95,7 @@ tts_arrow_init(TupleTableSlot *slot)
 	aslot->total_row_count = 0;
 	aslot->referenced_attrs = NULL;
 	aslot->arrow_qual_result = NULL;
+	aslot->arrow_arrays = NULL;
 
 	/*
 	 * Set up child slots, one for the non-compressed relation and one for the
@@ -112,6 +114,8 @@ tts_arrow_init(TupleTableSlot *slot)
 	aslot->child_slot = aslot->noncompressed_slot;
 	aslot->valid_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
 	aslot->segmentby_attrs = palloc0(sizeof(bool) * slot->tts_tupleDescriptor->natts);
+	aslot->arrow_arrays = palloc0(sizeof(ArrowArray *) * slot->tts_tupleDescriptor->natts);
+
 	/* Note that aslot->referenced_attrs is initialized on demand, and not
 	 * here, because NULL is a valid state for referenced_attrs. */
 	MemoryContextSwitchTo(oldmcxt);
@@ -128,6 +132,24 @@
 	GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024);
 }
 
+static void
+clear_arrow_arrays(TupleTableSlot *slot)
+{
+	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+
+	if (aslot->arrow_arrays)
+	{
+		for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++)
+		{
+			if (aslot->arrow_arrays[i] != NULL)
+			{
+				arrow_release(aslot->arrow_arrays[i]);
+				aslot->arrow_arrays[i] = NULL;
+			}
+		}
+	}
+}
+
 /*
  * The release function is called by:
  *
@@ -153,6 +175,7 @@ tts_arrow_release(TupleTableSlot *slot)
 	aslot->compressed_slot = NULL;
 	aslot->noncompressed_slot = NULL;
 	aslot->arrow_cache_entry = NULL;
+	aslot->arrow_arrays = NULL;
 }
 
 static void
@@ -269,6 +292,7 @@ tts_arrow_clear(TupleTableSlot *slot)
 	memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
 	aslot->arrow_cache_entry = NULL;
 	aslot->arrow_qual_result = NULL;
+	clear_arrow_arrays(slot);
 	MemoryContextReset(aslot->per_segment_mcxt);
 }
 
@@ -276,6 +300,7 @@
 static inline void
 tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 tuple_index)
 {
 	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+	bool clear_arrow_data = true;
 	Assert(!TTS_EMPTY(child_slot));
 	Assert(OidIsValid(slot->tts_tableOid));
@@ -315,6 +340,8 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
 
 		if (!ItemPointerEquals(&decoded_tid, &child_slot->tts_tid))
 			clear_arrow_parent(slot);
+		else
+			clear_arrow_data = false;
 	}
 }
 
@@ -339,6 +366,10 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
 	aslot->child_slot = child_slot;
 	aslot->tuple_index = tuple_index;
 	aslot->arrow_cache_entry = NULL;
+
+	if (clear_arrow_data)
+		clear_arrow_arrays(slot);
+
 	/* Clear valid attributes */
 	memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
 	MemoryContextReset(aslot->per_segment_mcxt);
@@ -462,6 +493,52 @@ is_compressed_col(const TupleDesc tupdesc, AttrNumber attno)
 	return coltypid == typinfo->type_oid;
 }
 
+static inline ArrowArray *
+get_arrow_array(ArrowTupleTableSlot *aslot, const int16 attoff)
+{
+	const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
+	TupleTableSlot *slot = &aslot->base.base;
+
+	/*
+	 * Only use the arrow array cache if the slot is used in an index scan and
+	 * the cache hasn't been disabled by configuration.
+	 */
+	if (aslot->index_attrs != NULL && ts_guc_hypercore_arrow_cache_max_entries > 0)
+	{
+		ArrowArray **arrow_arrays = arrow_column_cache_read_one(aslot, attnum);
+		return arrow_arrays[attoff];
+	}
+
+	Assert(aslot->arrow_arrays);
+
+	if (NULL == aslot->arrow_arrays[attoff])
+	{
+		const int16 *attrs_offset_map = arrow_slot_get_attribute_offset_map(&aslot->base.base);
+		const AttrNumber cattno = AttrOffsetGetAttrNumber(attrs_offset_map[attoff]);
+		const TupleDesc compressed_tupdesc = aslot->compressed_slot->tts_tupleDescriptor;
+
+		if (is_compressed_col(compressed_tupdesc, cattno))
+		{
+			bool isnull;
+			Datum value = slot_getattr(aslot->child_slot, cattno, &isnull);
+
+			/* Can this ever be NULL? */
+			if (!isnull)
+			{
+				const ArrowColumnCache *acache = &aslot->arrow_cache;
+				const TupleDesc tupdesc = slot->tts_tupleDescriptor;
+				const Form_pg_attribute attr = TupleDescAttr(tupdesc, attoff);
+				aslot->arrow_arrays[attoff] = arrow_from_compressed(value,
+																	attr->atttypid,
+																	slot->tts_mcxt,
+																	acache->decompression_mcxt);
+			}
+		}
+	}
+
+	return aslot->arrow_arrays[attoff];
+}
+
 static pg_attribute_always_inline ArrowArray *
 set_attr_value(TupleTableSlot *slot, const int16 attoff)
 {
@@ -486,13 +563,11 @@ set_attr_value(TupleTableSlot *slot, const int16 attoff)
 	}
 	else
 	{
-		const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
-		ArrowArray **arrow_arrays = arrow_column_cache_read_one(aslot, attnum);
-
-		arrow_array = arrow_arrays[attoff];
+		arrow_array = get_arrow_array(aslot, attoff);
 
 		if (arrow_array == NULL)
 		{
+			const AttrNumber attnum = AttrOffsetGetAttrNumber(attoff);
 			/* Since the column is not the segment-by column, and there is no
 			 * decompressed data, the column must be NULL. Use the default
 			 * value. */
@@ -506,7 +581,7 @@
 			const Oid typid = attr->atttypid;
 			const int16 typlen = attr->attlen;
 			const NullableDatum datum =
-				arrow_get_datum(arrow_arrays[attoff], typid, typlen, aslot->tuple_index - 1);
+				arrow_get_datum(arrow_array, typid, typlen, aslot->tuple_index - 1);
 			slot->tts_values[attoff] = datum.value;
 			slot->tts_isnull[attoff] = datum.isnull;
 		}
@@ -777,7 +852,6 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
 {
 	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
 	const int attoff = AttrNumberGetAttrOffset(attno);
-	ArrowArray **arrow_arrays;
 
 	TS_DEBUG_LOG("attno: %d, tuple_index: %d", attno, aslot->tuple_index);
 
@@ -800,8 +874,7 @@ arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno)
 	if (!aslot->valid_attrs[attoff])
 		return set_attr_value(slot, attoff);
 
-	arrow_arrays = arrow_column_cache_read_one(aslot, attno);
-	return arrow_arrays[attoff];
+	return get_arrow_array(aslot, attoff);
 }
 
 /*
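Taken together, the arrow_tts.c changes give every arrow slot a private arrow_arrays vector: allocated in tts_arrow_init(), dropped in tts_arrow_clear() and on release, kept across stores that merely advance tuple_index within the same compressed tuple (the clear_arrow_data flag), and filled lazily by get_arrow_array() when the LRU cache is bypassed. A standalone toy model of that lifecycle follows; all names are hypothetical and ToyArray merely stands in for ArrowArray.

#include <stdlib.h>

#define NATTS 4

typedef struct ToyArray
{
	int nvalues; /* stand-in for a decompressed arrow column */
} ToyArray;

typedef struct ToySlot
{
	ToyArray *arrays[NATTS]; /* one lazily decompressed column per attribute */
	long parent_row;         /* which compressed tuple the slot points into */
} ToySlot;

/* Analogue of clear_arrow_arrays(): release every per-attribute column. */
static void
toy_clear_arrays(ToySlot *slot)
{
	for (int i = 0; i < NATTS; i++)
	{
		free(slot->arrays[i]);
		slot->arrays[i] = NULL;
	}
}

/* Analogue of the store-tuple path: columns survive when only the row
 * index inside the same compressed tuple changes, so each segment is
 * decompressed at most once per visit even with the cache disabled. */
static void
toy_store_tuple(ToySlot *slot, long parent_row)
{
	if (slot->parent_row != parent_row)
		toy_clear_arrays(slot); /* different segment: data is stale */
	slot->parent_row = parent_row;
}

/* Analogue of get_arrow_array(): decompress on first touch, reuse after. */
static ToyArray *
toy_get_column(ToySlot *slot, int attoff)
{
	if (slot->arrays[attoff] == NULL)
		slot->arrays[attoff] = calloc(1, sizeof(ToyArray));
	return slot->arrays[attoff];
}

int
main(void)
{
	ToySlot slot = { { NULL }, -1 };

	toy_store_tuple(&slot, 1); /* first row of segment 1: arrays cleared */
	toy_get_column(&slot, 0);  /* "decompressed" here */
	toy_store_tuple(&slot, 1); /* next row, same segment: arrays kept */
	toy_get_column(&slot, 0);  /* reused, no second decompression */
	toy_store_tuple(&slot, 2); /* new segment: stale arrays released */
	toy_clear_arrays(&slot);
	return 0;
}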
diff --git a/tsl/src/hypercore/arrow_tts.h b/tsl/src/hypercore/arrow_tts.h
index 830af1362cf..26595ea8d03 100644
--- a/tsl/src/hypercore/arrow_tts.h
+++ b/tsl/src/hypercore/arrow_tts.h
@@ -73,6 +73,7 @@ typedef struct ArrowTupleTableSlot
 	uint16 total_row_count;
 	ArrowColumnCache arrow_cache;
 	ArrowColumnCacheEntry *arrow_cache_entry;
+	ArrowArray **arrow_arrays;
 	bool *referenced_attrs;
 	bool *segmentby_attrs;
 	bool *valid_attrs; /* Per-column validity up to "tts_nvalid" */
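For callers, nothing changes at the API level: arrow_slot_get_array() keeps its signature and now transparently returns either a cached array (index scans with timescaledb.hypercore_arrow_cache_max_entries > 0) or the slot's own copy. A hedged consumer sketch follows, assuming a PostgreSQL extension build environment; count_non_nulls() is a hypothetical function, while arrow_slot_get_array() and the Arrow length/null_count fields are taken from the patch and the Arrow C data interface.

#include <postgres.h>

#include "arrow_tts.h"

/* Hypothetical consumer: count the non-null values in one column of the
 * current segment without materializing individual datums. */
static int64
count_non_nulls(TupleTableSlot *slot, AttrNumber attno)
{
	const ArrowArray *array = arrow_slot_get_array(slot, attno);

	/*
	 * NULL means no columnar data exists for this attribute (for example,
	 * a segment-by column); fall back to row-by-row access in that case.
	 */
	if (array == NULL)
		return -1;

	return array->length - array->null_count;
}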
diff --git a/tsl/test/expected/hypercore_columnar.out b/tsl/test/expected/hypercore_columnar.out
index 768f7fe846f..fe48e0786db 100644
--- a/tsl/test/expected/hypercore_columnar.out
+++ b/tsl/test/expected/hypercore_columnar.out
@@ -107,8 +107,7 @@ $$, :'chunk'));
    Scankey: (device < 4)
    Vectorized Filter: (location = 2)
    Rows Removed by Filter: 16
-   Array: cache misses=N, decompress count=N calls=N
-(6 rows)
+(5 rows)
 
 -- Save away all data from the chunk so that we can compare.
 create table saved as select * from :chunk;
@@ -139,8 +138,7 @@ $$, :'chunk'));
    -> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
          Vectorized Filter: (humidity > '110'::double precision)
          Rows Removed by Filter: 204
-         Array: cache misses=N, decompress count=N calls=N
-(5 rows)
+(4 rows)
 
 select count(*) from :chunk where humidity > 110;
  count
@@ -159,8 +157,7 @@ $$, :'chunk'));
    -> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
          Vectorized Filter: (humidity > '50'::double precision)
          Rows Removed by Filter: 87
-         Array: cache misses=N, decompress count=N calls=N
-(5 rows)
+(4 rows)
 
 select lhs.count, rhs.count
 from (select count(*) from :chunk where humidity > 50) lhs,
@@ -191,8 +188,7 @@ $$, :'chunk'));
    -> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
          Filter: (temp > '50'::numeric)
          Rows Removed by Filter: 204
-         Array: cache misses=N, decompress count=N calls=N
-(5 rows)
+(4 rows)
 
 select count(*) from :chunk where temp > 50;
  count
@@ -210,8 +206,7 @@ $$, :'chunk'));
    -> Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
          Filter: (temp > '20'::numeric)
          Rows Removed by Filter: 98
-         Array: cache misses=N, decompress count=N calls=N
-(5 rows)
+(4 rows)
 
 select lhs.count, rhs.count
 from (select count(*) from :chunk where temp > 20) lhs,
@@ -242,8 +237,7 @@ select count(*) from :chunk where humidity > 40 and temp > 20;
          Filter: (temp > '20'::numeric)
          Rows Removed by Filter: 132
          Vectorized Filter: (humidity > '40'::double precision)
-         Array: cache misses=30, decompress count=60 calls=165
-(6 rows)
+(5 rows)
 
 select count(*) from :chunk where humidity > 40 and temp > 20;
  count
@@ -272,8 +266,7 @@ $$, :'chunk'));
          Rows Removed by Filter: 3
          Scankey: (device = 3)
          Vectorized Filter: (humidity > '40'::double precision)
-         Array: cache misses=N, decompress count=N calls=N
-(7 rows)
+(6 rows)
 
 select count(*) from :chunk where humidity > 40 and temp > 20 and device = 3;
  count
@@ -303,8 +296,7 @@ $$, :'chunk'));
    -> Seq Scan on _hyper_I_N_chunk (actual rows=N loops=N)
          Filter: (device < 4)
          Rows Removed by Filter: 184
-         Array: cache misses=N, decompress count=N calls=N
-(8 rows)
+(7 rows)
 
 drop table readings;
 drop table saved;
diff --git a/tsl/test/expected/hypercore_index_btree.out b/tsl/test/expected/hypercore_index_btree.out
index e6c06d9fd53..60df5593d84 100644
--- a/tsl/test/expected/hypercore_index_btree.out
+++ b/tsl/test/expected/hypercore_index_btree.out
@@ -435,8 +435,7 @@ $$, :'chunk1'));
 ------------------------------------------------------------------------
  Custom Scan (ColumnarScan) on _hyper_I_N_chunk (actual rows=N loops=N)
    Scankey: ((location_id >= 5) AND (location_id <= 10))
-   Array: cache misses=N, decompress count=N calls=N
-(3 rows)
+(2 rows)
 
 -- These should generate decompressions as above, but for all columns.
 select explain_analyze_anonymize(format($$
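The vanished "Array: cache misses" lines in these expected files are the intended effect: ColumnarScan and SeqScan visit segments in storage order, so after this patch they bypass the LRU cache and the per-slot arrays decompress each segment exactly once. A standalone toy, with a hypothetical helper name, showing why holding a single segment suffices for sequential order but not for the random order an index scan can produce (which is why the cache stays on for index scans):

#include <stdio.h>

/* Count how often a slot that holds one decompressed segment at a time
 * must decompress, given the segment each visited row belongs to. */
static int
count_decompressions(const int *segment_of_row, int nrows)
{
	int current = -1; /* segment currently held by the slot */
	int decompressions = 0;

	for (int i = 0; i < nrows; i++)
	{
		if (segment_of_row[i] != current)
		{
			decompressions++; /* slot switches parent tuple: decompress */
			current = segment_of_row[i];
		}
	}
	return decompressions;
}

int
main(void)
{
	const int sequential[] = { 0, 0, 0, 1, 1, 1, 2, 2, 2 };
	const int random_order[] = { 0, 1, 0, 2, 1, 2, 0, 2, 1 };

	/* Sequential order: 3 decompressions, one per segment. */
	printf("sequential: %d\n", count_decompressions(sequential, 9));
	/* Random order: 9, i.e. every row switch re-decompresses. */
	printf("random: %d\n", count_decompressions(random_order, 9));
	return 0;
}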
diff --git a/tsl/test/expected/hypercore_scans.out b/tsl/test/expected/hypercore_scans.out
index 1104116432f..b0ca8fb942a 100644
--- a/tsl/test/expected/hypercore_scans.out
+++ b/tsl/test/expected/hypercore_scans.out
@@ -377,8 +377,7 @@ on conflict (location, device, time) do nothing;
    -> Custom Scan (ChunkDispatch) (actual rows=8641 loops=1)
          -> Subquery Scan on "*SELECT*" (actual rows=8641 loops=1)
                -> Function Scan on generate_series t (actual rows=8641 loops=1)
-   Array: cache misses=2, decompress count=4 calls=4
-(10 rows)
+(9 rows)
 
 -- This should show values for all columns
 explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
@@ -446,8 +445,7 @@ order by time desc;
    -> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=88 loops=1)
          Vectorized Filter: (location = '1'::text)
          Rows Removed by Filter: 319
-   Array: cache misses=30, decompress count=84 calls=242
-(7 rows)
+(6 rows)
 
 -- Save the data for comparison with seqscan
 create temp table chunk_saved as
@@ -524,8 +522,7 @@ select count(*) from :chunk where location = 1::text;
    -> Custom Scan (ColumnarScan) on _hyper_1_1_chunk (actual rows=89 loops=1)
          Vectorized Filter: (location = '1'::text)
          Rows Removed by Filter: 320
-   Array: cache misses=30, decompress count=30 calls=30
-(5 rows)
+(4 rows)
 
 -- Testing same thing with SeqScan. It still decompresses in the
 -- count(*) case, although it shouldn't have to. So, probably an
@@ -539,8 +536,7 @@ select count(*) from :chunk where device = 1;
    -> Seq Scan on _hyper_1_1_chunk (actual rows=17 loops=1)
          Filter: (device = 1)
          Rows Removed by Filter: 392
-   Array: cache misses=30, decompress count=62 calls=410
-(5 rows)
+(4 rows)
 
 explain (analyze, costs off, timing off, summary off, decompress_cache_stats)
 select device from :chunk where device = 1;
@@ -559,8 +555,7 @@ select count(*) from :chunk where location = 1::text;
    -> Seq Scan on _hyper_1_1_chunk (actual rows=89 loops=1)
          Filter: (location = '1'::text)
          Rows Removed by Filter: 320
-   Array: cache misses=30, decompress count=62 calls=410
-(5 rows)
+(4 rows)
 
 -- ColumnarScan declares itself as projection capable. This query
 -- would add a Result node on top if ColumnarScan couldn't project.