From 08050fb9d6170ec43c3cfa18f59355b299f5ea55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Wed, 9 Oct 2024 13:41:35 +0200 Subject: [PATCH] Add support for merging chunks New `merge_chunks` procedures are introduced that can merge an arbitrary number of chunks if the right conditions apply. Basic checks are done to ensure that the chunks can be merged from a partitioning perspective. Some more advanced cases that are potentially mergeable (e.g., chunks with non-adjacent partitioning) are not supported at this time, nor is merging of compressed chunks. Merging compressed chunks requires additional work, although the same basic rewrite approach should also work on the internal compressed relations. Still, one needs to handle merges of a compressed chunk and a non-compressed chunk, of two compressed chunks with different compression settings, of partially compressed chunks, and so forth. This is left for a future enhancement. Currently, the merge defaults to taking an AccessExclusive lock on the merged chunks to prevent deadlocks and concurrent modifications. Weaker locking is supported via an anonymous settings variable, and it is used in tests to illustrate various deadlock scenarios. Alternative locking approaches, including multi-transactional merges, can be considered in the future. The actual merging is done by rewriting all the data from multiple chunks into a (temporary) merged heap using the same approach as that implemented to support VACUUM FULL and CLUSTER. Then this new heap is swapped into one of the original relations while the rest are dropped. This approach is MVCC-compliant and provides correct visibility under higher isolation levels, while also cleaning up garbage tuples. --- .unreleased/pr_7433 | 1 + sql/maintenance_utils.sql | 8 + sql/updates/latest-dev.sql | 9 + sql/updates/reverse-dev.sql | 4 + src/chunk.c | 13 + src/chunk.h | 3 +- src/chunk_constraint.c | 9 +- src/chunk_constraint.h | 9 +- src/compat/compat.h | 12 + src/cross_module_fn.c | 2 + src/cross_module_fn.h | 1 + src/dimension.h | 3 +- src/dimension_slice.h | 12 +- src/hypercube.h | 4 +- tsl/src/chunk.c | 902 +++++++++++++++++- tsl/src/chunk.h | 1 + tsl/src/init.c | 1 + tsl/test/expected/merge_chunks.out | 490 ++++++++++ .../expected/merge_chunks_concurrent.out | 611 ++++++++++++ tsl/test/isolation/specs/CMakeLists.txt | 3 +- .../specs/merge_chunks_concurrent.spec | 174 ++++ tsl/test/shared/expected/extension.out | 2 + tsl/test/sql/CMakeLists.txt | 1 + tsl/test/sql/merge_chunks.sql | 247 +++++ 24 files changed, 2494 insertions(+), 28 deletions(-) create mode 100644 .unreleased/pr_7433 create mode 100644 tsl/test/expected/merge_chunks.out create mode 100644 tsl/test/isolation/expected/merge_chunks_concurrent.out create mode 100644 tsl/test/isolation/specs/merge_chunks_concurrent.spec create mode 100644 tsl/test/sql/merge_chunks.sql diff --git a/.unreleased/pr_7433 b/.unreleased/pr_7433 new file mode 100644 index 00000000000..c733d50d429 --- /dev/null +++ b/.unreleased/pr_7433 @@ -0,0 +1 @@ +Implements: #7433 Add support for merging chunks diff --git a/sql/maintenance_utils.sql b/sql/maintenance_utils.sql index 339ea07beeb..7edcbf9e3dc 100644 --- a/sql/maintenance_utils.sql +++ b/sql/maintenance_utils.sql @@ -57,6 +57,14 @@ CREATE OR REPLACE PROCEDURE @extschema@.convert_to_rowstore( if_columnstore BOOLEAN = true ) AS '@MODULE_PATHNAME@', 'ts_decompress_chunk' LANGUAGE C; +CREATE OR REPLACE PROCEDURE @extschema@.merge_chunks( + chunk1 REGCLASS, chunk2 REGCLASS +) LANGUAGE C AS 
'@MODULE_PATHNAME@', 'ts_merge_two_chunks'; + +CREATE OR REPLACE PROCEDURE @extschema@.merge_chunks( + chunks REGCLASS[] +) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_merge_chunks'; + CREATE OR REPLACE FUNCTION _timescaledb_functions.recompress_chunk_segmentwise( uncompressed_chunk REGCLASS, if_compressed BOOLEAN = true diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index 33d290e374a..5863834971c 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -144,3 +144,12 @@ CREATE FUNCTION @extschema@.add_continuous_aggregate_policy( RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_update_placeholder' LANGUAGE C VOLATILE; + +-- Merge chunks +CREATE PROCEDURE @extschema@.merge_chunks( + chunk1 REGCLASS, chunk2 REGCLASS +) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder'; + +CREATE PROCEDURE @extschema@.merge_chunks( + chunks REGCLASS[] +) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder'; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 37f3659dd6f..89c7935ebbf 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -87,3 +87,7 @@ CREATE FUNCTION @extschema@.add_continuous_aggregate_policy( RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add' LANGUAGE C VOLATILE; + +-- Merge chunks +DROP PROCEDURE IF EXISTS @extschema@.merge_chunks(chunk1 REGCLASS, chunk2 REGCLASS); +DROP PROCEDURE IF EXISTS @extschema@.merge_chunks(chunks REGCLASS[]); diff --git a/src/chunk.c b/src/chunk.c index 565cc6f87bd..f2d620548b0 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -5158,3 +5160,14 @@ ts_chunk_drop_osm_chunk(PG_FUNCTION_ARGS) ts_cache_release(hcache); PG_RETURN_BOOL(true); } + +TS_FUNCTION_INFO_V1(ts_merge_two_chunks); + +Datum +ts_merge_two_chunks(PG_FUNCTION_ARGS) +{ + Datum chunks[2] = { PG_GETARG_DATUM(0), PG_GETARG_DATUM(1) }; + ArrayType *chunk_array = + construct_array(chunks, 2, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT); + return DirectFunctionCall1(ts_cm_functions->merge_chunks, PointerGetDatum(chunk_array)); +} diff --git a/src/chunk.h b/src/chunk.h index aaa93026e14..690fff8ca2c 100644 --- a/src/chunk.h +++ b/src/chunk.h @@ -188,7 +188,8 @@ extern bool ts_chunk_exists_relid(Oid relid); extern TSDLLEXPORT bool ts_chunk_exists_with_compression(int32 hypertable_id); extern void ts_chunk_recreate_all_constraints_for_dimension(Hypertable *ht, int32 dimension_id); extern int ts_chunk_delete_by_hypertable_id(int32 hypertable_id); -extern int ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior); +extern TSDLLEXPORT int ts_chunk_delete_by_name(const char *schema, const char *table, + DropBehavior behavior); extern bool ts_chunk_set_name(Chunk *chunk, const char *newname); extern bool ts_chunk_set_schema(Chunk *chunk, const char *newschema); extern TSDLLEXPORT List *ts_chunk_get_window(int32 dimension_id, int64 point, int count, diff --git a/src/chunk_constraint.c b/src/chunk_constraint.c index 260ef6a17f7..de0d9e6b5d0 100644 --- a/src/chunk_constraint.c +++ b/src/chunk_constraint.c @@ -272,9 +272,9 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti) /* * Create a dimensional CHECK constraint for a partitioning dimension. 
*/ -static Constraint * -create_dimension_check_constraint(const Dimension *dim, const DimensionSlice *slice, - const char *name) +Constraint * +ts_chunk_constraint_dimensional_create(const Dimension *dim, const DimensionSlice *slice, + const char *name) { Constraint *constr = NULL; bool isvarlena; @@ -489,7 +489,8 @@ ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk) dim = ts_hyperspace_get_dimension_by_id(ht->space, slice->fd.dimension_id); Assert(dim); - constr = create_dimension_check_constraint(dim, slice, NameStr(cc->fd.constraint_name)); + constr = + ts_chunk_constraint_dimensional_create(dim, slice, NameStr(cc->fd.constraint_name)); /* In some cases, a CHECK constraint is not needed. For instance, * if the range is -INF to +INF. */ diff --git a/src/chunk_constraint.h b/src/chunk_constraint.h index 585e243ffdc..50dcda84ca4 100644 --- a/src/chunk_constraint.h +++ b/src/chunk_constraint.h @@ -42,9 +42,9 @@ extern int ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *sli ChunkScanCtx *ctx, MemoryContext mctx); extern int ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice *slice, List **list, MemoryContext mctx); -extern int ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id, - ChunkConstraints *ccs, - MemoryContext mctx); +extern int TSDLLEXPORT ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id, + ChunkConstraints *ccs, + MemoryContext mctx); extern ChunkConstraint *ts_chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id, int32 dimension_slice_id, const char *constraint_name, @@ -58,6 +58,9 @@ extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_constraints(ChunkCon extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_check_constraints( ChunkConstraints *ccs, int32 chunk_id, const char chunk_relkind, Oid hypertable_oid); extern TSDLLEXPORT void ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs); +extern TSDLLEXPORT Constraint *ts_chunk_constraint_dimensional_create(const Dimension *dim, + const DimensionSlice *slice, + const char *name); extern TSDLLEXPORT void ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk); extern void ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk, Oid constraint_oid); diff --git a/src/compat/compat.h b/src/compat/compat.h index a5e66e47efb..6e73f19961b 100644 --- a/src/compat/compat.h +++ b/src/compat/compat.h @@ -959,3 +959,15 @@ RestrictSearchPath(void) constraintId) #endif +#if PG17_LT +/* + * Overflow-aware comparison functions to be used in qsort. Introduced in PG + * 17 and included here for older PG versions. 
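+ * + * The (a > b) - (a < b) idiom below returns -1, 0, or 1 without ever computing a - b, which could wrap around for unsigned arguments and produce a result with the wrong sign.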
+ */ +static inline int +pg_cmp_u32(uint32 a, uint32 b) +{ + return (a > b) - (a < b); +} + +#endif diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 5f1b5a0290e..2985cf32fc2 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -98,6 +98,7 @@ CROSSMODULE_WRAPPER(chunk_create_empty_table); CROSSMODULE_WRAPPER(recompress_chunk_segmentwise); CROSSMODULE_WRAPPER(get_compressed_chunk_index_for_recompression); +CROSSMODULE_WRAPPER(merge_chunks); /* hypercore */ CROSSMODULE_WRAPPER(is_compressed_tid); @@ -407,6 +408,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .recompress_chunk_segmentwise = error_no_default_fn_pg_community, .get_compressed_chunk_index_for_recompression = error_no_default_fn_pg_community, .preprocess_query_tsl = preprocess_query_tsl_default_fn_community, + .merge_chunks = error_no_default_fn_pg_community, }; TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default; diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index da2c520b260..90fb7c6523a 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -163,6 +163,7 @@ typedef struct CrossModuleFunctions PGFunction recompress_chunk_segmentwise; PGFunction get_compressed_chunk_index_for_recompression; void (*preprocess_query_tsl)(Query *parse, int *cursor_opts); + PGFunction merge_chunks; } CrossModuleFunctions; extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions; diff --git a/src/dimension.h b/src/dimension.h index be84046b888..44d9d1a8000 100644 --- a/src/dimension.h +++ b/src/dimension.h @@ -125,7 +125,8 @@ extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid, extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value); extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot); extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice); -extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id); +extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, + int32 id); extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs, DimensionType type, Index n); extern TSDLLEXPORT Dimension *ts_hyperspace_get_mutable_dimension(Hyperspace *hs, diff --git a/src/dimension_slice.h b/src/dimension_slice.h index 04122048f79..0ad985aabfa 100644 --- a/src/dimension_slice.h +++ b/src/dimension_slice.h @@ -58,8 +58,8 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str int limit, const ScanTupLock *tuplock); extern DimensionVec *ts_dimension_slice_collision_scan_limit(int32 dimension_id, int64 range_start, int64 range_end, int limit); -extern bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice, - const ScanTupLock *tuplock); +extern TSDLLEXPORT bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice, + const ScanTupLock *tuplock); extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id, const ScanTupLock *tuplock, MemoryContext mctx, @@ -70,18 +70,20 @@ extern DimensionVec *ts_dimension_slice_scan_by_dimension_before_point(int32 dim ScanDirection scandir, MemoryContext mctx); extern int ts_dimension_slice_delete_by_dimension_id(int32 dimension_id, bool delete_constraints); -extern int ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraints); +extern TSDLLEXPORT int ts_dimension_slice_delete_by_id(int32 
dimension_slice_id, + bool delete_constraints); extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_create(int dimension_id, int64 range_start, int64 range_end); extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_copy(const DimensionSlice *original); extern TSDLLEXPORT bool ts_dimension_slices_collide(const DimensionSlice *slice1, const DimensionSlice *slice2); -extern bool ts_dimension_slices_equal(const DimensionSlice *slice1, const DimensionSlice *slice2); +extern TSDLLEXPORT bool ts_dimension_slices_equal(const DimensionSlice *slice1, + const DimensionSlice *slice2); extern bool ts_dimension_slice_cut(DimensionSlice *to_cut, const DimensionSlice *other, int64 coord); extern void ts_dimension_slice_free(DimensionSlice *slice); extern int ts_dimension_slice_insert_multi(DimensionSlice **slice, Size num_slices); -extern void ts_dimension_slice_insert(DimensionSlice *slice); +extern TSDLLEXPORT void ts_dimension_slice_insert(DimensionSlice *slice); extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSlice *right); extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord); diff --git a/src/hypercube.h b/src/hypercube.h index 1190c9af04a..63cb3cc9a4f 100644 --- a/src/hypercube.h +++ b/src/hypercube.h @@ -27,7 +27,7 @@ typedef struct Hypercube (sizeof(Hypercube) + (sizeof(DimensionSlice *) * (num_dimensions))) extern TSDLLEXPORT Hypercube *ts_hypercube_alloc(int16 num_dimensions); -extern void ts_hypercube_free(Hypercube *hc); +extern TSDLLEXPORT void ts_hypercube_free(Hypercube *hc); extern TSDLLEXPORT DimensionSlice * ts_hypercube_add_slice_from_range(Hypercube *hc, int32 dimension_id, int64 start, int64 end); @@ -41,6 +41,6 @@ extern Hypercube *ts_hypercube_calculate_from_point(const Hyperspace *hs, const extern bool ts_hypercubes_collide(const Hypercube *cube1, const Hypercube *cube2); extern TSDLLEXPORT const DimensionSlice *ts_hypercube_get_slice_by_dimension_id(const Hypercube *hc, int32 dimension_id); -extern Hypercube *ts_hypercube_copy(const Hypercube *hc); +extern TSDLLEXPORT Hypercube *ts_hypercube_copy(const Hypercube *hc); extern bool ts_hypercube_equal(const Hypercube *hc1, const Hypercube *hc2); extern void ts_hypercube_slice_sort(Hypercube *hc); diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index d9ebf1444a1..6ad0a406832 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -3,44 +3,65 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the license. */ - #include +#include #include +#include +#include +#include +#include +#include #include +#include #include +#include #include +#include +#include +#include +#include +#include #include #include +#include +#include +#include +#include #include #include #include #include #include +#include #include +#include #include #include +#include +#include #include +#include +#include #include #include +#include +#include #include +#include #include #include #include #include #include -#ifdef USE_ASSERT_CHECKING -#endif - -#include -#include "hypercube.h" -#include -#include -#include -#include +#include "annotations.h" +#include "cache.h" #include "chunk.h" -#include "chunk_api.h" #include "debug_point.h" +#include "extension.h" +#include "hypercube.h" +#include "hypertable.h" +#include "hypertable_cache.h" #include "utils.h" /* Data in a frozen chunk cannot be modified. 
So any operation @@ -186,3 +207,862 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, boo return num_results; } + +typedef struct RelationMergeInfo +{ + Oid relid; + struct VacuumCutoffs cutoffs; + const Chunk *chunk; + Relation rel; +} RelationMergeInfo; + +typedef enum MergeLockUpgrade +{ + MERGE_LOCK_UPGRADE, + MERGE_LOCK_CONDITIONAL_UPGRADE, + MERGE_LOCK_ACCESS_EXCLUSIVE, +} MergeLockUpgrade; + +static void +compute_rel_vacuum_cutoffs(Relation rel, struct VacuumCutoffs *cutoffs) +{ + VacuumParams params; + + memset(&params, 0, sizeof(VacuumParams)); + vacuum_get_cutoffs(rel, &params, cutoffs); + + /* Frozen ID should not go backwards */ + TransactionId relfrozenxid = rel->rd_rel->relfrozenxid; + + if (TransactionIdIsValid(relfrozenxid) && + TransactionIdPrecedes(cutoffs->FreezeLimit, relfrozenxid)) + cutoffs->FreezeLimit = relfrozenxid; + + MultiXactId relminmxid = rel->rd_rel->relminmxid; + + if (MultiXactIdIsValid(relminmxid) && MultiXactIdPrecedes(cutoffs->MultiXactCutoff, relminmxid)) + cutoffs->MultiXactCutoff = relminmxid; +} + +static void +merge_chunks_finish(Oid new_relid, RelationMergeInfo *relinfos, int nrelids, + TransactionId freeze_limit, MultiXactId multi_cutoff, char relpersistence, + MergeLockUpgrade lock_upgrade) +{ + /* + * The relations being merged are currently locked with ExclusiveLock, which + * means other readers can still hold locks. To delete the relations, we first + * need to upgrade to an AccessExclusiveLock. However, this might lead to + * deadlocks, so we need to bail out if we cannot get the lock immediately. + */ + for (int i = 0; i < nrelids; i++) + { + Oid relid = relinfos[i].relid; + + switch (lock_upgrade) + { + case MERGE_LOCK_CONDITIONAL_UPGRADE: + if (!ConditionalLockRelationOid(relid, AccessExclusiveLock)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not lock relation \"%s\" for merge", + get_rel_name(relid)))); + break; + case MERGE_LOCK_UPGRADE: + LockRelationOid(relid, AccessExclusiveLock); + break; + case MERGE_LOCK_ACCESS_EXCLUSIVE: + /* We should already hold AccessExclusiveLock. Could preemptively + * take it or assert the lock is taken, but it would require + * opening the relation again. */ + break; + } + } + + finish_heap_swap(relinfos[0].relid, + new_relid, + false, /* system catalog */ + false, /* swap toast by content */ + false, /* check constraints */ + true, /* internal? */ + freeze_limit, + multi_cutoff, + relpersistence); + + /* + * Delete all the merged relations except the first one, since we are + * keeping it for the heap swap. 
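+ * + * For chunks, the TimescaleDB catalog metadata is deleted explicitly (via ts_chunk_delete_by_name() below), while the relations themselves are dropped in a single batch via performMultipleDeletions().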
+ */ + ObjectAddresses *objects = new_object_addresses(); + + for (int i = 1; i < nrelids; i++) + { + Oid relid = relinfos[i].relid; + ObjectAddress object = { + .classId = RelationRelationId, + .objectId = relid, + }; + + /* Cannot drop if relation is still open */ + Assert(relinfos[i].rel == NULL); + + if (relinfos[i].chunk) + { + const Oid namespaceid = get_rel_namespace(relid); + const char *schemaname = get_namespace_name(namespaceid); + const char *tablename = get_rel_name(relid); + + ts_chunk_delete_by_name(schemaname, tablename, DROP_RESTRICT); + } + + add_exact_object_address(&object, objects); + } + + performMultipleDeletions(objects, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); + free_object_addresses(objects); +} + +static int +cmp_relations(const void *left, const void *right) +{ + const RelationMergeInfo *linfo = ((RelationMergeInfo *) left); + const RelationMergeInfo *rinfo = ((RelationMergeInfo *) right); + + if (linfo->chunk && rinfo->chunk) + { + const Hypercube *lcube = linfo->chunk->cube; + const Hypercube *rcube = rinfo->chunk->cube; + + Assert(lcube->num_slices == rcube->num_slices); + + for (int i = 0; i < lcube->num_slices; i++) + { + const DimensionSlice *lslice = lcube->slices[i]; + const DimensionSlice *rslice = rcube->slices[i]; + + Assert(lslice->fd.dimension_id == rslice->fd.dimension_id); + + /* Compare start of range for the dimension */ + if (lslice->fd.range_start < rslice->fd.range_start) + return -1; + + if (lslice->fd.range_start > rslice->fd.range_start) + return 1; + + /* If start of range is equal, compare by end of range */ + if (lslice->fd.range_end < rslice->fd.range_end) + return -1; + + if (lslice->fd.range_end > rslice->fd.range_end) + return 1; + } + + /* Should only reach here if partitioning is equal across all + * dimensions. Fall back to comparing relids. */ + } + + return pg_cmp_u32(linfo->relid, rinfo->relid); +} + +/* + * Check that the partition boundaries of two chunks align so that a new valid + * hypercube can be formed if the chunks are merged. This check assumes that + * the hypercubes are sorted so that cube2 "follows" cube1. + * + * The algorithm is simple and only allows merging along a single dimension in + * the same merge. For example, these two cases are mergeable: + * + * ' ____ + * ' |__| + * ' |__| + * + * ' _______ + * ' |__|__| + * + * while these cases are not mergeable: + * ' ____ + * ' __|__| + * ' |__| + * + * ' ______ + * ' |____| + * ' |__| + * + * + * The validation can handle merges of many chunks at once if they are + * "naively" aligned and this function is called on chunk hypercubes in + * "partition order": + * + * ' _____________ + * ' |__|__|__|__| + * + * However, the validation currently won't accept merges of multiple + * dimensions at once: + * + * ' _____________ + * ' |__|__|__|__| + * ' |__|__|__|__| + * + * It also cannot handle complicated merges of multi-dimensional partitioning + * schemes like the one below. + * + * ' _________ + * ' |__a____| + * ' |_b_|_c_| + * + * Merging a,b,c, should be possible but the validation currently cannot + * handle such cases. Instead, it is necessary to first merge b,c. Then merge + * a with the result (b,c) in a separate merge. Note that it is not possible + * to merge only a,b or a,c. 
* + * A future, more advanced, validation needs to handle corner cases like the + * one below that has gaps: + * + * ' _____________ + * ' |__|__|__|__| + * ' |____| |___| + * ' + */ +static void +validate_merge_possible(const Hypercube *cube1, const Hypercube *cube2) +{ + int follow_edges = 0; + int equal_edges = 0; + + Assert(cube1->num_slices == cube2->num_slices); + + for (int i = 0; i < cube1->num_slices; i++) + { + const DimensionSlice *slice1 = cube1->slices[i]; + const DimensionSlice *slice2 = cube2->slices[i]; + + if (ts_dimension_slices_equal(slice1, slice2)) + equal_edges++; + + if (slice1->fd.range_end == slice2->fd.range_start) + follow_edges++; + } + + if (follow_edges != 1 || (cube1->num_slices - equal_edges) != 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create new chunk partition boundaries"), + errhint("Try merging chunks that have adjacent partitions."))); +} + +static const ChunkConstraint * +get_chunk_constraint_by_slice_id(const ChunkConstraints *ccs, int32 slice_id) +{ + for (int i = 0; i < ccs->num_constraints; i++) + { + const ChunkConstraint *cc = &ccs->constraints[i]; + + if (cc->fd.dimension_slice_id == slice_id) + return cc; + } + + return NULL; +} + +static void +chunk_update_constraints(const Chunk *chunk, const Hypercube *new_cube) +{ + Cache *hcache; + const Hypertable *ht = + ts_hypertable_cache_get_cache_and_entry(chunk->hypertable_relid, CACHE_FLAG_NONE, &hcache); + List *new_constraints = NIL; + + for (int i = 0; i < new_cube->num_slices; i++) + { + const DimensionSlice *old_slice = chunk->cube->slices[i]; + DimensionSlice *new_slice = new_cube->slices[i]; + const ChunkConstraint *cc; + ScanTupLock tuplock = { + .waitpolicy = LockWaitBlock, + .lockmode = LockTupleShare, + }; + + /* The new slice has the merged range, but still the old ID, which + * should match the old slice. */ + Assert(old_slice->fd.id == new_slice->fd.id); + + /* If nothing changed in this dimension, move on to the next */ + if (ts_dimension_slices_equal(old_slice, new_slice)) + continue; + + cc = get_chunk_constraint_by_slice_id(chunk->constraints, old_slice->fd.id); + + if (cc) + { + ObjectAddress constrobj = { + .classId = ConstraintRelationId, + .objectId = get_relation_constraint_oid(chunk->table_id, + NameStr(cc->fd.constraint_name), + false), + }; + + performDeletion(&constrobj, DROP_RESTRICT, 0); + + /* Create the new check constraint */ + const Dimension *dim = + ts_hyperspace_get_dimension_by_id(ht->space, old_slice->fd.dimension_id); + Constraint *constr = + ts_chunk_constraint_dimensional_create(dim, + new_slice, + NameStr(cc->fd.constraint_name)); + + /* Constraint could be NULL, e.g., if the merged chunk covers the + * entire range in a space dimension, in which case it needs no + * constraint. */ + if (constr != NULL) + new_constraints = lappend(new_constraints, constr); + } + + /* Check if there's already a slice with the new range. If so, avoid + * inserting a new slice. 
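+ * + * Reusing an existing slice avoids inserting duplicate dimension_slice rows when another chunk already spans exactly the merged range.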
*/ + if (!ts_dimension_slice_scan_for_existing(new_slice, &tuplock)) + { + new_slice->fd.id = -1; + ts_dimension_slice_insert(new_slice); + /* A new ID should be assigned */ + Assert(new_slice->fd.id > 0); + } + + /* Update the chunk constraint to point to the new slice ID */ + ts_chunk_constraint_update_slice_id(chunk->fd.id, old_slice->fd.id, new_slice->fd.id); + + /* Delete the old slice if it is orphaned now */ + if (ts_chunk_constraint_scan_by_dimension_slice_id(old_slice->fd.id, + NULL, + CurrentMemoryContext) == 0) + { + ts_dimension_slice_delete_by_id(old_slice->fd.id, false); + } + } + + /* Add new check constraints, if any */ + if (new_constraints != NIL) + { + /* Adding a constraint should require AccessExclusiveLock. It should + * already be taken at this point, but specify it to be sure. */ + Relation rel = table_open(chunk->table_id, AccessExclusiveLock); + AddRelationNewConstraints(rel, + NIL /* List *newColDefaults */, + new_constraints, + false /* allow_merge */, + true /* is_local */, + false /* is_internal */, + NULL /* query string */); + table_close(rel, NoLock); + } + + ts_cache_release(hcache); +} + +static void +merge_cubes(Hypercube *merged_cube, const Hypercube *cube) +{ + /* Merge dimension slices */ + for (int i = 0; i < cube->num_slices; i++) + { + const DimensionSlice *slice = cube->slices[i]; + DimensionSlice *merged_slice = merged_cube->slices[i]; + + Assert(slice->fd.dimension_id == merged_slice->fd.dimension_id); + + if (slice->fd.range_start < merged_slice->fd.range_start) + merged_slice->fd.range_start = slice->fd.range_start; + + if (slice->fd.range_end > merged_slice->fd.range_end) + merged_slice->fd.range_end = slice->fd.range_end; + } +} + +/* + * Get the locking mode for merge chunks. + * + * By default, a merge happens with access exclusive locks taken on chunks in + * order to avoid deadlocks. It is possible to use a weaker exclusive lock by + * setting a session variable, thus allowing reads during merges. However, + * that can easily lead to deadlocks as shown in isolation tests. Therefore, + * use the stricter locking settings by default. + */ +static MergeLockUpgrade +merge_chunks_lock_upgrade_mode(void) +{ + const char *lockupgrade = + GetConfigOption("timescaledb.merge_chunks_lock_upgrade_mode", true, false); + + if (lockupgrade == NULL) + return MERGE_LOCK_ACCESS_EXCLUSIVE; + + if (strcmp("upgrade", lockupgrade) == 0) + return MERGE_LOCK_UPGRADE; + + if (strcmp("conditional", lockupgrade) == 0) + return MERGE_LOCK_CONDITIONAL_UPGRADE; + + return MERGE_LOCK_ACCESS_EXCLUSIVE; +} + +#if (PG_VERSION_NUM >= 170000 && PG_VERSION_NUM <= 170002) +/* + * Workaround for changed behavior in the relation rewrite code that appeared + * in PostgreSQL 17.0, but was fixed in 17.3. + * + * Merge chunks uses the relation rewrite functionality from CLUSTER and + * VACUUM FULL. This works for merge because, when writing into a non-empty + * relation, new pages are appended while the existing pages remain the + * same. In PG17.0, however, that changed so that existing pages in the + * relation were zeroed out. The changed behavior was introduced as part of + * this commit: + * + * https://github.com/postgres/postgres/commit/8af256524893987a3e534c6578dd60edfb782a77 + * + * Fortunately, this was fixed in a follow-up commit: + * + * https://github.com/postgres/postgres/commit/9695835538c2c8e9cd0048028b8c85e1bbf5c79c + * + * The fix is part of PG 17.3. However, this still leaves PG 17.0 - 17.2 + * with different behavior. 
* + * To make the merge chunks code work for the "broken" versions we make PG + * believe, during the first rewrite operation, that the relation already has + * the size of the fully merged relation so that we reserve the full space + * needed and then "append" backwards into the zeroed space (see illustration + * below). By doing this, we + * ensure that no valid data is zeroed out. The downside of this approach is + * that there will be a lot of unnecessary writing of zero pages. Below is an + * example of what the rewrite would look like for merging three relations + * with one page each. When writing the first relation, PG believes the merged + * relation already contains two pages when starting the rewrite. These two + * existing pages will be zeroed. When writing the next relation we tell PG + * that there is only one existing page in the merged relation, and so forth. + * + * _____________ + * |_0_|_0_|_x_| + * _________ + * |_0_|_x_| + * _____ + * |_x_| + * + * Result: + * _____________ + * |_x_|_x_|_x_| + * + */ +static BlockNumber merge_rel_nblocks = 0; +static BlockNumber *blockoff = NULL; +static const TableAmRoutine *old_routine = NULL; +static TableAmRoutine routine = {}; + +/* + * TAM relation size function to make PG believe that the merged relation + * contains a specific amount of existing data. + */ +static uint64 +pg17_workaround_merge_relation_size(Relation rel, ForkNumber forkNumber) +{ + if (forkNumber == MAIN_FORKNUM) + return merge_rel_nblocks * BLCKSZ; + + return old_routine->relation_size(rel, forkNumber); +} + +static inline void +pg17_workaround_init(Relation rel, RelationMergeInfo *relinfos, int nrelids) +{ + routine = *rel->rd_tableam; + routine.relation_size = pg17_workaround_merge_relation_size; + old_routine = rel->rd_tableam; + rel->rd_tableam = &routine; + blockoff = palloc(sizeof(BlockNumber) * nrelids); + uint64 totalblocks = 0; + + for (int i = 0; i < nrelids; i++) + { + blockoff[i] = (BlockNumber) totalblocks; + totalblocks += RelationGetNumberOfBlocks(relinfos[i].rel); + + /* Ensure the offsets don't overflow. For the merge itself, it is + * assumed that the write will fail when writing too many blocks */ + Ensure(totalblocks <= MaxBlockNumber, "max number of blocks exceeded for merge"); + } +} + +static inline void +pg17_workaround_cleanup(Relation rel) +{ + pfree(blockoff); + rel->rd_tableam = old_routine; +} + +static inline RelationMergeInfo * +get_relmergeinfo(RelationMergeInfo *relinfos, int nrelids, int i) +{ + RelationMergeInfo *relinfo = &relinfos[nrelids - i - 1]; + merge_rel_nblocks = blockoff[nrelids - i - 1]; + return relinfo; +} + +#else +#define pg17_workaround_init(rel, relinfos, nrelids) +#define pg17_workaround_cleanup(rel) +#define get_relmergeinfo(relinfos, nrelids, i) &relinfos[i] +#endif + +/* + * Merge N chunk relations into one chunk based on Oids. + * + * The input chunk relations are ordered according to partition ranges and the + * "first" relation in that ordered list will be "kept" to hold the merged + * data. The merged chunk will have its partition ranges updated to cover the + * ranges of all the merged chunks. + * + * The merge happens via a heap rewrite, followed by a heap swap, essentially + * the same approach implemented by CLUSTER and VACUUM FULL, but applied on + * several relations in the same operation (many to one). + * + * The heap swap approach handles visibility across all PG isolation levels, + * as implemented by the cluster code. 
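+ * + * Usage, via the SQL procedures added in this change (the chunk names are examples borrowed from the tests): + * + * CALL merge_chunks('_timescaledb_internal._hyper_1_1_chunk', + * '_timescaledb_internal._hyper_1_2_chunk'); + * + * CALL merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', + * '_timescaledb_internal._hyper_1_2_chunk', + * '_timescaledb_internal._hyper_1_3_chunk']); + * + * Optionally, the anonymous setting mentioned in the commit message can be set first, e.g. SET timescaledb.merge_chunks_lock_upgrade_mode TO 'upgrade' (or 'conditional'), to allow reads during the merge at the risk of deadlocks (see merge_chunks_lock_upgrade_mode() above).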
* + * In the first step, all data from each chunk is written to a temporary heap + * (accounting for vacuum, half-dead/visible, and frozen tuples). In the + * second step, a heap swap is performed on one of the chunks and all metadata + * is rewritten to handle, e.g., new partition ranges. Finally, the old chunks + * are dropped, except for the chunk that received the heap swap. + * + * To be able to merge, the function checks that: + * + * - all relations are tables (not, e.g., views) + * - all relations use the same (or compatible) storage on disk + * - all relations are chunks (and not, e.g., foreign/OSM chunks) + * + * A current limitation is that it is not possible to merge compressed chunks + * since this requires additional functionality, such as: + * + * - Handling merge of compressed and non-compressed chunks + * + * - Merging chunks with different compression settings (e.g., different + * orderby or segmentby) + * + * - Merging partial chunks + * + * - Updating additional metadata of the internal compressed relations + */ +Datum +chunk_merge_chunks(PG_FUNCTION_ARGS) +{ + ArrayType *chunks_array = PG_ARGISNULL(0) ? NULL : PG_GETARG_ARRAYTYPE_P(0); + Datum *relids; + bool *nulls; + int nrelids; + RelationMergeInfo *relinfos; + int32 hypertable_id = INVALID_HYPERTABLE_ID; + Hypercube *merged_cube = NULL; + const Hypercube *prev_cube = NULL; + const MergeLockUpgrade lock_upgrade = merge_chunks_lock_upgrade_mode(); + + PreventCommandIfReadOnly("merge_chunks"); + + if (chunks_array == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("no chunks to merge specified"))); + + deconstruct_array(chunks_array, + REGCLASSOID, + sizeof(Oid), + true, + TYPALIGN_INT, + &relids, + &nulls, + &nrelids); + + if (nrelids < 2) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("must specify at least two chunks to merge"))); + + relinfos = palloc0(sizeof(struct RelationMergeInfo) * nrelids); + + /* Sort relids array in order to find duplicates and lock relations in + * consistent order to avoid deadlocks. It doesn't matter that we don't + * order the nulls array the same since we only care about all relids + * being non-null. */ + qsort(relids, nrelids, sizeof(Datum), oid_cmp); + + /* Step 1: Do sanity checks and then prepare to sort rels in consistent order. */ + for (int i = 0; i < nrelids; i++) + { + Oid relid = DatumGetObjectId(relids[i]); + const Chunk *chunk; + Relation rel; + Oid amoid; + + if (nulls[i] || !OidIsValid(relid)) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid relation"))); + + if (i > 0 && DatumGetObjectId(relids[i]) == DatumGetObjectId(relids[i - 1])) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("duplicate relation \"%s\" in merge", + get_rel_name(DatumGetObjectId(relids[i]))))); + + /* Lock the relation before doing other checks that can lock dependent + * objects (this can otherwise lead to deadlocks with concurrent + * operations). Note that if we take ExclusiveLock here to allow + * readers while we are rewriting/merging the relations, the lock + * needs to be upgraded to an AccessExclusiveLock later. This can also + * lead to deadlocks. + * + * Ideally, we should probably take locks on all dependent objects as + * well, at least on chunk-related objects that will be + * dropped. Otherwise, that might also cause deadlocks later. 
For + * example, a concurrent DROP TABLE on one of the chunks will + * lead to a deadlock because it grabs locks on all dependencies before + * dropping. + * + * However, for now we won't do that because that requires scanning + * pg_depends and concurrent operations will probably fail anyway if + * we remove the objects. We might as well fail with a deadlock. + */ + LOCKMODE lockmode = + (lock_upgrade == MERGE_LOCK_ACCESS_EXCLUSIVE) ? AccessExclusiveLock : ExclusiveLock; + + rel = try_table_open(relid, lockmode); + + /* Check if the table actually exists. If not, it could have been + * deleted in a concurrent merge. */ + if (rel == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation does not exist"), + errdetail("The relation with OID %u might have been removed " + "by a concurrent merge or other operation.", + relid))); + + if (rel->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot merge non-table relations"))); + + /* Only owner is allowed to merge */ + if (!object_ownercheck(RelationRelationId, relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(rel->rd_rel->relkind), + get_rel_name(relid)); + + /* Lock toast table to prevent it from being concurrently vacuumed */ + if (rel->rd_rel->reltoastrelid) + LockRelationOid(rel->rd_rel->reltoastrelid, lockmode); + + /* + * Check for active uses of the relation in the current transaction, + * including open scans and pending AFTER trigger events. + */ + CheckTableNotInUse(rel, "merge_chunks"); + + if (IsSystemRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot merge system catalog relations"))); + + /* + * Find the chunk corresponding to the relation for final checks. Done + * after locking the chunk relation because scanning for the chunk + * will grab locks on other objects, which might otherwise lead to + * deadlocks during concurrent merges instead of more helpful messages + * (like chunk does not exist because it was merged). + */ + chunk = ts_chunk_get_by_relid(relid, false); + + if (NULL == chunk) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can only merge hypertable chunks"))); + + if (chunk->fd.osm_chunk) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot merge OSM chunks"))); + + if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("merging compressed chunks is not yet supported"), + errhint("Decompress the chunks before merging."))); + + if (hypertable_id == INVALID_HYPERTABLE_ID) + hypertable_id = chunk->fd.hypertable_id; + else if (hypertable_id != chunk->fd.hypertable_id) + { + Assert(i > 0); + + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot merge chunks across different hypertables"), + errdetail("Chunk \"%s\" is part of hypertable \"%s\" while chunk \"%s\" is " + "part of hypertable \"%s\".", + get_rel_name(chunk->table_id), + get_rel_name(chunk->hypertable_relid), + get_rel_name(relinfos[i - 1].chunk->table_id), + get_rel_name(relinfos[i - 1].chunk->hypertable_relid)))); + } + + /* + * It might not be possible to merge two chunks with different + * storage, so better safe than sorry for now. 
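+ * + * Currently only the plain heap table access method is accepted; the check below rejects any other AM.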
+ */ + amoid = rel->rd_rel->relam; + + if (amoid != HEAP_TABLE_AM_OID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("access method \"%s\" is not supported for merge", + get_am_name(amoid)))); + + relinfos[i].relid = relid; + relinfos[i].rel = rel; + relinfos[i].chunk = chunk; + } + + /* Sort rels in partition order (in case of chunks). This is necessary to + * validate that a merge is possible. */ + qsort(relinfos, nrelids, sizeof(RelationMergeInfo), cmp_relations); + + /* Step 2: Check alignment/mergeability and create the merged hypercube + * (partition ranges). */ + for (int i = 0; i < nrelids; i++) + { + const Chunk *chunk = relinfos[i].chunk; + + Assert(chunk != NULL); + + if (merged_cube == NULL) + { + merged_cube = ts_hypercube_copy(chunk->cube); + Assert(prev_cube == NULL); + } + else + { + Assert(chunk->cube->num_slices == merged_cube->num_slices); + Assert(prev_cube != NULL); + validate_merge_possible(prev_cube, chunk->cube); + merge_cubes(merged_cube, chunk->cube); + } + + prev_cube = chunk->cube; + compute_rel_vacuum_cutoffs(relinfos[i].rel, &relinfos[i].cutoffs); + } + + /* + * Keep the first of the ordered relations. It will receive a heap + * swap. + */ + Relation result_rel = relinfos[0].rel; + /* These will be our final cutoffs for the merged relation */ + struct VacuumCutoffs *cutoffs = &relinfos[0].cutoffs; + + Oid tablespace = result_rel->rd_rel->reltablespace; + char relpersistence = result_rel->rd_rel->relpersistence; + + /* Create the transient heap that will receive the re-ordered data */ + Oid new_relid = make_new_heap_compat(RelationGetRelid(result_rel), + tablespace, + result_rel->rd_rel->relam, + relpersistence, + ExclusiveLock); + Relation new_rel = table_open(new_relid, AccessExclusiveLock); + double total_num_tuples = 0.0; + + pg17_workaround_init(new_rel, relinfos, nrelids); + + /* Step 3: write the data from all the rels into a new merged heap */ + for (int i = 0; i < nrelids; i++) + { + RelationMergeInfo *relinfo = get_relmergeinfo(relinfos, nrelids, i); + struct VacuumCutoffs *cutoffs_i = &relinfo->cutoffs; + Relation rel = relinfo->rel; + + double num_tuples = 0.0; + double tups_vacuumed = 0.0; + double tups_recently_dead = 0.0; + + table_relation_copy_for_cluster(rel, + new_rel, + NULL, + false, + cutoffs_i->OldestXmin, + &cutoffs_i->FreezeLimit, + &cutoffs_i->MultiXactCutoff, + &num_tuples, + &tups_vacuumed, + &tups_recently_dead); + + elog(LOG, + "merged rows from \"%s\" into \"%s\": tuples %lf vacuumed %lf recently dead %lf", + RelationGetRelationName(rel), + RelationGetRelationName(result_rel), + num_tuples, + tups_vacuumed, + tups_recently_dead); + + total_num_tuples += num_tuples; + + if (TransactionIdPrecedes(cutoffs->FreezeLimit, cutoffs_i->FreezeLimit)) + cutoffs->FreezeLimit = cutoffs_i->FreezeLimit; + + if (MultiXactIdPrecedes(cutoffs->MultiXactCutoff, cutoffs_i->MultiXactCutoff)) + cutoffs->MultiXactCutoff = cutoffs_i->MultiXactCutoff; + + /* Close the relations before the heap swap, but keep the locks until + * end of transaction. 
*/ + table_close(rel, NoLock); + relinfo->rel = NULL; + } + + pg17_workaround_cleanup(new_rel); + + /* Update table stats */ + Relation relRelation = table_open(RelationRelationId, RowExclusiveLock); + HeapTuple reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(new_relid)); + if (!HeapTupleIsValid(reltup)) + elog(ERROR, "cache lookup failed for relation %u", new_relid); + Form_pg_class relform = (Form_pg_class) GETSTRUCT(reltup); + BlockNumber num_pages = RelationGetNumberOfBlocks(new_rel); + relform->relpages = num_pages; + relform->reltuples = total_num_tuples; + + CatalogTupleUpdate(relRelation, &reltup->t_self, reltup); + heap_freetuple(reltup); + table_close(relRelation, RowExclusiveLock); + CommandCounterIncrement(); + + table_close(new_rel, NoLock); + + DEBUG_WAITPOINT("merge_chunks_before_heap_swap"); + + /* Step 4: Keep one of the original rels but transplant the merged heap + * into it using a heap swap. Then close and delete the remaining merged + * rels. */ + merge_chunks_finish(new_relid, + relinfos, + nrelids, + cutoffs->FreezeLimit, + cutoffs->MultiXactCutoff, + relpersistence, + lock_upgrade); + + /* Step 5: Update the dimensional metadata and constraints for the chunk + * we are keeping. */ + if (merged_cube) + { + Assert(relinfos[0].chunk); + chunk_update_constraints(relinfos[0].chunk, merged_cube); + ts_hypercube_free(merged_cube); + } + + pfree(relids); + pfree(nulls); + pfree(relinfos); + + PG_RETURN_VOID(); +} diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 496a296df38..5c8db95b77d 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -13,3 +13,4 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS); extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS); extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, bool use_creation_time); +extern Datum chunk_merge_chunks(PG_FUNCTION_ARGS); diff --git a/tsl/src/init.c b/tsl/src/init.c index 50db7ac8c07..594d2277987 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -191,6 +191,7 @@ CrossModuleFunctions tsl_cm_functions = { .get_compressed_chunk_index_for_recompression = tsl_get_compressed_chunk_index_for_recompression, .preprocess_query_tsl = tsl_preprocess_query, + .merge_chunks = chunk_merge_chunks, }; static void diff --git a/tsl/test/expected/merge_chunks.out b/tsl/test/expected/merge_chunks.out new file mode 100644 index 00000000000..77c36dd6123 --- /dev/null +++ b/tsl/test/expected/merge_chunks.out @@ -0,0 +1,490 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. 
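+-- The tests below exercise merge_chunks(): basic merges, validation of
+-- partition boundaries, error cases, permissions, and merges over larger
+-- data sets.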
+\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE ACCESS METHOD testam TYPE TABLE HANDLER heap_tableam_handler; +set role :ROLE_DEFAULT_PERM_USER; +------------------ +-- Helper views -- +------------------- +create view partitions as +select c.table_name, d.column_name, ds.range_start, ds.range_end +from _timescaledb_catalog.hypertable h +join _timescaledb_catalog.chunk c on (c.hypertable_id = h.id) +join _timescaledb_catalog.dimension d on (d.hypertable_id = h.id) +join _timescaledb_catalog.dimension_slice ds on (d.id = ds.dimension_id) +join _timescaledb_catalog.chunk_constraint cc on (cc.chunk_id = c.id and cc.dimension_slice_id = ds.id) +where h.table_name = 'mergeme' +order by d.id, ds.range_start, ds.range_end; +create view orphaned_slices as +select ds.id, cc.constraint_name from _timescaledb_catalog.dimension_slice ds +left join _timescaledb_catalog.chunk_constraint cc on (ds.id = cc.dimension_slice_id) +where cc.constraint_name is null; +----------------- +-- Setup table -- +----------------- +create table mergeme (time timestamptz not null, device int, temp float); +select create_hypertable('mergeme', 'time', 'device', 3, chunk_time_interval => interval '1 day'); + create_hypertable +---------------------- + (1,public,mergeme,t) +(1 row) + +-- +-- Insert data to create two chunks with same time ranges like this: +-- _______ +-- | | +-- | 1 | +-- |_____| +-- | | +-- | 2 | +-- |_____| +--- +insert into mergeme values ('2024-01-01', 1, 1.0), ('2024-01-01', 2, 2.0); +select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk'); + Constraint | Columns | Expr +--------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------ + constraint_1 | {time} | (("time" >= 'Sun Dec 31 16:00:00 2023 PST'::timestamp with time zone) AND ("time" < 'Mon Jan 01 16:00:00 2024 PST'::timestamp with time zone)) + constraint_2 | {device} | (_timescaledb_functions.get_partition_hash(device) < 715827882) +(2 rows) + +-- Show partition layout +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+------------------ + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_2_chunk | device | 715827882 | 1431655764 +(4 rows) + +-- Now merge chunk 1 and 2: +begin; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +select * from _timescaledb_internal._hyper_1_1_chunk; + time | device | temp +------------------------------+--------+------ + Mon Jan 01 00:00:00 2024 PST | 1 | 1 + Mon Jan 01 00:00:00 2024 PST | 2 | 2 +(2 rows) + +select reltuples from pg_class where oid='_timescaledb_internal._hyper_1_1_chunk'::regclass; + reltuples +----------- + 2 +(1 row) + +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+------------------ + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 1431655764 +(2 rows) + +select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk'); + Constraint | Columns | Expr 
+--------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------ + constraint_1 | {time} | (("time" >= 'Sun Dec 31 16:00:00 2023 PST'::timestamp with time zone) AND ("time" < 'Mon Jan 01 16:00:00 2024 PST'::timestamp with time zone)) + constraint_2 | {device} | (_timescaledb_functions.get_partition_hash(device) < 1431655764) +(2 rows) + +select count(*) as num_orphaned_slices from orphaned_slices; + num_orphaned_slices +--------------------- + 0 +(1 row) + +select * from show_chunks('mergeme'); + show_chunks +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk +(1 row) + +select * from mergeme; + time | device | temp +------------------------------+--------+------ + Mon Jan 01 00:00:00 2024 PST | 1 | 1 + Mon Jan 01 00:00:00 2024 PST | 2 | 2 +(2 rows) + +rollback; +-- create a new chunk as a third space partition +-- _______ +-- | | +-- | 1 | +-- |_____| +-- | | +-- | 2 | +-- |_____| +-- | | +-- | 3 | +-- |_____| +--- +insert into mergeme values ('2024-01-01', 3, 3.0); +-- Test some basic error cases +\set ON_ERROR_STOP 0 +-- Can't merge chunk 1 and 3 +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_3_chunk'); +ERROR: cannot create new chunk partition boundaries +call merge_chunks(NULL); +ERROR: no chunks to merge specified +call merge_chunks(NULL, NULL); +ERROR: invalid relation +call merge_chunks(999999,999991); +ERROR: relation does not exist +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk']); +ERROR: must specify at least two chunks to merge +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', NULL); +ERROR: invalid relation +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', NULL]); +ERROR: invalid relation +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_1_chunk'); +ERROR: duplicate relation "_hyper_1_1_chunk" in merge +-- Check permissions +reset role; +set role :ROLE_1; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +ERROR: must be owner of table _hyper_1_1_chunk +reset role; +set role :ROLE_DEFAULT_PERM_USER; +\set ON_ERROR_STOP 1 +-- Show new partition +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_3_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 +(6 rows) + +begin; +-- Should be able to merge all three chunks +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk']); +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 9223372036854775807 +(2 rows) + +-- Note that no space partition CHECK constraint is added because it +-- now covers the entire range from -inf to +inf. 
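+-- (Only the time dimension CHECK constraint remains, as shown below.)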
+select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk'); + Constraint | Columns | Expr +--------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------ + constraint_1 | {time} | (("time" >= 'Sun Dec 31 16:00:00 2023 PST'::timestamp with time zone) AND ("time" < 'Mon Jan 01 16:00:00 2024 PST'::timestamp with time zone)) +(1 row) + +select count(*) as num_orphaned_slices from orphaned_slices; + num_orphaned_slices +--------------------- + 0 +(1 row) + +select * from show_chunks('mergeme'); + show_chunks +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk +(1 row) + +select * from mergeme; + time | device | temp +------------------------------+--------+------ + Mon Jan 01 00:00:00 2024 PST | 1 | 1 + Mon Jan 01 00:00:00 2024 PST | 2 | 2 + Mon Jan 01 00:00:00 2024 PST | 3 | 3 +(3 rows) + +rollback; +-- create two new chunks, 4 and 5, as follows: +-- _____________ _______ +-- | | | | | +-- | 1 | 4 | | 5 | +-- |_____|_____| |_____| +-- | | +-- | 2 | +-- |_____| +-- | | +-- | 3 | +-- |_____| +--- +insert into mergeme values ('2024-01-02', 1, 4.0), ('2024-01-04', 1, 5.0); +-- Show new partitions +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_3_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_4_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_5_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_5_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_4_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 +(10 rows) + +\set ON_ERROR_STOP 0 +-- can't merge 3 and 4 +call merge_chunks('_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_4_chunk'); +ERROR: cannot create new chunk partition boundaries +-- can't merge 1 and 5 +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_5_chunk'); +ERROR: cannot create new chunk partition boundaries +-- can't merge 2 and 4 +call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_4_chunk'); +ERROR: cannot create new chunk partition boundaries +-- can't merge 4 and 5 +call merge_chunks('_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_4_chunk'); +ERROR: cannot create new chunk partition boundaries +-- currently can't merge 1,2,3,4 due to limitation in how we validate the merge +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_4_chunk', '_timescaledb_internal._hyper_1_1_chunk']); +ERROR: cannot create new chunk partition boundaries +begin; +-- Should be able to merge all three chunks 1,2,3 +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk']); +-- But merging the merged 1,2,3 chunk with 4 is currently not +-- possible, although we chould do it in theory +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', 
'_timescaledb_internal._hyper_1_4_chunk'); +ERROR: cannot create new chunk partition boundaries +rollback; +\set ON_ERROR_STOP 1 +alter table mergeme set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device'); +select compress_chunk('_timescaledb_internal._hyper_1_1_chunk'); + compress_chunk +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk +(1 row) + +select compress_chunk('_timescaledb_internal._hyper_1_3_chunk'); + compress_chunk +---------------------------------------- + _timescaledb_internal._hyper_1_3_chunk +(1 row) + +\set ON_ERROR_STOP 0 +-- Currently cannot merge compressed chunks +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +ERROR: merging compressed chunks is not yet supported +call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk'); +ERROR: merging compressed chunks is not yet supported +\set ON_ERROR_STOP 1 +-- Currently cannot merge chunks using Hypercore TAM +alter table _timescaledb_internal._hyper_1_1_chunk set access method hypercore; +alter table _timescaledb_internal._hyper_1_3_chunk set access method hypercore; +select relname, amname from pg_class cl +join pg_am am on (cl.relam = am.oid) +where cl.oid in ('_timescaledb_internal._hyper_1_1_chunk'::regclass, '_timescaledb_internal._hyper_1_3_chunk'::regclass); + relname | amname +------------------+----------- + _hyper_1_1_chunk | hypercore + _hyper_1_3_chunk | hypercore +(2 rows) + +\set ON_ERROR_STOP 0 +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +ERROR: merging compressed chunks is not yet supported +call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk'); +ERROR: merging compressed chunks is not yet supported +\set ON_ERROR_STOP 1 +--- +-- Test some error cases when merging chunks with non-chunks or chunks +-- from other hypertables +--- +-- Decompress all chunks to ensure we only have non-compressed chunks +select decompress_chunk(ch) from show_chunks('mergeme') ch; +NOTICE: chunk "_hyper_1_2_chunk" is not compressed +NOTICE: chunk "_hyper_1_4_chunk" is not compressed +NOTICE: chunk "_hyper_1_5_chunk" is not compressed + decompress_chunk +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk + + _timescaledb_internal._hyper_1_3_chunk + + +(5 rows) + +-- Create a non-chunk table +create table mergeme_too(time timestamptz not null, device int, temp float); +select create_hypertable('mergeme_too', 'time', 'device', 3, chunk_time_interval => interval '1 day'); + create_hypertable +-------------------------- + (3,public,mergeme_too,t) +(1 row) + +create table mergeme_regular(time timestamptz not null, device int, temp float); +insert into mergeme_too values ('2024-01-01', 1, 1.0); +insert into mergeme_regular select * from mergeme_too; +create materialized view mergeme_mat as +select * from mergeme_too where device=1; +select * from show_chunks('mergeme_too'); + show_chunks +---------------------------------------- + _timescaledb_internal._hyper_3_8_chunk +(1 row) + +\set ON_ERROR_STOP 0 +-- Merge chunk and regular table +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', 'mergeme_regular'); +ERROR: can only merge hypertable chunks +call merge_chunks('mergeme_regular', '_timescaledb_internal._hyper_1_1_chunk'); +ERROR: can only merge hypertable chunks +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', 
'mergeme_mat'); +ERROR: cannot merge non-table relations +-- Merge chunks from different hypertables +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_3_8_chunk'); +ERROR: cannot merge chunks across different hypertables +-- Merge with unsupported access method +alter table _timescaledb_internal._hyper_1_1_chunk set access method testam; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +ERROR: access method "testam" is not supported for merge +alter table _timescaledb_internal._hyper_1_1_chunk set access method heap; +-- Merge OSM chunks +reset role; +update _timescaledb_catalog.chunk ch set osm_chunk = true where table_name = '_hyper_1_1_chunk'; +set role :ROLE_DEFAULT_PERM_USER; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +ERROR: cannot merge OSM chunks +reset role; +update _timescaledb_catalog.chunk ch set osm_chunk = false where table_name = '_hyper_1_1_chunk'; +set role :ROLE_DEFAULT_PERM_USER; +\set ON_ERROR_STOP 1 +-- Set seed to consistently generate same data and same set of chunks +select setseed(0.2); + setseed +--------- + +(1 row) + +-- Test merge with bigger data set and chunks with more blocks +insert into mergeme (time, device, temp) +select t, ceil(random()*10), random()*40 +from generate_series('2024-01-01'::timestamptz, '2024-01-04', '0.5s') t; +-- Show partitions before merge +select * from partitions; + table_name | column_name | range_start | range_end +-------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_3_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_10_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_9_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_4_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_11_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_13_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_12_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_15_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_5_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_14_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_4_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_5_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_11_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_9_chunk | device | 715827882 | 1431655764 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_14_chunk | device | 715827882 | 1431655764 + _hyper_1_12_chunk | device | 715827882 | 1431655764 + _hyper_1_15_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_10_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_13_chunk | device | 1431655764 | 9223372036854775807 +(24 rows) + +-- Merge all chunks until only 1 remains +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; + count | sum | round +--------+---------+--------------- + 518406 | 2854401 | 10373952.7510 +(1 row) + +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', 
'_timescaledb_internal._hyper_1_11_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; + count | sum | round +--------+---------+--------------- + 518406 | 2854401 | 10373952.7510 +(1 row) + +select * from partitions; + table_name | column_name | range_start | range_end +-------------------+-------------+----------------------+--------------------- + _hyper_1_2_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_3_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_10_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_9_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_12_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_13_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_15_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_14_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_12_chunk | device | 715827882 | 1431655764 + _hyper_1_14_chunk | device | 715827882 | 1431655764 + _hyper_1_9_chunk | device | 715827882 | 1431655764 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_15_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_10_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_13_chunk | device | 1431655764 | 9223372036854775807 +(18 rows) + +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_9_chunk','_timescaledb_internal._hyper_1_12_chunk', '_timescaledb_internal._hyper_1_14_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; + count | sum | round +--------+---------+--------------- + 518406 | 2854401 | 10373952.7510 +(1 row) + +select * from partitions; + table_name | column_name | range_start | range_end +-------------------+-------------+----------------------+--------------------- + _hyper_1_3_chunk | time | 1704067200000000 | 1704153600000000 + _hyper_1_1_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_10_chunk | time | 1704153600000000 | 1704240000000000 + _hyper_1_13_chunk | time | 1704240000000000 | 1704326400000000 + _hyper_1_15_chunk | time | 1704326400000000 | 1704412800000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_15_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_13_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_10_chunk | device | 1431655764 | 9223372036854775807 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 +(12 rows) + +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; + count | sum | round +--------+---------+--------------- + 518406 | 2854401 | 10373952.7510 +(1 row) + +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_2_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_3_chunk | time | 1704067200000000 | 1704412800000000 + 
_hyper_1_1_chunk | device | -9223372036854775808 | 715827882 + _hyper_1_2_chunk | device | 715827882 | 1431655764 + _hyper_1_3_chunk | device | 1431655764 | 9223372036854775807 +(6 rows) + +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; + count | sum | round +--------+---------+--------------- + 518406 | 2854401 | 10373952.7510 +(1 row) + +select * from partitions; + table_name | column_name | range_start | range_end +------------------+-------------+----------------------+--------------------- + _hyper_1_1_chunk | time | 1704067200000000 | 1704412800000000 + _hyper_1_1_chunk | device | -9223372036854775808 | 9223372036854775807 +(2 rows) + diff --git a/tsl/test/isolation/expected/merge_chunks_concurrent.out b/tsl/test/isolation/expected/merge_chunks_concurrent.out new file mode 100644 index 00000000000..7be59718a0e --- /dev/null +++ b/tsl/test/isolation/expected/merge_chunks_concurrent.out @@ -0,0 +1,611 @@ +Parsed test spec with 4 sessions + +starting permutation: s2_show_chunks s3_show_data s1_begin s3_begin s4_modify s2_merge_chunks s1_show_chunks s3_show_chunks s1_show_data s3_show_data s1_commit s1_show_data s3_commit +step s2_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + +step s3_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_begin: + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; + +?column? +-------- +t +(1 row) + +step s3_begin: + start transaction isolation level read committed; + select count(*) > 0 from pg_class; + +?column? 
+-------- +t +(1 row) + +step s4_modify: + delete from readings where device=1; + insert into readings values ('2024-01-01 01:05', 5, 5.0); + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + +step s3_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s3_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:05:00 2024 PST| 5| 5 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 0| 1 +(1 row) + +step s1_commit: commit; +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:05:00 2024 PST| 5| 5 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 0| 1 +(1 row) + +step s3_commit: commit; + +starting permutation: s2_show_chunks s1_begin s1_show_data s2_merge_chunks s1_show_data s1_commit s1_show_data s1_show_chunks +step s2_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + +step s1_begin: + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; + +?column? 
+-------- +t +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_commit: commit; +step s2_merge_chunks: <... completed> +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + + +starting permutation: s2_set_lock_upgrade s2_show_chunks s1_begin s1_show_data s2_merge_chunks s1_show_data s1_commit s1_show_data s1_show_chunks +step s2_set_lock_upgrade: + set timescaledb.merge_chunks_lock_upgrade_mode='upgrade'; + +step s2_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + +step s1_begin: + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; + +?column? 
+-------- +t +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_commit: commit; +step s2_merge_chunks: <... completed> +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + + +starting permutation: s2_set_lock_upgrade s4_wp_enable s2_show_chunks s1_begin s1_show_data s2_merge_chunks s1_show_data s1_row_exclusive_lock s4_wp_release s1_commit s1_show_data s1_show_chunks +step s2_set_lock_upgrade: + set timescaledb.merge_chunks_lock_upgrade_mode='upgrade'; + +step s4_wp_enable: SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + +step s1_begin: + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; + +?column? 
+-------- +t +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_row_exclusive_lock: call lock_one_chunk('readings'); +step s4_wp_release: SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); +debug_waitpoint_release +----------------------- + +(1 row) + +step s1_row_exclusive_lock: <... completed> +ERROR: deadlock detected +step s2_merge_chunks: <... completed> +step s1_commit: commit; +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + + +starting permutation: s2_set_lock_upgrade_conditional s4_wp_enable s2_show_chunks s1_begin s1_show_data s2_merge_chunks s1_show_data s1_row_exclusive_lock s4_wp_release s1_commit s1_show_data s1_show_chunks +step s2_set_lock_upgrade_conditional: + set timescaledb.merge_chunks_lock_upgrade_mode='conditional'; + +step s4_wp_enable: SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + +step s1_begin: + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; + +?column? 
+-------- +t +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_row_exclusive_lock: call lock_one_chunk('readings'); +step s4_wp_release: SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); +debug_waitpoint_release +----------------------- + +(1 row) + +step s2_merge_chunks: <... completed> +ERROR: could not lock relation "_hyper_X_X_chunk" for merge +step s1_row_exclusive_lock: <... completed> +step s1_commit: commit; +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 2 +(1 row) + + +starting permutation: s4_wp_enable s2_merge_chunks s3_merge_chunks s4_wp_release s1_show_data s1_show_chunks +step s4_wp_enable: SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s3_merge_chunks: + call merge_all_chunks('readings'); + +step s4_wp_release: SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); +debug_waitpoint_release +----------------------- + +(1 row) + +step s2_merge_chunks: <... completed> +step s3_merge_chunks: <... 
completed> +ERROR: relation does not exist +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + + +starting permutation: s4_wp_enable s2_merge_chunks s3_compress_chunks s4_wp_release s1_show_data s1_show_chunks +step s4_wp_enable: SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s3_compress_chunks: + select compress_chunk(show_chunks('readings')); + +step s4_wp_release: SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); +debug_waitpoint_release +----------------------- + +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +step s1_show_data: <... completed> +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: <... completed> +step s3_compress_chunks: <... completed> +ERROR: deadlock detected +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + + +starting permutation: s4_wp_enable s2_merge_chunks s3_drop_chunks s4_wp_release s1_show_data s1_show_chunks +step s4_wp_enable: SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_merge_chunks: + call merge_all_chunks('readings'); + +step s3_drop_chunks: + call drop_one_chunk('readings'); + +step s4_wp_release: SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); +debug_waitpoint_release +----------------------- + +(1 row) + +step s1_show_data: + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; + +step s1_show_data: <... completed> +time |device|temp +----------------------------+------+---- +Mon Jan 01 02:00:00 2024 PST| 3| 3 +Mon Jan 01 02:00:00 2024 PST| 4| 4 +Mon Jan 01 01:01:00 2024 PST| 2| 2 +Mon Jan 01 01:00:00 2024 PST| 1| 1 +(4 rows) + +num_device_all|num_device_1|num_device_5 +--------------+------------+------------ + 4| 1| 0 +(1 row) + +step s2_merge_chunks: <... completed> +step s3_drop_chunks: <... 
completed> +ERROR: deadlock detected +step s1_show_chunks: select count(*) from show_chunks('readings'); +count +----- + 1 +(1 row) + diff --git a/tsl/test/isolation/specs/CMakeLists.txt b/tsl/test/isolation/specs/CMakeLists.txt index aa20561638e..629f53094fb 100644 --- a/tsl/test/isolation/specs/CMakeLists.txt +++ b/tsl/test/isolation/specs/CMakeLists.txt @@ -42,7 +42,8 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) compression_freeze.spec compression_merge_race.spec compression_recompress.spec - decompression_chunk_and_parallel_query_wo_idx.spec) + decompression_chunk_and_parallel_query_wo_idx.spec + merge_chunks_concurrent.spec) if(PG_VERSION VERSION_GREATER_EQUAL "14.0") list(APPEND TEST_FILES freeze_chunk.spec compression_dml_iso.spec) endif() diff --git a/tsl/test/isolation/specs/merge_chunks_concurrent.spec b/tsl/test/isolation/specs/merge_chunks_concurrent.spec new file mode 100644 index 00000000000..5e02dbade77 --- /dev/null +++ b/tsl/test/isolation/specs/merge_chunks_concurrent.spec @@ -0,0 +1,174 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +setup +{ + create table readings (time timestamptz, device int, temp float); + select create_hypertable('readings', 'time', chunk_time_interval => interval '1 hour'); + insert into readings values ('2024-01-01 01:00', 1, 1.0), ('2024-01-01 01:01', 2, 2.0), ('2024-01-01 02:00', 3, 3.0), ('2024-01-01 02:00', 4, 4.0); + alter table readings set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device'); + + create or replace procedure merge_all_chunks(hypertable regclass) as $$ + declare + chunks_arr regclass[]; + begin + select array_agg(cl.oid) into chunks_arr + from pg_class cl + join pg_inherits inh + on (cl.oid = inh.inhrelid) + where inh.inhparent = hypertable; + + call merge_chunks(variadic chunks_arr); + end; + $$ LANGUAGE plpgsql; + + create or replace procedure drop_one_chunk(hypertable regclass) as $$ + declare + chunk regclass; + begin + select cl.oid into chunk + from pg_class cl + join pg_inherits inh + on (cl.oid = inh.inhrelid) + where inh.inhparent = hypertable + limit 1; + execute format('drop table %s cascade', chunk); + end; + $$ LANGUAGE plpgsql; + + create or replace procedure lock_one_chunk(hypertable regclass) as $$ + declare + chunk regclass; + begin + select ch into chunk from show_chunks(hypertable) ch offset 1 limit 1; + execute format('lock %s in row exclusive mode', chunk); + end; + $$ LANGUAGE plpgsql; + + reset timescaledb.merge_chunks_lock_upgrade_mode; +} + +teardown { + drop table readings; +} + +session "s1" +setup { + set local lock_timeout = '5000ms'; + set local deadlock_timeout = '10ms'; +} + +# The transaction will not "pick" a snapshot until the first query, so +# do a simple select on pg_class to pick one for the transaction. We +# don't want to query any tables involved in the test since that will +# grab locks on them. 
+step "s1_begin" { + start transaction isolation level repeatable read; + select count(*) > 0 from pg_class; +} + +step "s1_show_chunks" { select count(*) from show_chunks('readings'); } +step "s1_show_data" { + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; +} +step "s1_row_exclusive_lock" { call lock_one_chunk('readings'); } +step "s1_commit" { commit; } + +session "s2" +setup { + set local lock_timeout = '500ms'; + set local deadlock_timeout = '100ms'; + reset timescaledb.merge_chunks_lock_upgrade_mode; +} + +step "s2_show_chunks" { select count(*) from show_chunks('readings'); } +step "s2_merge_chunks" { + call merge_all_chunks('readings'); +} + +step "s2_set_lock_upgrade" { + set timescaledb.merge_chunks_lock_upgrade_mode='upgrade'; +} +step "s2_set_lock_upgrade_conditional" { + set timescaledb.merge_chunks_lock_upgrade_mode='conditional'; +} + +session "s3" +setup { + set local lock_timeout = '500ms'; + set local deadlock_timeout = '100ms'; +} + +step "s3_begin" { + start transaction isolation level read committed; + select count(*) > 0 from pg_class; +} +step "s3_show_data" { + select * from readings order by time desc, device; + select count(*) as num_device_all, count(*) filter (where device=1) as num_device_1, count(*) filter (where device=5) as num_device_5 from readings; +} +step "s3_show_chunks" { select count(*) from show_chunks('readings'); } +step "s3_merge_chunks" { + call merge_all_chunks('readings'); +} +step "s3_compress_chunks" { + select compress_chunk(show_chunks('readings')); +} +step "s3_drop_chunks" { + call drop_one_chunk('readings'); +} +step "s3_commit" { commit; } + +session "s4" +setup { + set local lock_timeout = '500ms'; + set local deadlock_timeout = '100ms'; +} + +step "s4_modify" { + delete from readings where device=1; + insert into readings values ('2024-01-01 01:05', 5, 5.0); +} + +step "s4_wp_enable" { SELECT debug_waitpoint_enable('merge_chunks_before_heap_swap'); } +step "s4_wp_release" { SELECT debug_waitpoint_release('merge_chunks_before_heap_swap'); } + +# Run 4 backends: +# +# s1: will read data in REPEATABLE READ (should not see changes after merge) +# s2: will merge chunks +# s3: will read data in READ COMMITTED (should see changes immediately after merge) +# s4: will modify data during TX s1 and s3 but before merge +# +# Expectation: s1 should see the original data as it was before s4 +# modifications and merge while s3 should see the changes +permutation "s2_show_chunks" "s3_show_data" "s1_begin" "s3_begin" "s4_modify" "s2_merge_chunks" "s1_show_chunks" "s3_show_chunks" "s1_show_data" "s3_show_data" "s1_commit" "s1_show_data" "s3_commit" + +# Merge chunks with AccessExclusiveLock (default). s2_merge_chunks +# need to wait for readers to finish before even starting merge +permutation "s2_show_chunks" "s1_begin" "s1_show_data" "s2_merge_chunks" "s1_show_data" "s1_commit" "s1_show_data" "s1_show_chunks" + +# Merge chunks with lock upgrade. s2_merge_chunks can merge +# concurrently with readers but need to wait for readers to finish +# before doing the heap swap. +permutation "s2_set_lock_upgrade" "s2_show_chunks" "s1_begin" "s1_show_data" "s2_merge_chunks" "s1_show_data" "s1_commit" "s1_show_data" "s1_show_chunks" + +# Same as the above, but it will deadlock because a reader takes a +# heavier lock. 
+permutation "s2_set_lock_upgrade" "s4_wp_enable" "s2_show_chunks" "s1_begin" "s1_show_data" "s2_merge_chunks" "s1_show_data" "s1_row_exclusive_lock" "s4_wp_release" "s1_commit" "s1_show_data" "s1_show_chunks" + +# Same as above but with a conditional lock. The merge process should +# fail with an error saying it can't take the lock needed for the +# merge. +permutation "s2_set_lock_upgrade_conditional" "s4_wp_enable" "s2_show_chunks" "s1_begin" "s1_show_data" "s2_merge_chunks" "s1_show_data" "s1_row_exclusive_lock" "s4_wp_release" "s1_commit" "s1_show_data" "s1_show_chunks" + +# Test concurrent merges +permutation "s4_wp_enable" "s2_merge_chunks" "s3_merge_chunks" "s4_wp_release" "s1_show_data" "s1_show_chunks" + +# Test concurrent compress_chunk() +permutation "s4_wp_enable" "s2_merge_chunks" "s3_compress_chunks" "s4_wp_release" "s1_show_data" "s1_show_chunks" + +# Test concurrent drop table +permutation "s4_wp_enable" "s2_merge_chunks" "s3_drop_chunks" "s4_wp_release" "s1_show_data" "s1_show_chunks" diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index fc8c707689d..310d4ba836c 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -256,6 +256,8 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text interpolate(smallint,record,record) last(anyelement,"any") locf(anyelement,anyelement,boolean) + merge_chunks(regclass,regclass) + merge_chunks(regclass[]) move_chunk(regclass,name,name,regclass,boolean) recompress_chunk(regclass,boolean) refresh_continuous_aggregate(regclass,"any","any",boolean) diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 6401bf70e8f..243aab81a00 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -146,6 +146,7 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "15")) list( APPEND TEST_FILES + merge_chunks.sql cagg_refresh_using_merge.sql compress_sort_transform.sql hypercore_columnar.sql diff --git a/tsl/test/sql/merge_chunks.sql b/tsl/test/sql/merge_chunks.sql new file mode 100644 index 00000000000..d073c0b0367 --- /dev/null +++ b/tsl/test/sql/merge_chunks.sql @@ -0,0 +1,247 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. 
+ + +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE ACCESS METHOD testam TYPE TABLE HANDLER heap_tableam_handler; +set role :ROLE_DEFAULT_PERM_USER; + +------------------ +-- Helper views -- +------------------- +create view partitions as +select c.table_name, d.column_name, ds.range_start, ds.range_end +from _timescaledb_catalog.hypertable h +join _timescaledb_catalog.chunk c on (c.hypertable_id = h.id) +join _timescaledb_catalog.dimension d on (d.hypertable_id = h.id) +join _timescaledb_catalog.dimension_slice ds on (d.id = ds.dimension_id) +join _timescaledb_catalog.chunk_constraint cc on (cc.chunk_id = c.id and cc.dimension_slice_id = ds.id) +where h.table_name = 'mergeme' +order by d.id, ds.range_start, ds.range_end; + +create view orphaned_slices as +select ds.id, cc.constraint_name from _timescaledb_catalog.dimension_slice ds +left join _timescaledb_catalog.chunk_constraint cc on (ds.id = cc.dimension_slice_id) +where cc.constraint_name is null; + +----------------- +-- Setup table -- +----------------- +create table mergeme (time timestamptz not null, device int, temp float); +select create_hypertable('mergeme', 'time', 'device', 3, chunk_time_interval => interval '1 day'); + +-- +-- Insert data to create two chunks with same time ranges like this: +-- _______ +-- | | +-- | 1 | +-- |_____| +-- | | +-- | 2 | +-- |_____| +--- +insert into mergeme values ('2024-01-01', 1, 1.0), ('2024-01-01', 2, 2.0); + + +select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk'); + +-- Show partition layout +select * from partitions; + +-- Now merge chunk 1 and 2: +begin; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +select * from _timescaledb_internal._hyper_1_1_chunk; +select reltuples from pg_class where oid='_timescaledb_internal._hyper_1_1_chunk'::regclass; +select * from partitions; +select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk'); +select count(*) as num_orphaned_slices from orphaned_slices; +select * from show_chunks('mergeme'); +select * from mergeme; +rollback; + + +-- create a new chunk as a third space partition +-- _______ +-- | | +-- | 1 | +-- |_____| +-- | | +-- | 2 | +-- |_____| +-- | | +-- | 3 | +-- |_____| +--- + +insert into mergeme values ('2024-01-01', 3, 3.0); + +-- Test some basic error cases +\set ON_ERROR_STOP 0 +-- Can't merge chunk 1 and 3 +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_3_chunk'); +call merge_chunks(NULL); +call merge_chunks(NULL, NULL); +call merge_chunks(999999,999991); +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk']); +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', NULL); +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', NULL]); +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_1_chunk'); + + +-- Check permissions +reset role; +set role :ROLE_1; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +reset role; +set role :ROLE_DEFAULT_PERM_USER; +\set ON_ERROR_STOP 1 + +-- Show new partition +select * from partitions; + +begin; +-- Should be able to merge all three chunks +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk']); +select * from partitions; +-- Note that no space partition CHECK constraint is added 
because it
+-- now covers the entire range from -inf to +inf.
+select "Constraint", "Columns", "Expr" from test.show_constraints('_timescaledb_internal._hyper_1_1_chunk');
+select count(*) as num_orphaned_slices from orphaned_slices;
+select * from show_chunks('mergeme');
+select * from mergeme;
+rollback;
+
+-- create two new chunks, 4 and 5, as follows:
+-- _____________ _______
+-- | | | | |
+-- | 1 | 4 | | 5 |
+-- |_____|_____| |_____|
+-- | |
+-- | 2 |
+-- |_____|
+-- | |
+-- | 3 |
+-- |_____|
+---
+insert into mergeme values ('2024-01-02', 1, 4.0), ('2024-01-04', 1, 5.0);
+
+-- Show new partitions
+select * from partitions;
+
+\set ON_ERROR_STOP 0
+-- can't merge 3 and 4
+call merge_chunks('_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_4_chunk');
+-- can't merge 1 and 5
+call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_5_chunk');
+-- can't merge 2 and 4
+call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_4_chunk');
+-- can't merge 4 and 5
+call merge_chunks('_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_4_chunk');
+-- currently can't merge 1,2,3,4 due to a limitation in how we validate the merge
+call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_4_chunk', '_timescaledb_internal._hyper_1_1_chunk']);
+
+begin;
+-- Should be able to merge all three chunks 1,2,3
+call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk']);
+-- But merging the merged 1,2,3 chunk with 4 is currently not
+-- possible, although we could do it in theory
+call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk');
+rollback;
+\set ON_ERROR_STOP 1
+
+alter table mergeme set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device');
+select compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
+select compress_chunk('_timescaledb_internal._hyper_1_3_chunk');
+
+\set ON_ERROR_STOP 0
+-- Currently cannot merge compressed chunks
+call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk');
+call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk');
+\set ON_ERROR_STOP 1
+
+-- Currently cannot merge chunks using Hypercore TAM
+alter table _timescaledb_internal._hyper_1_1_chunk set access method hypercore;
+alter table _timescaledb_internal._hyper_1_3_chunk set access method hypercore;
+
+select relname, amname from pg_class cl
+join pg_am am on (cl.relam = am.oid)
+where cl.oid in ('_timescaledb_internal._hyper_1_1_chunk'::regclass, '_timescaledb_internal._hyper_1_3_chunk'::regclass);
+
+\set ON_ERROR_STOP 0
+call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk');
+call merge_chunks('_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_3_chunk');
+\set ON_ERROR_STOP 1
+
+---
+-- Test some error cases when merging chunks with non-chunks or chunks
+-- from other hypertables
+---
+-- Decompress all chunks to ensure we only have non-compressed chunks
+select decompress_chunk(ch) from show_chunks('mergeme') ch;
+
+-- Create a non-chunk table
+create table mergeme_too(time timestamptz not null, device int, temp float);
+select create_hypertable('mergeme_too', 'time', 'device', 3, 
chunk_time_interval => interval '1 day'); +create table mergeme_regular(time timestamptz not null, device int, temp float); + +insert into mergeme_too values ('2024-01-01', 1, 1.0); +insert into mergeme_regular select * from mergeme_too; + +create materialized view mergeme_mat as +select * from mergeme_too where device=1; + +select * from show_chunks('mergeme_too'); + + +\set ON_ERROR_STOP 0 +-- Merge chunk and regular table +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', 'mergeme_regular'); +call merge_chunks('mergeme_regular', '_timescaledb_internal._hyper_1_1_chunk'); +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', 'mergeme_mat'); +-- Merge chunks from different hypertables +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_3_8_chunk'); + +-- Merge with unsupported access method +alter table _timescaledb_internal._hyper_1_1_chunk set access method testam; +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +alter table _timescaledb_internal._hyper_1_1_chunk set access method heap; + +-- Merge OSM chunks +reset role; +update _timescaledb_catalog.chunk ch set osm_chunk = true where table_name = '_hyper_1_1_chunk'; +set role :ROLE_DEFAULT_PERM_USER; + +call merge_chunks('_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_2_chunk'); +reset role; +update _timescaledb_catalog.chunk ch set osm_chunk = false where table_name = '_hyper_1_1_chunk'; +set role :ROLE_DEFAULT_PERM_USER; + +\set ON_ERROR_STOP 1 + + +-- Set seed to consistently generate same data and same set of chunks +select setseed(0.2); +-- Test merge with bigger data set and chunks with more blocks +insert into mergeme (time, device, temp) +select t, ceil(random()*10), random()*40 +from generate_series('2024-01-01'::timestamptz, '2024-01-04', '0.5s') t; + +-- Show partitions before merge +select * from partitions; + +-- Merge all chunks until only 1 remains +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_1_chunk', '_timescaledb_internal._hyper_1_4_chunk','_timescaledb_internal._hyper_1_5_chunk', '_timescaledb_internal._hyper_1_11_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; +select * from partitions; +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_2_chunk', '_timescaledb_internal._hyper_1_9_chunk','_timescaledb_internal._hyper_1_12_chunk', '_timescaledb_internal._hyper_1_14_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; +select * from partitions; +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_10_chunk','_timescaledb_internal._hyper_1_13_chunk', '_timescaledb_internal._hyper_1_15_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; +select * from partitions; +call merge_chunks(ARRAY['_timescaledb_internal._hyper_1_3_chunk', '_timescaledb_internal._hyper_1_1_chunk','_timescaledb_internal._hyper_1_2_chunk']); +select count(*), sum(device), round(sum(temp)::numeric, 4) from mergeme; +select * from partitions;
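
For illustration, the array signature also supports a "merge everything" pattern similar to the merge_all_chunks() helper defined in the isolation spec above. A minimal sketch, assuming a hypothetical hypertable named 'metrics' whose chunks satisfy the usual preconditions (same hypertable, mergeable partition boundaries, not compressed):

-- Merge all chunks of a hypertable in a single call using the array
-- form ('metrics' is a placeholder name, not part of the patch):
do $$
declare
    chunks_arr regclass[];
begin
    -- Collect all chunks of the (hypothetical) hypertable 'metrics' ...
    select array_agg(ch) into chunks_arr from show_chunks('metrics') ch;
    -- ... and merge them with one call, as the tests above do with
    -- explicit ARRAY[...] literals.
    call merge_chunks(chunks_arr);
end;
$$;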