-
Notifications
You must be signed in to change notification settings - Fork 900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix INSERT query performance on compressed chunk #6061
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fixes: #6061 Fix INSERT query performs on compressed chunk |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1836,6 +1836,62 @@ tsl_compressed_data_out(PG_FUNCTION_ARGS) | |
PG_RETURN_CSTRING(encoded); | ||
} | ||
|
||
Oid | ||
get_index_on_compressed_chunk(Chunk *uncompressed_chunk, Chunk *compressed_chunk) | ||
{ | ||
int32 srcht_id = uncompressed_chunk->fd.hypertable_id; | ||
List *htcols_list = ts_hypertable_compression_get(srcht_id); | ||
ListCell *lc; | ||
int htcols_listlen = list_length(htcols_list); | ||
|
||
const ColumnCompressionInfo **colinfo_array; | ||
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen); | ||
|
||
int i = 0; | ||
foreach (lc, htcols_list) | ||
{ | ||
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc); | ||
colinfo_array[i++] = fd; | ||
} | ||
|
||
const ColumnCompressionInfo **keys; | ||
int n_keys; | ||
int16 *in_column_offsets = compress_chunk_populate_keys(uncompressed_chunk->table_id, | ||
colinfo_array, | ||
htcols_listlen, | ||
&n_keys, | ||
&keys); | ||
|
||
Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock); | ||
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock); | ||
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel); | ||
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel); | ||
|
||
RowCompressor row_compressor; | ||
row_compressor_init(&row_compressor, | ||
uncompressed_rel_tupdesc, | ||
compressed_chunk_rel, | ||
htcols_listlen, | ||
colinfo_array, | ||
in_column_offsets, | ||
compressed_rel_tupdesc->natts, | ||
true /*need_bistate*/, | ||
true /*reset_sequence*/); | ||
|
||
/* | ||
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested | ||
* by recompression later on, both in the case of segmentwise recompression, and | ||
* in the case of decompress-compress. This implicitly locks the index too, so | ||
* it cannot be dropped in another session, which is what we want to prevent by | ||
* locking the compressed chunk here | ||
*/ | ||
table_close(compressed_chunk_rel, NoLock); | ||
table_close(uncompressed_chunk_rel, NoLock); | ||
|
||
row_compressor_finish(&row_compressor); | ||
return row_compressor.index_oid; | ||
} | ||
|
||
extern CompressionStorage | ||
compression_get_toast_storage(CompressionAlgorithms algorithm) | ||
{ | ||
|
@@ -1986,9 +2042,75 @@ create_segment_filter_scankey(RowDecompressor *decompressor, char *segment_filte | |
return num_scankeys; | ||
} | ||
|
||
/* | ||
* Helper method to build scankeys to do lookup into compressed chunk index | ||
*/ | ||
static ScanKeyData * | ||
fetch_index_scankeys(const Relation idxrel, Oid hypertable_relid, TupleTableSlot *uncompressedslot, | ||
int *num_keys) | ||
{ | ||
ScanKeyData *scankey = NULL; | ||
int indnkeyatts; | ||
int total_keys = 0; | ||
int attoff; | ||
bool isnull; | ||
Datum indclassDatum; | ||
oidvector *opclass; | ||
|
||
indclassDatum = | ||
SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull); | ||
Assert(!isnull); | ||
opclass = (oidvector *) DatumGetPointer(indclassDatum); | ||
|
||
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(idxrel); | ||
scankey = palloc0(sizeof(ScanKeyData) * indnkeyatts); | ||
|
||
/* Build scankey for every attribute in the index. */ | ||
for (attoff = 0; attoff < indnkeyatts; attoff++) | ||
{ | ||
Oid operator; | ||
Oid opfamily; | ||
RegProcedure regop; | ||
int pkattno = attoff + 1; | ||
Oid optype = get_opclass_input_type(opclass->values[attoff]); | ||
/* | ||
* Load the operator info. We need this to get the equality operator | ||
* function for the scan key. | ||
*/ | ||
opfamily = get_opclass_family(opclass->values[attoff]); | ||
operator= get_opfamily_member(opfamily, optype, optype, BTEqualStrategyNumber); | ||
regop = get_opcode(operator); | ||
|
||
if (attoff < indnkeyatts - 1) | ||
{ | ||
/* Initialize segmentby scankeys. */ | ||
/* get index attribute name */ | ||
Form_pg_attribute attr = TupleDescAttr(idxrel->rd_att, attoff); | ||
AttrNumber attno = get_attnum(hypertable_relid, attr->attname.data); | ||
bool isnull; | ||
Datum value = slot_getattr(uncompressedslot, attno, &isnull); | ||
ScanKeyInit(&scankey[attoff], pkattno, BTEqualStrategyNumber, regop, value); | ||
if (uncompressedslot->tts_isnull[attno - 1]) | ||
scankey[attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL); | ||
scankey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; | ||
total_keys++; | ||
} | ||
} | ||
*num_keys = total_keys; | ||
return scankey; | ||
} | ||
|
||
void | ||
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot) | ||
decompress_batches_for_insert(ChunkInsertState *cis, Chunk *uncompressed_chunk, | ||
TupleTableSlot *slot) | ||
{ | ||
Relation idxrel; | ||
IndexScanDesc index_scan = NULL; | ||
TableScanDesc heapScan; | ||
TupleTableSlot *index_slot = NULL; | ||
Oid index_oid; | ||
ScanKeyData *scankeys = NULL; | ||
int num_scankeys; | ||
Relation out_rel = cis->rel; | ||
|
||
if (!ts_indexing_relation_has_primary_or_unique_index(out_rel)) | ||
|
@@ -2005,82 +2127,120 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo | |
errmsg("inserting into compressed chunk with unique constraints disabled"), | ||
errhint("Set timescaledb.enable_dml_decompression to TRUE."))); | ||
|
||
Chunk *comp = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); | ||
Relation in_rel = relation_open(comp->table_id, RowExclusiveLock); | ||
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true); | ||
Relation in_rel = relation_open(compressed_chunk->table_id, RowExclusiveLock); | ||
|
||
RowDecompressor decompressor = build_decompressor(in_rel, out_rel); | ||
/* fetch index details from compressed chunk */ | ||
index_oid = get_index_on_compressed_chunk(uncompressed_chunk, compressed_chunk); | ||
|
||
Bitmapset *key_columns = RelationGetIndexAttrBitmap(out_rel, INDEX_ATTR_BITMAP_KEY); | ||
Bitmapset *null_columns = NULL; | ||
|
||
int num_scankeys; | ||
ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id, | ||
chunk->hypertable_relid, | ||
decompressor, | ||
key_columns, | ||
&null_columns, | ||
slot, | ||
&num_scankeys); | ||
|
||
bms_free(key_columns); | ||
|
||
/* | ||
* Using latest snapshot to scan the heap since we are doing this to build | ||
* the index on the uncompressed chunks in order to do speculative insertion | ||
* which is always built from all tuples (even in higher levels of isolation). | ||
* By default there is always an index on compressed chunk which has segmentby | ||
* columns. Use this index to fetch the required compress tuples to check for | ||
* unqiue constraint violation. | ||
*/ | ||
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys); | ||
|
||
for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); | ||
compressed_tuple != NULL; | ||
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) | ||
if (OidIsValid(index_oid)) | ||
{ | ||
Assert(HeapTupleIsValid(compressed_tuple)); | ||
bool valid = true; | ||
|
||
/* | ||
* Since the heap scan API does not support SK_SEARCHNULL we have to check | ||
* for NULL values manually when those are part of the constraints. | ||
*/ | ||
for (int attno = bms_next_member(null_columns, -1); attno >= 0; | ||
attno = bms_next_member(null_columns, attno)) | ||
idxrel = index_open(index_oid, ExclusiveLock); | ||
scankeys = | ||
fetch_index_scankeys(idxrel, uncompressed_chunk->hypertable_relid, slot, &num_scankeys); | ||
index_scan = index_beginscan(in_rel, idxrel, GetLatestSnapshot(), num_scankeys, 0); | ||
index_rescan(index_scan, scankeys, num_scankeys, NULL, 0); | ||
index_slot = table_slot_create(in_rel, NULL); | ||
/* fetch matching compressed tuple based on segmentby column values */ | ||
while (index_getnext_slot(index_scan, ForwardScanDirection, index_slot)) | ||
{ | ||
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc)) | ||
{ | ||
valid = false; | ||
break; | ||
} | ||
HeapTuple compressed_tuple = ExecFetchSlotHeapTuple(index_slot, false, NULL); | ||
heap_deform_tuple(compressed_tuple, | ||
decompressor.in_desc, | ||
decompressor.compressed_datums, | ||
decompressor.compressed_is_nulls); | ||
write_logical_replication_msg_decompression_start(); | ||
row_decompressor_decompress_row(&decompressor, NULL); | ||
write_logical_replication_msg_decompression_end(); | ||
|
||
TM_FailureData tmfd; | ||
/* silence gcc warning for unused variable */ | ||
TM_Result result pg_attribute_unused(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code was already present in the old code. Could you please add a comment to explain why it is safe to ignore the result of the operation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems you added also an |
||
result = table_tuple_delete(in_rel, | ||
&compressed_tuple->t_self, | ||
decompressor.mycid, | ||
GetTransactionSnapshot(), | ||
InvalidSnapshot, | ||
true, | ||
&tmfd, | ||
false); | ||
Assert(result == TM_Ok); | ||
} | ||
ExecDropSingleTupleTableSlot(index_slot); | ||
index_endscan(index_scan); | ||
index_close(idxrel, ExclusiveLock); | ||
} | ||
else | ||
{ | ||
scankeys = build_scankeys(uncompressed_chunk->fd.hypertable_id, | ||
uncompressed_chunk->hypertable_relid, | ||
decompressor, | ||
key_columns, | ||
&null_columns, | ||
slot, | ||
&num_scankeys); | ||
bms_free(key_columns); | ||
heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys); | ||
|
||
for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); | ||
compressed_tuple != NULL; | ||
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) | ||
{ | ||
Assert(HeapTupleIsValid(compressed_tuple)); | ||
bool valid = true; | ||
|
||
/* | ||
* Skip if NULL check failed. | ||
*/ | ||
if (!valid) | ||
continue; | ||
|
||
heap_deform_tuple(compressed_tuple, | ||
decompressor.in_desc, | ||
decompressor.compressed_datums, | ||
decompressor.compressed_is_nulls); | ||
|
||
write_logical_replication_msg_decompression_start(); | ||
row_decompressor_decompress_row(&decompressor, NULL); | ||
write_logical_replication_msg_decompression_end(); | ||
/* | ||
* Since the heap scan API does not support SK_SEARCHNULL we have to check | ||
* for NULL values manually when those are part of the constraints. | ||
*/ | ||
for (int attno = bms_next_member(null_columns, -1); attno >= 0; | ||
attno = bms_next_member(null_columns, attno)) | ||
{ | ||
if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc)) | ||
{ | ||
valid = false; | ||
break; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a test for this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tsl/test/sql/compression_conflicts.sql has test which will cover these lines. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems Codecov still does not see any coverage here. Which SQL statement from the test should cover these lines? |
||
} | ||
} | ||
|
||
TM_FailureData tmfd; | ||
TM_Result result pg_attribute_unused(); | ||
result = table_tuple_delete(in_rel, | ||
&compressed_tuple->t_self, | ||
decompressor.mycid, | ||
GetTransactionSnapshot(), | ||
InvalidSnapshot, | ||
true, | ||
&tmfd, | ||
false); | ||
Assert(result == TM_Ok); | ||
/* | ||
* Skip if NULL check failed. | ||
*/ | ||
if (!valid) | ||
continue; | ||
heap_deform_tuple(compressed_tuple, | ||
decompressor.in_desc, | ||
decompressor.compressed_datums, | ||
decompressor.compressed_is_nulls); | ||
write_logical_replication_msg_decompression_start(); | ||
row_decompressor_decompress_row(&decompressor, NULL); | ||
write_logical_replication_msg_decompression_end(); | ||
|
||
TM_FailureData tmfd; | ||
/* silence gcc warning for unused variable */ | ||
TM_Result result pg_attribute_unused(); | ||
result = table_tuple_delete(in_rel, | ||
&compressed_tuple->t_self, | ||
decompressor.mycid, | ||
GetTransactionSnapshot(), | ||
InvalidSnapshot, | ||
true, | ||
&tmfd, | ||
false); | ||
Assert(result == TM_Ok); | ||
} | ||
table_endscan(heapScan); | ||
} | ||
|
||
table_endscan(heapScan); | ||
|
||
ts_catalog_close_indexes(decompressor.indexstate); | ||
FreeBulkInsertState(decompressor.bistate); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intended to keep the lock of the
uncompressed_chunk_rel
too? If so, could you add a comment why this is required?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a comment just above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment only covers the
compressed_chunk_rel
. If the same applies to theuncompressed_chunk_rel
it should be added to the comment.