Fix INSERT query performance on compressed chunk
An INSERT query with ON CONFLICT on a compressed chunk previously did a
heap scan of the compressed chunk to verify unique constraint violations.
This patch improves performance by doing an index scan on the compressed
chunk to fetch the batches that match on the segmentby columns. The
matching batches are decompressed into the uncompressed chunk, where the
unique constraint violation is then verified.
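
As an illustration, a minimal SQL sketch of the kind of workload this
affects (the table, column, and constraint names are hypothetical, not
taken from this commit):

CREATE TABLE metrics (
    ts        timestamptz NOT NULL,
    device_id int NOT NULL,
    val       float8,
    UNIQUE (device_id, ts)
);
SELECT create_hypertable('metrics', 'ts');
ALTER TABLE metrics SET (timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id',
    timescaledb.compress_orderby = 'ts');
SELECT compress_chunk(c) FROM show_chunks('metrics') c;

-- Insert hitting a compressed chunk: the unique-constraint check previously
-- required a heap scan of the compressed chunk; with this patch only the
-- batches whose device_id matches are fetched via the index and decompressed.
INSERT INTO metrics VALUES ('2023-09-01 00:00:00+00', 7, 1.0)
ON CONFLICT (device_id, ts) DO NOTHING;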

Since the index on the compressed chunk contains only the segmentby
columns, we cannot do a point lookup that also takes the orderby columns
into account. This patch therefore decompresses every batch that matches
on the segmentby columns.
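
Conceptually, the index probe on the compressed chunk filters on the
segmentby columns only, so all batches of the matching segment are
decompressed regardless of their orderby range (the internal chunk name
below is hypothetical):

-- Conceptual equivalent of the lookup done on the compressed chunk:
-- a predicate on the segmentby column only, none on the orderby column (ts).
SELECT * FROM _timescaledb_internal.compress_hyper_1_2_chunk
WHERE device_id = 7;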

Fixes #6063
sb230132 committed Sep 12, 2023
1 parent 93519d0 commit e127cc8
Showing 6 changed files with 250 additions and 141 deletions.
1 change: 1 addition & 0 deletions .unreleased/bugfix_6063
@@ -0,0 +1 @@
Fixes: #6063 Fix INSERT query performance on compressed chunk
58 changes: 3 additions & 55 deletions tsl/src/compression/api.c
@@ -1017,6 +1017,7 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
if (NULL == uncompressed_chunk)
@@ -1031,61 +1032,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
else // don't care what the idx oid is, data node will find it and open it
PG_RETURN_OID(uncompressed_chunk_id);
}
int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

List *htcols_list = ts_hypertable_compression_get(srcht_id);
ListCell *lc;
int htcols_listlen = list_length(htcols_list);

const ColumnCompressionInfo **colinfo_array;
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);

int i = 0;

foreach (lc, htcols_list)
{
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc);
colinfo_array[i++] = fd;
}

const ColumnCompressionInfo **keys;
int n_keys;
int16 *in_column_offsets = compress_chunk_populate_keys(uncompressed_chunk->table_id,
colinfo_array,
htcols_listlen,
&n_keys,
&keys);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
uncompressed_rel_tupdesc,
compressed_chunk_rel,
htcols_listlen,
colinfo_array,
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
* by recompression later on, both in the case of segmentwise recompression, and
* in the case of decompress-compress. This implicitly locks the index too, so
* it cannot be dropped in another session, which is what we want to prevent by
* locking the compressed chunk here
*/
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

row_compressor_finish(&row_compressor);

if (OidIsValid(row_compressor.index_oid))
Oid index_oid = get_index_on_compressed_chunk(uncompressed_chunk, compressed_chunk);
if (OidIsValid(index_oid))
{
PG_RETURN_OID(uncompressed_chunk_id);
}
289 changes: 226 additions & 63 deletions tsl/src/compression/compression.c
@@ -1836,6 +1836,62 @@ tsl_compressed_data_out(PG_FUNCTION_ARGS)
PG_RETURN_CSTRING(encoded);
}

Oid
get_index_on_compressed_chunk(Chunk *uncompressed_chunk, Chunk *compressed_chunk)
{
int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
List *htcols_list = ts_hypertable_compression_get(srcht_id);
ListCell *lc;
int htcols_listlen = list_length(htcols_list);

const ColumnCompressionInfo **colinfo_array;
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);

int i = 0;
foreach (lc, htcols_list)
{
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc);
colinfo_array[i++] = fd;
}

const ColumnCompressionInfo **keys;
int n_keys;
int16 *in_column_offsets = compress_chunk_populate_keys(uncompressed_chunk->table_id,
colinfo_array,
htcols_listlen,
&n_keys,
&keys);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
uncompressed_rel_tupdesc,
compressed_chunk_rel,
htcols_listlen,
colinfo_array,
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
* by recompression later on, both in the case of segmentwise recompression, and
* in the case of decompress-compress. This implicitly locks the index too, so
* it cannot be dropped in another session, which is what we want to prevent by
* locking the compressed chunk here
*/
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

row_compressor_finish(&row_compressor);
return row_compressor.index_oid;
}

extern CompressionStorage
compression_get_toast_storage(CompressionAlgorithms algorithm)
{
@@ -1986,9 +2042,78 @@ create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filte
return num_scankeys;
}

/*
* Helper method to build scankeys to do lookup into compressed chunk index
*/
static ScanKeyData *
fetch_index_scankeys(const Relation idxrel, Oid hypertable_relid, TupleTableSlot *uncompressedslot,
int *num_keys, const int key_flags)
{
ScanKeyData *scankey = NULL;
int indnkeyatts;
int total_keys = 0;
int attoff;
bool isnull;
Datum indclassDatum;
oidvector *opclass;

indclassDatum =
SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull);
Assert(!isnull);
opclass = (oidvector *) DatumGetPointer(indclassDatum);

indnkeyatts = IndexRelationGetNumberOfKeyAttributes(idxrel);
scankey = palloc0(sizeof(ScanKeyData) * indnkeyatts);

/* Build scankey for every attribute in the index. */
for (attoff = 0; attoff < indnkeyatts; attoff++)
{
Oid operator;
Oid opfamily;
RegProcedure regop;
int pkattno = attoff + 1;
Oid optype = get_opclass_input_type(opclass->values[attoff]);
/*
* Load the operator info. We need this to get the equality operator
* function for the scan key.
*/
opfamily = get_opclass_family(opclass->values[attoff]);
operator= get_opfamily_member(opfamily, optype, optype, BTEqualStrategyNumber);
regop = get_opcode(operator);

if (attoff < indnkeyatts - 1)
{
/* Initialize segmentby scankeys. */
if (key_flags & SEGMENTBY_KEYS)
{
/* get index attribute name */
Form_pg_attribute attr = TupleDescAttr(idxrel->rd_att, attoff);
AttrNumber attno = get_attnum(hypertable_relid, attr->attname.data);
bool isnull;
Datum value = slot_getattr(uncompressedslot, attno, &isnull);
ScanKeyInit(&scankey[attoff], pkattno, BTEqualStrategyNumber, regop, value);
if (uncompressedslot->tts_isnull[attno - 1])
scankey[attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
scankey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
total_keys++;
}
}
}
*num_keys = total_keys;
return scankey;
}

void
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot)
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *uncompressed_chunk,
TupleTableSlot *slot)
{
Relation idxrel;
IndexScanDesc index_scan = NULL;
TableScanDesc heapScan;
TupleTableSlot *index_slot = NULL;
Oid index_oid;
ScanKeyData *scankeys = NULL;
int num_scankeys;
Relation out_rel = cis->rel;

if (!ts_indexing_relation_has_primary_or_unique_index(out_rel))
@@ -2005,82 +2130,120 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
errmsg("inserting into compressed chunk with unique constraints disabled"),
errhint("Set timescaledb.enable_dml_decompression to TRUE.")));

Chunk *comp = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
Relation in_rel = relation_open(comp->table_id, RowExclusiveLock);
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
Relation in_rel = relation_open(compressed_chunk->table_id, RowExclusiveLock);

RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
/* fetch index details from compressed chunk */
index_oid = get_index_on_compressed_chunk(uncompressed_chunk, compressed_chunk);

Bitmapset *key_columns = RelationGetIndexAttrBitmap(out_rel, INDEX_ATTR_BITMAP_KEY);
Bitmapset *null_columns = NULL;

int num_scankeys;
ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id,
chunk->hypertable_relid,
decompressor,
key_columns,
&null_columns,
slot,
&num_scankeys);

bms_free(key_columns);

/*
* Using latest snapshot to scan the heap since we are doing this to build
* the index on the uncompressed chunks in order to do speculative insertion
* which is always built from all tuples (even in higher levels of isolation).
* By default there is always an index on compressed chunk which has segmentby
* columns. Use this index to fetch the required compressed tuples to check for
* unique constraint violation.
*/
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);

for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
if (OidIsValid(index_oid))
{
idxrel = index_open(index_oid, ExclusiveLock);
scankeys = fetch_index_scankeys(idxrel,
uncompressed_chunk->hypertable_relid,
slot,
&num_scankeys,
SEGMENTBY_KEYS);
index_scan = index_beginscan(in_rel, idxrel, GetLatestSnapshot(), num_scankeys, 0);
index_rescan(index_scan, scankeys, num_scankeys, NULL, 0);
index_slot = table_slot_create(in_rel, NULL);
/* fetch matching compressed tuple based on segmentby column values */
while (index_getnext_slot(index_scan, ForwardScanDirection, index_slot))
{
HeapTuple compressed_tuple = ExecFetchSlotHeapTuple(index_slot, false, NULL);
heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);
write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();

TM_FailureData tmfd;
TM_Result result pg_attribute_unused();
result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
}
ExecDropSingleTupleTableSlot(index_slot);
index_endscan(index_scan);
index_close(idxrel, ExclusiveLock);
}
else
{
Assert(HeapTupleIsValid(compressed_tuple));
bool valid = true;

/*
* Since the heap scan API does not support SK_SEARCHNULL we have to check
* for NULL values manually when those are part of the constraints.
*/
for (int attno = bms_next_member(null_columns, -1); attno >= 0;
attno = bms_next_member(null_columns, attno))
scankeys = build_scankeys(uncompressed_chunk->fd.hypertable_id,
uncompressed_chunk->hypertable_relid,
decompressor,
key_columns,
&null_columns,
slot,
&num_scankeys);
bms_free(key_columns);
heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);

for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
{
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc))
Assert(HeapTupleIsValid(compressed_tuple));
bool valid = true;

/*
* Since the heap scan API does not support SK_SEARCHNULL we have to check
* for NULL values manually when those are part of the constraints.
*/
for (int attno = bms_next_member(null_columns, -1); attno >= 0;
attno = bms_next_member(null_columns, attno))
{
valid = false;
break;
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc))
{
valid = false;
break;
}
}
}

/*
* Skip if NULL check failed.
*/
if (!valid)
continue;

heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();

TM_FailureData tmfd;
TM_Result result pg_attribute_unused();
result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
Assert(result == TM_Ok);
/*
* Skip if NULL check failed.
*/
if (!valid)
continue;
heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);
write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();

TM_FailureData tmfd;
TM_Result result pg_attribute_unused();
result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
Assert(result == TM_Ok);
}
table_endscan(heapScan);
}

table_endscan(heapScan);

ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);
