Fix INSERT query performance on compressed chunk #6061

Closed

1 change: 1 addition & 0 deletions .unreleased/bugfix_6063
@@ -0,0 +1 @@
Fixes: #6061 Fix INSERT query performance on compressed chunk
59 changes: 4 additions & 55 deletions tsl/src/compression/api.c
@@ -1017,6 +1017,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
Oid compressed_chunk_relid = ts_chunk_get_relid(uncompressed_chunk->fd.compressed_chunk_id, true);
Chunk *compressed_chunk = ts_chunk_get_by_relid(compressed_chunk_relid, true);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
if (NULL == uncompressed_chunk)
@@ -1031,61 +1033,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
else // don't care what the idx oid is, data node will find it and open it
PG_RETURN_OID(uncompressed_chunk_id);
}
int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

List *htcols_list = ts_hypertable_compression_get(srcht_id);
ListCell *lc;
int htcols_listlen = list_length(htcols_list);

const ColumnCompressionInfo **colinfo_array;
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);

int i = 0;

foreach (lc, htcols_list)
{
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc);
colinfo_array[i++] = fd;
}

const ColumnCompressionInfo **keys;
int n_keys;
int16 *in_column_offsets = compress_chunk_populate_keys(uncompressed_chunk->table_id,
colinfo_array,
htcols_listlen,
&n_keys,
&keys);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
uncompressed_rel_tupdesc,
compressed_chunk_rel,
htcols_listlen,
colinfo_array,
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
* by recompression later on, both in the case of segmentwise recompression, and
* in the case of decompress-compress. This implicitly locks the index too, so
* it cannot be dropped in another session, which is what we want to prevent by
* locking the compressed chunk here
*/
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

row_compressor_finish(&row_compressor);

if (OidIsValid(row_compressor.index_oid))
Oid index_oid = get_index_on_compressed_chunk(uncompressed_chunk, compressed_chunk);
if (OidIsValid(index_oid))
{
PG_RETURN_OID(uncompressed_chunk_id);
}
286 changes: 223 additions & 63 deletions tsl/src/compression/compression.c
@@ -1836,6 +1836,62 @@ tsl_compressed_data_out(PG_FUNCTION_ARGS)
PG_RETURN_CSTRING(encoded);
}

Oid
get_index_on_compressed_chunk(Chunk *uncompressed_chunk, Chunk *compressed_chunk)
{
int32 srcht_id = uncompressed_chunk->fd.hypertable_id;
List *htcols_list = ts_hypertable_compression_get(srcht_id);
ListCell *lc;
int htcols_listlen = list_length(htcols_list);

const ColumnCompressionInfo **colinfo_array;
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);

int i = 0;
foreach (lc, htcols_list)
{
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc);
colinfo_array[i++] = fd;
}

const ColumnCompressionInfo **keys;
int n_keys;
int16 *in_column_offsets = compress_chunk_populate_keys(uncompressed_chunk->table_id,
colinfo_array,
htcols_listlen,
&n_keys,
&keys);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);

RowCompressor row_compressor;
row_compressor_init(&row_compressor,
uncompressed_rel_tupdesc,
compressed_chunk_rel,
htcols_listlen,
colinfo_array,
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
* by recompression later on, both in the case of segmentwise recompression, and
* in the case of decompress-compress. This implicitly locks the index too, so
* it cannot be dropped in another session, which is what we want to prevent by
* locking the compressed chunk here
*/
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);
Contributor:
Is it intended to keep the lock of the uncompressed_chunk_rel too? If so, could you add a comment why this is required?

Contributor Author:
There is a comment just above.

Contributor:
This comment only covers the compressed_chunk_rel. If the same applies to the uncompressed_chunk_rel it should be added to the comment.
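
For context on the lock question: closing a relation with NoLock releases only the relation reference, while the heavyweight lock taken by table_open() stays held until the end of the transaction, for the uncompressed chunk just as for the compressed one. Below is a minimal standalone sketch of that pattern; the function name and argument are illustrative and not part of this PR.

#include "postgres.h"
#include "access/table.h"
#include "storage/lockdefs.h"

/*
 * Sketch only: table_open() acquires the requested heavyweight lock plus a
 * relcache reference; table_close(rel, NoLock) drops the reference but keeps
 * the lock until the transaction commits or aborts.
 */
static void
hold_chunk_lock_until_commit(Oid chunk_relid)
{
	Relation	rel = table_open(chunk_relid, ExclusiveLock);

	/* ... read the relation, build scan keys, etc. ... */

	/* Keep the ExclusiveLock; it is released automatically at end of transaction. */
	table_close(rel, NoLock);
}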


row_compressor_finish(&row_compressor);
return row_compressor.index_oid;
}

extern CompressionStorage
compression_get_toast_storage(CompressionAlgorithms algorithm)
{
@@ -1986,9 +2042,75 @@ create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filte
return num_scankeys;
}

/*
* Helper method to build scankeys for a lookup into the compressed chunk index
*/
static ScanKeyData *
fetch_index_scankeys(const Relation idxrel, Oid hypertable_relid, TupleTableSlot *uncompressedslot,
int *num_keys)
{
ScanKeyData *scankey = NULL;
int indnkeyatts;
int total_keys = 0;
int attoff;
bool isnull;
Datum indclassDatum;
oidvector *opclass;

indclassDatum =
SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull);
Assert(!isnull);
opclass = (oidvector *) DatumGetPointer(indclassDatum);

indnkeyatts = IndexRelationGetNumberOfKeyAttributes(idxrel);
scankey = palloc0(sizeof(ScanKeyData) * indnkeyatts);

/* Build scankey for every attribute in the index. */
for (attoff = 0; attoff < indnkeyatts; attoff++)
{
Oid operator;
Oid opfamily;
RegProcedure regop;
int pkattno = attoff + 1;
Oid optype = get_opclass_input_type(opclass->values[attoff]);
/*
* Load the operator info. We need this to get the equality operator
* function for the scan key.
*/
opfamily = get_opclass_family(opclass->values[attoff]);
operator= get_opfamily_member(opfamily, optype, optype, BTEqualStrategyNumber);
regop = get_opcode(operator);

if (attoff < indnkeyatts - 1)
{
/* Initialize segmentby scankeys. */
/* get index attribute name */
Form_pg_attribute attr = TupleDescAttr(idxrel->rd_att, attoff);
AttrNumber attno = get_attnum(hypertable_relid, attr->attname.data);
bool isnull;
Datum value = slot_getattr(uncompressedslot, attno, &isnull);
ScanKeyInit(&scankey[attoff], pkattno, BTEqualStrategyNumber, regop, value);
if (uncompressedslot->tts_isnull[attno - 1])
scankey[attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
scankey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
total_keys++;
}
}
*num_keys = total_keys;
return scankey;
}

void
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot)
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *uncompressed_chunk,
TupleTableSlot *slot)
{
Relation idxrel;
IndexScanDesc index_scan = NULL;
TableScanDesc heapScan;
TupleTableSlot *index_slot = NULL;
Oid index_oid;
ScanKeyData *scankeys = NULL;
int num_scankeys;
Relation out_rel = cis->rel;

if (!ts_indexing_relation_has_primary_or_unique_index(out_rel))
@@ -2005,82 +2127,120 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
errmsg("inserting into compressed chunk with unique constraints disabled"),
errhint("Set timescaledb.enable_dml_decompression to TRUE.")));

Chunk *comp = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
Relation in_rel = relation_open(comp->table_id, RowExclusiveLock);
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
Relation in_rel = relation_open(compressed_chunk->table_id, RowExclusiveLock);

RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
/* fetch index details from compressed chunk */
index_oid = get_index_on_compressed_chunk(uncompressed_chunk, compressed_chunk);

Bitmapset *key_columns = RelationGetIndexAttrBitmap(out_rel, INDEX_ATTR_BITMAP_KEY);
Bitmapset *null_columns = NULL;

int num_scankeys;
ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id,
chunk->hypertable_relid,
decompressor,
key_columns,
&null_columns,
slot,
&num_scankeys);

bms_free(key_columns);

/*
* Using latest snapshot to scan the heap since we are doing this to build
* the index on the uncompressed chunks in order to do speculative insertion
* which is always built from all tuples (even in higher levels of isolation).
* By default there is always an index on compressed chunk which has segmentby
* columns. Use this index to fetch the required compressed tuples to check for
* unique constraint violation.
*/
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);

for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
if (OidIsValid(index_oid))
{
Assert(HeapTupleIsValid(compressed_tuple));
bool valid = true;

/*
* Since the heap scan API does not support SK_SEARCHNULL we have to check
* for NULL values manually when those are part of the constraints.
*/
for (int attno = bms_next_member(null_columns, -1); attno >= 0;
attno = bms_next_member(null_columns, attno))
idxrel = index_open(index_oid, ExclusiveLock);
scankeys =
fetch_index_scankeys(idxrel, uncompressed_chunk->hypertable_relid, slot, &num_scankeys);
index_scan = index_beginscan(in_rel, idxrel, GetLatestSnapshot(), num_scankeys, 0);
index_rescan(index_scan, scankeys, num_scankeys, NULL, 0);
index_slot = table_slot_create(in_rel, NULL);
/* fetch matching compressed tuple based on segmentby column values */
while (index_getnext_slot(index_scan, ForwardScanDirection, index_slot))
{
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc))
{
valid = false;
break;
}
HeapTuple compressed_tuple = ExecFetchSlotHeapTuple(index_slot, false, NULL);
heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);
write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();

TM_FailureData tmfd;
/* silence gcc warning for unused variable */
TM_Result result pg_attribute_unused();
Contributor:
This code was already present in the old code. Could you please add a comment to explain why it is safe to ignore the result of the operation?

Contributor Author:
Added comment.

Contributor:
It seems you also added an assert. My question was rather why it is safe to ignore the result of the operation; maybe you could add this information to the comment.
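
The thread leaves open why ignoring the result is safe; the diff itself only asserts that the result is TM_Ok. As a reference point, here is a hedged sketch of handling the outcome of table_tuple_delete() explicitly, reusing the arguments from the surrounding code; the helper name and error text are illustrative, not from this PR.

#include "postgres.h"
#include "access/htup.h"
#include "access/tableam.h"
#include "utils/snapmgr.h"

/*
 * Illustrative only: delete one compressed tuple and report any unexpected
 * TM_Result instead of silently ignoring it.
 */
static void
delete_compressed_tuple_checked(Relation in_rel, HeapTuple compressed_tuple, CommandId mycid)
{
	TM_FailureData tmfd;
	TM_Result	result;

	result = table_tuple_delete(in_rel,
								&compressed_tuple->t_self,
								mycid,
								GetTransactionSnapshot(),
								InvalidSnapshot,
								true,	/* wait for concurrent updaters */
								&tmfd,
								false /* changingPart */);

	switch (result)
	{
		case TM_Ok:
			break;
		case TM_SelfModified:
			/* already deleted earlier in this command; nothing to do */
			break;
		default:
			/* TM_Updated, TM_Deleted, TM_BeingModified, ... */
			elog(ERROR, "unexpected result %d when deleting compressed tuple", result);
	}
}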

result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
Assert(result == TM_Ok);
}
ExecDropSingleTupleTableSlot(index_slot);
index_endscan(index_scan);
index_close(idxrel, ExclusiveLock);
}
else
{
scankeys = build_scankeys(uncompressed_chunk->fd.hypertable_id,
uncompressed_chunk->hypertable_relid,
decompressor,
key_columns,
&null_columns,
slot,
&num_scankeys);
bms_free(key_columns);
heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);

for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
{
Assert(HeapTupleIsValid(compressed_tuple));
bool valid = true;

/*
* Skip if NULL check failed.
*/
if (!valid)
continue;

heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();
/*
* Since the heap scan API does not support SK_SEARCHNULL we have to check
* for NULL values manually when those are part of the constraints.
*/
for (int attno = bms_next_member(null_columns, -1); attno >= 0;
attno = bms_next_member(null_columns, attno))
{
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc))
{
valid = false;
break;
Contributor:
Could you add a test for this line?

Contributor Author:
tsl/test/sql/compression_conflicts.sql has a test which covers these lines.

Contributor:
It seems Codecov still does not see any coverage here. Which SQL statement from the test should cover these lines?
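
To clarify what the questioned branch does: per the comment in the diff, heap scans cannot use SK_SEARCHNULL scan keys, so key columns that are NULL in the row being inserted end up in null_columns and are checked manually; a compressed batch that stores a non-NULL value in any of those columns cannot conflict and is skipped. A standalone sketch of that filter follows, with an illustrative function name (not part of the PR).

#include "postgres.h"
#include "access/htup_details.h"
#include "nodes/bitmapset.h"

/*
 * Sketch only: return false when the compressed tuple has a non-NULL value
 * in a column that is NULL in the row being inserted, i.e. the batch cannot
 * produce a unique-constraint conflict and can be skipped.
 */
static bool
compressed_tuple_matches_null_keys(HeapTuple compressed_tuple, TupleDesc in_desc,
								   Bitmapset *null_columns)
{
	int			attno = -1;

	while ((attno = bms_next_member(null_columns, attno)) >= 0)
	{
		if (!heap_attisnull(compressed_tuple, attno, in_desc))
			return false;
	}
	return true;
}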

}
}

TM_FailureData tmfd;
TM_Result result pg_attribute_unused();
result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
Assert(result == TM_Ok);
/*
* Skip if NULL check failed.
*/
if (!valid)
continue;
heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);
write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row(&decompressor, NULL);
write_logical_replication_msg_decompression_end();

TM_FailureData tmfd;
/* silence gcc warning for unused variable */
TM_Result result pg_attribute_unused();
result = table_tuple_delete(in_rel,
&compressed_tuple->t_self,
decompressor.mycid,
GetTransactionSnapshot(),
InvalidSnapshot,
true,
&tmfd,
false);
Assert(result == TM_Ok);
}
table_endscan(heapScan);
}

table_endscan(heapScan);

ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);
