diff --git a/bdb/llmeta.c b/bdb/llmeta.c index 8a006f76f8..4c34daae37 100644 --- a/bdb/llmeta.c +++ b/bdb/llmeta.c @@ -36,6 +36,8 @@ #include #include "debug_switches.h" #include "alias.h" +#include "sc_version.h" + extern int gbl_maxretries; extern int gbl_disable_access_controls; extern int get_csc2_version_tran(const char *table, tran_type *tran); @@ -118,7 +120,7 @@ typedef enum { data = no data does db use authentication? */ , - LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */ + LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */ , LLMETA_SQLITE_STAT1_PREV_DONT_USE = 20 /* store previous sqlite-stat1 records- dont use this. */ @@ -176,7 +178,8 @@ typedef enum { LLMETA_LUA_SFUNC_FLAG = 54, LLMETA_NEWSC_REDO_GENID = 55, /* 55 + TABLENAME + GENID -> MAX-LSN */ LLMETA_SCHEMACHANGE_STATUS_V2 = 56, - LLMETA_SCHEMACHANGE_LIST = 57, /* list of all sc-s in a uuid txh */ + LLMETA_SCHEMACHANGE_STATUS_VERSIONED = 57, + LLMETA_SCHEMACHANGE_LIST = 58, /* list of all sc-s in a uuid txh */ } llmetakey_t; struct llmeta_file_type_key { @@ -11336,30 +11339,42 @@ int bdb_del_view(tran_type *t, const char *view_name) coincide with the first 4 bytes of the rqid (fastseed) stored as the first member in old (7.0's) LLMETA_SCHEMACHANGE_STATUS payload. */ -static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end) +static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end, int *version) { int first = 0; - if (p_buf >= p_buf_end) return -1; + if (p_buf >= p_buf_end) + return -1; - buf_get(&first, sizeof(first), p_buf, p_buf_end); + p_buf = (void *)buf_get(&first, sizeof(first), p_buf, p_buf_end); + if (first == SC_VERSIONED) { + buf_get(version, sizeof(int), p_buf, p_buf_end); + return LLMETA_SCHEMACHANGE_STATUS_VERSIONED; + } if (first > SC_INVALID && first < SC_LAST) { return LLMETA_SCHEMACHANGE_STATUS_V2; } return LLMETA_SCHEMACHANGE_STATUS; } -void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, - void *p_buf_end) +void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end) { - int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end); + int version = 0; + int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end, &version); switch (sc_key_type) { + case LLMETA_SCHEMACHANGE_STATUS_VERSIONED: + if (version < SC_MIN_VERSION || version > SC_VERSION) { + logmsg(LOGMSG_ERROR, "%s: unknown sc-version %d\n", __func__, version); + return NULL; + } + p_buf = p_buf + sizeof(int) + sizeof(int); + return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, version); case LLMETA_SCHEMACHANGE_STATUS: return buf_get_schemachange_v1(s, (void *)p_buf, (void *)p_buf_end); case LLMETA_SCHEMACHANGE_STATUS_V2: - return buf_get_schemachange_v2(s, (void *)p_buf, (void *)p_buf_end); + return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, 2); default: break; } diff --git a/db/config.c b/db/config.c index 0d6d84eb74..166e093e51 100644 --- a/db/config.c +++ b/db/config.c @@ -447,6 +447,8 @@ static char *legacy_options[] = { "setattr max_sql_idle_time 864000", "retrieve_gen_from_ckp 0", "recovery_ckp 0", + "sc_versioned 0", + "sc_current_version 2", }; // clang-format on diff --git a/db/db_tunables.c b/db/db_tunables.c index 970fc7d480..2ea1d34297 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -304,6 +304,8 @@ extern int gbl_memp_dump_cache_threshold; extern int gbl_disable_ckp; extern int gbl_abort_on_illegal_log_put; extern int gbl_sc_close_txn; +extern int gbl_sc_versioned; +extern int gbl_sc_current_version; extern int gbl_master_sends_query_effects; extern int gbl_create_dba_user; extern int gbl_lock_dba_user; diff --git a/db/db_tunables.h b/db/db_tunables.h index 859811a2ee..3251e790cf 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -2385,6 +2385,10 @@ REGISTER_TUNABLE("wal_osync", "Open WAL files using the O_SYNC flag (Default: of NULL, NULL, NULL, NULL); REGISTER_TUNABLE("sc_headroom", "Percentage threshold for low headroom calculation. (Default: 10)", TUNABLE_DOUBLE, &gbl_sc_headroom, INTERNAL | SIGNED, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("sc_versioned", "Enable versioned schema changes (Default: on)", TUNABLE_BOOLEAN, &gbl_sc_versioned, 0, + NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("sc_current_version", "Current schema-change version (Default: 2)", TUNABLE_INTEGER, + &gbl_sc_current_version, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("fdb_incoherence_percentage", "Generate random incoherent errors in remsql", TUNABLE_INTEGER, &gbl_fdb_incoherence_percentage, INTERNAL, NULL, percent_verify, NULL, NULL); REGISTER_TUNABLE("fdb_socket_timeout_ms", "Timeout ms for fdb communications. (Default: 10000)", TUNABLE_INTEGER, diff --git a/db/osqlcomm.c b/db/osqlcomm.c index fda9d17d32..784f6c0dd8 100644 --- a/db/osqlcomm.c +++ b/db/osqlcomm.c @@ -806,19 +806,22 @@ static uint8_t *osqlcomm_schemachange_type_get(struct schema_change_type *sc, return tmp_buf; } +extern int gbl_sc_current_version; + static uint8_t * osqlcomm_schemachange_rpl_type_put(osql_rpl_t *hd, struct schema_change_type *sc, uint8_t *p_buf, uint8_t *p_buf_end) { - size_t sc_len = schemachange_packed_size(sc); + int sc_version = gbl_sc_current_version; + size_t sc_len = schemachange_packed_size(sc, sc_version); if (p_buf_end < p_buf || OSQLCOMM_RPL_TYPE_LEN + sc_len > (p_buf_end - p_buf)) return NULL; p_buf = osqlcomm_rpl_type_put(hd, p_buf, p_buf_end); - p_buf = buf_put_schemachange(sc, p_buf, p_buf_end); + p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version); return p_buf; } @@ -828,14 +831,15 @@ osqlcomm_schemachange_uuid_rpl_type_put(osql_uuid_rpl_t *hd, struct schema_change_type *sc, uint8_t *p_buf, uint8_t *p_buf_end) { - size_t sc_len = schemachange_packed_size(sc); + int sc_version = gbl_sc_current_version; + size_t sc_len = schemachange_packed_size(sc, sc_version); if (p_buf_end < p_buf || OSQLCOMM_UUID_RPL_TYPE_LEN + sc_len > (p_buf_end - p_buf)) return NULL; p_buf = osqlcomm_uuid_rpl_type_put(hd, p_buf, p_buf_end); - p_buf = buf_put_schemachange(sc, p_buf, p_buf_end); + p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version); return p_buf; } @@ -8393,8 +8397,6 @@ netinfo_type *osql_get_netinfo(void) int osql_send_schemachange(osql_target_t *target, unsigned long long rqid, uuid_t uuid, struct schema_change_type *sc, int type) { - - schemachange_packed_size(sc); size_t osql_rpl_size = ((rqid == OSQL_RQID_USE_UUID) ? OSQLCOMM_UUID_RPL_TYPE_LEN : OSQLCOMM_RPL_TYPE_LEN) + diff --git a/db/process_message.c b/db/process_message.c index f3f0e769fc..d9fd9e98c4 100644 --- a/db/process_message.c +++ b/db/process_message.c @@ -2622,6 +2622,14 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) } else if (tokcmp(tok, ltok, "scforceabort") == 0) { logmsg(LOGMSG_USER, "Forcibly resetting schema change flat\n"); wait_for_sc_to_stop("forceabort", __func__, __LINE__); + } else if (tokcmp(tok, ltok, "sc_version_test") == 0) { + int sc_version_test(void); + int rc = sc_version_test(); + if (rc) { + logmsg(LOGMSG_ERROR, "Schema change version test failed\n"); + } else { + logmsg(LOGMSG_USER, "Schema change version test passed\n"); + } } else if (tokcmp(tok, ltok, "get_db_dir") == 0) { logmsg(LOGMSG_USER, "Database Base Directory: %s\n", thedb->basedir); } else if (tokcmp(tok, ltok, "debug") == 0) { diff --git a/schemachange/sc_struct.c b/schemachange/sc_struct.c index 51af9ca502..b61c78034a 100644 --- a/schemachange/sc_struct.c +++ b/schemachange/sc_struct.c @@ -16,6 +16,7 @@ #include "schemachange.h" #include "sc_struct.h" +#include "sc_version.h" #include "logmsg.h" #include "sc_csc2.h" #include "sc_schema.h" @@ -132,7 +133,10 @@ static size_t _partition_packed_size(struct comdb2_partition *p) } } -size_t schemachange_packed_size(struct schema_change_type *s) +int gbl_sc_current_version = SC_VERSION; +int gbl_sc_versioned = 1; + +size_t schemachange_packed_size(struct schema_change_type *s, int sc_version) { s->tablename_len = strlen(s->tablename) + 1; s->fname_len = strlen(s->fname) + 1; @@ -161,6 +165,14 @@ size_t schemachange_packed_size(struct schema_change_type *s) sizeof(s->usedbtablevers) + sizeof(s->qdb_file_ver) + _partition_packed_size(&s->partition); + if (gbl_sc_versioned) { + s->packed_len += 8; + + if (sc_version > 2) { + s->packed_len += sizeof(s->version_test); + } + } + return s->packed_len; } @@ -184,10 +196,25 @@ static void *buf_put_dests(struct schema_change_type *s, void *p_buf, return p_buf; } -void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end) +void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version) { + int versioned = gbl_sc_versioned; + if (p_buf >= p_buf_end) + return NULL; - if (p_buf >= p_buf_end) return NULL; + if (versioned) { + if (version < SC_MIN_VERSION || version > SC_VERSION) { + logmsg(LOGMSG_ERROR, "%s: invalid version, %d\n", __func__, version); + return NULL; + } + int neg1 = -1; + p_buf = buf_put(&neg1, sizeof(neg1), p_buf, p_buf_end); + p_buf = buf_put(&version, sizeof(version), p_buf, p_buf_end); + } + + if (versioned && version > 2) { + p_buf = buf_put(&s->version_test, sizeof(s->version_test), p_buf, p_buf_end); + } p_buf = buf_put(&s->kind, sizeof(s->kind), p_buf, p_buf_end); @@ -317,6 +344,50 @@ void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_bu return p_buf; } +int sc_version_test(void) +{ + int fail_count = 0; + + for (int i = SC_MIN_VERSION - 1; i <= SC_VERSION + 1; i++) { + struct schema_change_type *s = new_schemachange_type(); + s->version_test = i; + int packed_len = schemachange_packed_size(s, i); + uint8_t *buf = malloc(packed_len); + uint8_t *p_buf = buf; + uint8_t *p_buf_end = buf + packed_len; + p_buf = buf_put_schemachange(s, p_buf, p_buf_end, i); + if (!p_buf) { + if (i >= SC_MIN_VERSION && i <= SC_VERSION) { + logmsg(LOGMSG_ERROR, "Failed to pack version %d\n", i); + fail_count++; + } + } else { + struct schema_change_type unpacked = {0}; + p_buf = buf; + p_buf = buf_get_schemachange(&unpacked, p_buf, p_buf_end); + if (i == 2) { + if (unpacked.version_test != -1) { + logmsg(LOGMSG_ERROR, "Failed to unpack version %d\n", i); + fail_count++; + } + } + if (i == 3) { + if (unpacked.version_test != 3) { + logmsg(LOGMSG_ERROR, "Failed to unpack version %d\n", i); + fail_count++; + } + } + } + free(buf); + free(s); + } + if (fail_count) { + logmsg(LOGMSG_ERROR, "%s: failed %d tests\n", __func__, fail_count); + } + + return fail_count ? -1 : 0; +} + static const void *buf_get_dests(struct schema_change_type *s, const void *p_buf, void *p_buf_end) { @@ -557,11 +628,18 @@ void *buf_get_schemachange_v1(struct schema_change_type *s, void *p_buf, return p_buf; } -void *buf_get_schemachange_v2(struct schema_change_type *s, - void *p_buf, void *p_buf_end) +void *buf_get_schemachange_versioned(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version) { - if (p_buf >= p_buf_end) return NULL; + if (p_buf >= p_buf_end) + return NULL; + + /* version_test is just for testing - I will remove when I use it */ + if (version > 2) { + buf_get(&s->version_test, sizeof(s->version_test), p_buf, p_buf_end); + } else { + s->version_test = -1; + } p_buf = (uint8_t *)buf_get(&s->kind, sizeof(s->kind), p_buf, p_buf_end); @@ -752,8 +830,9 @@ int pack_schema_change_type(struct schema_change_type *s, void **packed, size_t *packed_len) { + int sc_version = gbl_sc_current_version; /* compute the length of our buffer */ - *packed_len = schemachange_packed_size(s); + *packed_len = schemachange_packed_size(s, sc_version); /* grab memory for our buffer */ *packed = malloc(*packed_len); @@ -770,7 +849,7 @@ int pack_schema_change_type(struct schema_change_type *s, void **packed, uint8_t *p_buf_end = (p_buf + *packed_len); /* pack all the data */ - p_buf = buf_put_schemachange(s, p_buf, p_buf_end); + p_buf = buf_put_schemachange(s, p_buf, p_buf_end, sc_version); if (p_buf != (uint8_t *)((char *)(*packed)) + *packed_len) { logmsg(LOGMSG_ERROR, @@ -1198,8 +1277,9 @@ int schema_change_headers(struct schema_change_type *s) struct schema_change_type * clone_schemachange_type(struct schema_change_type *sc) { + int sc_version = gbl_sc_current_version; struct schema_change_type *newsc; - size_t sc_len = schemachange_packed_size(sc); + size_t sc_len = schemachange_packed_size(sc, sc_version); uint8_t *p_buf, *p_buf_end, *buf; p_buf = buf = calloc(1, sc_len); @@ -1208,7 +1288,7 @@ clone_schemachange_type(struct schema_change_type *sc) p_buf_end = p_buf + sc_len; - p_buf = buf_put_schemachange(sc, p_buf, p_buf_end); + p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version); if (!p_buf) { free(buf); return NULL; diff --git a/schemachange/sc_version.h b/schemachange/sc_version.h new file mode 100644 index 0000000000..33fddd3bc5 --- /dev/null +++ b/schemachange/sc_version.h @@ -0,0 +1,23 @@ +/* + Copyright 2025 Bloomberg Finance L.P. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#ifndef INCLUDE_SC_VERSION_H +#define INCLUDE_SC_VERSION_H + +#define SC_MIN_VERSION 2 +#define SC_VERSION 3 + +#endif diff --git a/schemachange/schemachange.c b/schemachange/schemachange.c index efce2754bb..ce8d77714d 100644 --- a/schemachange/schemachange.c +++ b/schemachange/schemachange.c @@ -1548,6 +1548,8 @@ const char *schema_change_kind(struct schema_change_type *s) return "UNKNOWN"; } +extern int gbl_sc_current_version; + /* all scs part of the same txn -> in a sc_list object; * pointer fields are malloced */ @@ -1561,10 +1563,10 @@ int sc_list_create(sc_list_t *scl, void *vscs, uuid_t uuid) int sizes[scl->count]; struct schema_change_type *sc; - int i = 0; + int i = 0, sc_version = gbl_sc_current_version; LISTC_FOR_EACH(scs, sc, scs_lnk) { - sizes[i] = schemachange_packed_size(sc); + sizes[i] = schemachange_packed_size(sc, sc_version); scl->ser_scs_len += sizes[i]; i++; } @@ -1587,7 +1589,7 @@ int sc_list_create(sc_list_t *scl, void *vscs, uuid_t uuid) /* save offset */ scl->offsets[i] = offset; /* save schema change */ - p_buf = buf_put_schemachange(sc, p_buf, p_buf_end); + p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version); if (!p_buf) { return -1; } diff --git a/schemachange/schemachange.h b/schemachange/schemachange.h index 54cc2d3ee2..7ea6e07b55 100644 --- a/schemachange/schemachange.h +++ b/schemachange/schemachange.h @@ -90,6 +90,7 @@ struct timepart_view; /* in sync with do_schema_change_if */ enum schema_change_kind { + SC_VERSIONED = -1, SC_INVALID = 0, SC_LEGACY_QUEUE = 1, SC_LEGACY_MORESTRIPE = 2, @@ -286,6 +287,7 @@ struct schema_change_type { unsigned is_osql : 1; unsigned set_running : 1; uint64_t seed; + int version_test; int (*publish)(tran_type *, struct schema_change_type *); void (*unpublish)(struct schema_change_type *); @@ -386,7 +388,7 @@ typedef struct sc_list sc_list_t; */ int sc_list_create(sc_list_t *scl, void *vscs, uuid_t uuid); -size_t schemachange_packed_size(struct schema_change_type *s); +size_t schemachange_packed_size(struct schema_change_type *s, int sc_version); int start_schema_change_tran(struct ireq *, tran_type *tran); int start_schema_change(struct schema_change_type *); int create_queue(struct dbenv *, char *queuename, int avgitem, int pagesize); @@ -423,14 +425,11 @@ void cleanup_strptr(char **schemabuf); void free_schema_change_type(struct schema_change_type *s); -void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, - void *p_buf_end); -void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, - void *p_buf_end); +void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version); +void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end); void *buf_get_schemachange_v1(struct schema_change_type *s, void *p_buf, void *p_buf_end); -void *buf_get_schemachange_v2(struct schema_change_type *s, void *p_buf, - void *p_buf_end); +void *buf_get_schemachange_versioned(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version); /* This belong into sc_util.h */ int check_sc_ok(struct schema_change_type *s); diff --git a/tests/sc_version.test/Makefile b/tests/sc_version.test/Makefile new file mode 100644 index 0000000000..b4c0ac1057 --- /dev/null +++ b/tests/sc_version.test/Makefile @@ -0,0 +1,8 @@ +ifeq ($(TESTSROOTDIR),) + include ../testcase.mk +else + include $(TESTSROOTDIR)/testcase.mk +endif +ifeq ($(TEST_TIMEOUT),) + export TEST_TIMEOUT=5m +endif diff --git a/tests/sc_version.test/lrl.options b/tests/sc_version.test/lrl.options new file mode 100644 index 0000000000..662d13bc84 --- /dev/null +++ b/tests/sc_version.test/lrl.options @@ -0,0 +1,2 @@ +nowatch +logmsg level info diff --git a/tests/sc_version.test/runit b/tests/sc_version.test/runit new file mode 100755 index 0000000000..0e67827f28 --- /dev/null +++ b/tests/sc_version.test/runit @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +bash -n "$0" | exit 1 + +export debug=1 + +[[ "$debug" == "1" ]] && set -x + +x=$($CDB2SQL_EXE --tabs $CDB2_OPTIONS $DBNAME default "exec procedure sys.cmd.send('sc_version_test')") +echo "$x" +echo "$x" | egrep "Schema change version test passed" +if [[ $? -ne 0 ]]; then + echo "Failed sc-version test" + exit 1 +fi +echo "Success!" diff --git a/tests/tunables.test/t00_all_tunables.expected b/tests/tunables.test/t00_all_tunables.expected index 509bfb1455..e594e13d94 100644 --- a/tests/tunables.test/t00_all_tunables.expected +++ b/tests/tunables.test/t00_all_tunables.expected @@ -860,6 +860,7 @@ (name='sc_async', description='Run transactional schema changes asynchronously.', type='BOOLEAN', value='ON', read_only='N') (name='sc_async_maxthreads', description='Max number of threads for asynchronous schema changes.', type='INTEGER', value='5', read_only='N') (name='sc_check_lockwaits_sec', description='Frequency of checking lockwaits during schemachange (in seconds).', type='INTEGER', value='1', read_only='N') +(name='sc_current_version', description='Current schema-change version (Default: 2)', type='INTEGER', value='2', read_only='N') (name='sc_decrease_thrds_on_deadlock', description='Decrease number of schema change threads on deadlock - way to have schema change backoff.', type='BOOLEAN', value='ON', read_only='N') (name='sc_del_unused_files_threshold', description='', type='INTEGER', value='30000', read_only='Y') (name='sc_delay_verify_error', description='', type='INTEGER', value='100', read_only='N') @@ -876,6 +877,7 @@ (name='sc_resume_watchdog_timer', description='sc_resuming_watchdog timer', type='INTEGER', value='60', read_only='N') (name='sc_status_max_rows', description='Max number of rows returned in comdb2_sc_status (Default: 1000)', type='INTEGER', value='1000', read_only='N') (name='sc_use_num_threads', description='Start up to this many threads for parallel rebuilding during schema change. 0 means use one per dtastripe. Setting is capped at dtastripe.', type='INTEGER', value='0', read_only='N') +(name='sc_versioned', description='Enable versioned schema changes (Default: on)', type='BOOLEAN', value='ON', read_only='N') (name='sc_via_ddl_only', description='If set, we don't do checks needed for comdb2sc.', type='BOOLEAN', value='OFF', read_only='N') (name='scatterkeys', description='', type='BOOLEAN', value='OFF', read_only='N') (name='scconvert_finish_delay', description='Delay returning from convert_record when a stripe finishes. This would create a scenario where scgenids are on the right of any new genids to reproduce a vutf8 schema change bug. ', type='BOOLEAN', value='OFF', read_only='N')