Reduce lock requirements during recompression
Recompression used to take a heavy Exclusive lock on the chunk while it
recompressed data. This blocked many concurrent operations such as
inserts, deletes, and updates. By reducing the locking requirement
to ShareUpdateExclusive, we enable more concurrent operations on
the chunk and hypertable while relying on tuple locking. Recompression
can still end up taking an Exclusive lock at the end in order to
change the chunk status, but it does this conditionally, meaning
it won't take it unless it can do so immediately.
antekresic committed Feb 19, 2025
1 parent 134ddbf commit bd5cba5
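
In PostgreSQL terms, the conditional status change described in the commit message boils down to ConditionalLockRelation(). A minimal sketch of the pattern follows; it is a simplification, not the patch itself, and the function name is hypothetical:

#include "postgres.h"
#include "storage/lmgr.h"
#include "utils/rel.h"

/* Sketch only: the chunk relation is assumed to be open with
 * ShareUpdateExclusiveLock. Try to upgrade to ExclusiveLock without
 * blocking; if the lock is not immediately available, skip the status
 * change and let a later recompression retry it. */
static void
maybe_mark_chunk_fully_compressed(Relation chunk_rel)
{
    if (!ConditionalLockRelation(chunk_rel, ExclusiveLock))
        return;

    /* ... verify no uncompressed tuples remain, then clear the
     * partial/unordered status flags on the chunk ... */
}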
Showing 5 changed files with 2,230 additions and 65 deletions.
4 changes: 2 additions & 2 deletions tsl/src/compression/api.c
@@ -1061,8 +1061,8 @@ get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk)
{
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ShareLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ShareLock);
Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, AccessShareLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, AccessShareLock);

CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);

117 changes: 85 additions & 32 deletions tsl/src/compression/recompress.c
@@ -27,7 +27,9 @@
#include "ts_catalog/compression_settings.h"

static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
Relation uncompressed_chunk_rel);
Relation uncompressed_chunk_rel,
Snapshot snapshot);
static bool delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot);
static void update_current_segment(CompressedSegmentInfo *current_segment, TupleTableSlot *slot,
int nsegmentby_cols);
static void create_segmentby_scankeys(CompressionSettings *settings, Relation index_rel,
@@ -112,30 +114,15 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);

/* new status after recompress should simply be compressed (1)
* It is ok to update this early on in the transaction as it keeps a lock
* on the updated tuple in the CHUNK table potentially preventing other transaction
* from updating it
*/
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED | CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

ereport(DEBUG1,
(errmsg("acquiring locks for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));
/* lock both chunks, compressed and uncompressed */
/* TODO: Take RowExclusive locks instead of ExclusiveLock
* Taking a weaker lock is possible but in order to use that,
* we have to check row level locking results when modifying tuples
* and make decisions based on them.
*/
Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);
Relation uncompressed_chunk_rel =
table_open(uncompressed_chunk->table_id, ShareUpdateExclusiveLock);
Relation compressed_chunk_rel =
table_open(compressed_chunk->table_id, ShareUpdateExclusiveLock);

/*
* Calculate and add the column dimension ranges for the src chunk used by chunk skipping
@@ -272,8 +259,9 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
IndexScanDesc index_scan =
index_beginscan(compressed_chunk_rel, index_rel, snapshot, num_segmentby, 0);

bool found_tuple =
fetch_uncompressed_chunk_into_tuplesort(input_tuplesortstate, uncompressed_chunk_rel);
bool found_tuple = fetch_uncompressed_chunk_into_tuplesort(input_tuplesortstate,
uncompressed_chunk_rel,
snapshot);
if (!found_tuple)
goto finish;
tuplesort_performsort(input_tuplesortstate);
@@ -382,9 +370,12 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
row_decompressor_decompress_row_to_tuplesort(&decompressor,
recompress_tuplesortstate);

simple_table_tuple_delete(compressed_chunk_rel,
&(compressed_slot->tts_tid),
snapshot);
if (!delete_tuple_for_recompression(compressed_chunk_rel,
&(compressed_slot->tts_tid),
snapshot))
elog(ERROR,
"cannot proceed with recompression due to concurrent updates on "
"compressed data");
CommandCounterIncrement();

if (should_free)
@@ -465,8 +456,6 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
pfree(index_scankeys);
pfree(orderby_scankeys);

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);

/* Need to rebuild indexes if the relation is using hypercore
* TAM. Alternatively, we could insert into indexes when inserting into
@@ -485,6 +474,46 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
#endif
}

/* If we can quickly upgrade the lock, lets try updating the chunk status to fully
* compressed. But we need to check if there are any uncompressed tuples in the
* relation since somebody might have inserted new tuples while we were recompressing.
*/
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
hypercore_scan_set_skip_compressed(scan, true);
ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ? ForwardScanDirection : BackwardScanDirection;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

/* Doing a backwards scan with assumption that newly inserted tuples
* are most likely at the end of the heap.
*/
bool has_tuples = false;
while (table_scan_getnextslot(scan, scan_dir, slot))
{
has_tuples = true;
break;
}

ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (!has_tuples)
{
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
}

}

table_close(uncompressed_chunk_rel, NoLock);
table_close(compressed_chunk_rel, NoLock);

@@ -558,10 +587,9 @@ match_tuple_batch(TupleTableSlot *compressed_slot, int num_orderby, ScanKey orde

static bool
fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
Relation uncompressed_chunk_rel)
Relation uncompressed_chunk_rel, Snapshot snapshot)
{
bool matching_exist = false;
Snapshot snapshot = GetLatestSnapshot();
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. */

Expand All @@ -574,9 +602,10 @@ fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
matching_exist = true;
slot_getallattrs(slot);
tuplesort_puttupleslot(tuplesortstate, slot);
/* simple_table_tuple_delete since we don't expect concurrent
* updates, have exclusive lock on the relation */
simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot);
if (!delete_tuple_for_recompression(uncompressed_chunk_rel, &slot->tts_tid, snapshot))
elog(ERROR,
"cannot proceed with recompression due to concurrent updates on uncompressed "
"data");
}
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);
@@ -742,3 +771,27 @@ create_orderby_scankeys(CompressionSettings *settings, Relation index_rel,
second_strategy);
}
}

/* Deleting a tuple for recompression if we can.
* If there is an unexpected result, we should just abort the operation completely.
* There are potential optimizations that can be done here in certain scenarios.
*/
static bool
delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot)
{
TM_Result result;
TM_FailureData tmfd;

result =
table_tuple_delete(rel,
tid,
GetCurrentCommandId(true),
snapshot,
InvalidSnapshot,
true /* for now, just wait for commit/abort, that might let us proceed */
,
&tmfd,
true /* changingPart */);

return result == TM_Ok;
}