Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor verify_constraints_exist #4967

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 136 additions & 73 deletions db/constraints.c
Original file line number Diff line number Diff line change
Expand Up @@ -2141,97 +2141,160 @@ static inline int key_has_expressions_members(struct schema *key)
return 0;
}

/* Verify that the tables and keys referred to by this table's constraints all
* exist & have the correct column count. If they don't it's a bit of a show
* stopper. */
int verify_constraints_exist(struct dbtable *from_db, struct dbtable *to_db,
struct dbtable *new_db,
struct schema_change_type *s)
/*
* Helper function for `verify_constraint`
*
* Performs the following verifications
* 1. The constrained-on table exists
* 2. The constrained-on table is not a time partition
* 3. The constrained-on table has the constrained-on key
* 4. The constrained-on table has the expected number of columns
*
* If `updated_target_db` is set, then verifications 2-4 are performed on
* `updated_target_db` instead of the current version of the table.
*
* Logs each failure and returns the number of failed verifications.
*/
static int verify_constraint_rule(constraint_t *ct, int rule_ix,
struct dbtable *source_db,
struct dbtable *updated_target_db,
struct schema *fky,
struct schema_change_type *s)
{
int ii, jj;
char keytag[MAXTAGLEN];
struct schema *bky, *fky;
int n_errors = 0;

if (!from_db) {
for (ii = 0; ii < thedb->num_dbs; ii++) {
from_db = get_newer_db(thedb->dbs[ii], new_db);
n_errors += verify_constraints_exist(
from_db, from_db == to_db ? NULL : to_db, new_db, s);
Copy link
Contributor Author

@morgando morgando Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this line, calling

verify_constraints_exist(/*from_db*/ NULL, /*to_db*/ to_db, /*new_db*/ new_db)

will generate this call under the hood:

verify_constraints_exist(/*from_db*/ to_db, /*to_db*/ NULL, /*new_db*/ new_db).

To make this behavior explicit, I replaced all calls to verify_constraints_exist that have no value for from_db but a value for to_db with calls to verify_constraints_to_and_from_dbtable.

}
return n_errors;
}
struct dbtable * target_db = get_dbtable_by_name(ct->table[rule_ix]);

for (ii = 0; ii < from_db->n_constraints; ii++) {
constraint_t *ct = &from_db->constraints[ii];
if (target_db && updated_target_db)
target_db = get_newer_db(target_db, updated_target_db);
else if (strcasecmp(ct->table[rule_ix], source_db->tablename) == 0)
target_db = source_db;

if (from_db == new_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->lclkeyname);
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->lclkeyname);
}
if (!(fky = find_tag_schema(from_db, keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, 0, "foreign key not found");
if (!target_db) {
/* Referencing a non-existent table */
constraint_err(s, source_db, ct, rule_ix, "parent table not found");
return ++n_errors;
} else if (target_db->timepartition_name) {
constraint_err(s, source_db, ct, rule_ix, "A foreign key cannot refer to a time partition");
return ++n_errors;
}

char keytag[MAXTAGLEN];
struct schema *bky;
if (target_db == updated_target_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->keynm[rule_ix]);
if (!(bky = find_tag_schema_by_name(ct->table[rule_ix], keytag))) {
constraint_err(s, source_db, ct, rule_ix, "parent key not found");
n_errors++;
}
if (from_db->ix_expr && key_has_expressions_members(fky) &&
(ct->flags & CT_UPD_CASCADE)) {
constraint_err(s, from_db, ct, 0, "cascading update on expression indexes is not allowed");
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->keynm[rule_ix]);
if (!(bky = find_tag_schema(target_db, keytag))) {
constraint_err(s, source_db, ct, rule_ix, "parent key not found");
n_errors++;
}
for (jj = 0; jj < ct->nrules; jj++) {
struct dbtable *rdb;
}

/* If we have a target table (to_db) only look at rules pointing
* to that table. */
if (to_db && strcasecmp(ct->table[jj], to_db->tablename) != 0)
continue;
if (constraint_key_check(fky, bky)) {
constraint_err(s, source_db, ct, rule_ix, "invalid number of columns");
n_errors++;
}

rdb = get_dbtable_by_name(ct->table[jj]);
if (rdb)
rdb = get_newer_db(rdb, new_db);
else if (strcasecmp(ct->table[jj], from_db->tablename) == 0)
rdb = from_db;
if (!rdb) {
/* Referencing a non-existent table */
constraint_err(s, from_db, ct, jj, "parent table not found");
n_errors++;
continue;
} else {
if (rdb->timepartition_name) {
constraint_err(s, from_db, ct, jj, "A foreign key cannot refer to a time partition");
n_errors++;
continue;
}
}
if (rdb == new_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->keynm[jj]);
if (!(bky = find_tag_schema_by_name(ct->table[jj], keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, jj, "parent key not found");
n_errors++;
}
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->keynm[jj]);
if (!(bky = find_tag_schema(rdb, keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, jj, "parent key not found");
n_errors++;
}
}
return n_errors;
}

if (constraint_key_check(fky, bky)) {
/* Invalid constraint index */
constraint_err(s, from_db, ct, jj, "invalid number of columns");
n_errors++;
}
/*
* Helper function for `verify_constraints_from_dbtable`/`verify_constraints_to_dbtable`
*
* Performs the following verifications
* 1. `source_db` has the constrained-on key
* 2. Constraint does not have a cascading update on expression indexes
* 3. Each of the constraint's rules is valid. See `verify_constraint_rule` for
* details on rule verification. (if `target_db` is provided,
* only rules referring to `target_db` are checked)
*
* `source_db_is_updated`/`target_db_is_updated` should be 1 if
* `source_db`/`target_db` refers to the new btree for an existing table.
*
* Logs each failure and returns the number of failed verifications.
*/
static int verify_constraint(constraint_t * const ct,
struct dbtable *source_db,
struct dbtable *target_db,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Give source_db and target_db variables clearer names--maybe ct_source_db/ct_target_db?

int source_db_is_updated,
int target_db_is_updated,
struct schema_change_type *s)
{
int n_errors = 0;

char keytag[MAXTAGLEN];
if (source_db_is_updated) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->lclkeyname);
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->lclkeyname);
}

struct schema *fky = find_tag_schema(source_db, keytag);
if (!fky) {
constraint_err(s, source_db, ct, 0, "foreign key not found");
n_errors++;
}
if (source_db->ix_expr && key_has_expressions_members(fky) &&
(ct->flags & CT_UPD_CASCADE)) {
constraint_err(s, source_db, ct, 0, "cascading update on expression indexes is not allowed");
n_errors++;
}
for (int jj = 0; jj < ct->nrules; jj++) {
const int constraint_rule_is_eligible_for_verification = !target_db
|| strcasecmp(ct->table[jj], target_db->tablename) == 0;
if (constraint_rule_is_eligible_for_verification) {
n_errors += verify_constraint_rule(ct, jj, source_db, target_db_is_updated ? target_db : NULL, fky, s);
}
}

return n_errors;
}

int verify_constraints_from_dbtable(struct dbtable *source_db,
int is_updated,
struct schema_change_type *s)
{
int n_errors = 0;

for (int ii = 0; ii < source_db->n_constraints; ii++) {
constraint_t * const ct = &source_db->constraints[ii];
n_errors += verify_constraint(ct, source_db, NULL,
/* src_db_is_updated */ is_updated, /* target_db_is_updated */ 0, s);
}

return n_errors;
}

static int verify_constraints_to_dbtable(struct dbtable *target_db,
int is_updated,
struct schema_change_type *s)
{
int n_errors = 0;
for (int ii = 0; ii < thedb->num_dbs; ii++) {
// Need to call `get_newer_db` for the case when there is a self-referencing constraint.
struct dbtable * const source_db = get_newer_db(thedb->dbs[ii], target_db);

for (int ii = 0; ii < source_db->n_constraints; ii++) {
constraint_t * const ct = &source_db->constraints[ii];
n_errors += verify_constraint(ct, source_db, target_db,
/* source_db_is_updated */ 0, /* target_db_is_updated */ is_updated, s);
}
}
return n_errors;
}

int verify_constraints_to_and_from_dbtable(struct dbtable *new_db, int is_updated, struct schema_change_type *s)
Copy link
Contributor Author

@morgando morgando Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Considering renaming all is_updated variables to undergoing_rebuild/is_rebuild/is_rebuild_needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Rename new_db parameter just db

{
int n_errors = verify_constraints_from_dbtable(new_db, is_updated, s);
n_errors += verify_constraints_to_dbtable(new_db, is_updated, s);
return n_errors;
}

/* creates a reverse constraint in the referenced table for each of the db's
* constraint rules, if the referenced table already has the constraint a
* duplicate is not added
Expand Down
4 changes: 2 additions & 2 deletions schemachange/sc_add_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ int add_table_to_environment(char *table, const char *csc2,
newdb->timepartition_name = timepartition_name;

if ((iq == NULL || iq->tranddl <= 1) &&
verify_constraints_exist(newdb, NULL, NULL, s) != 0) {
verify_constraints_from_dbtable(newdb, /* is_updated */ 0, s) != 0) {
logmsg(LOGMSG_ERROR, "%s: failed to verify constraints\n", __func__);
rc = -1;
goto err;
Expand Down Expand Up @@ -249,7 +249,7 @@ int finalize_add_table(struct ireq *iq, struct schema_change_type *s,
sc_errf(s, "failed to lock comdb2_tables (%s:%d)\n", __func__, __LINE__);
return -1;
}
if (iq && iq->tranddl > 1 && verify_constraints_exist(db, NULL, NULL, s) != 0) {
if (iq && iq->tranddl > 1 && verify_constraints_from_dbtable(db, /* is_updated */ 0, s) != 0) {
sc_errf(s, "error verifying constraints\n");
return -1;
}
Expand Down
6 changes: 4 additions & 2 deletions schemachange/sc_alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
Pthread_mutex_unlock(&csc2_subsystem_mtx);

if ((iq == NULL || iq->tranddl <= 1) &&
verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
verify_constraints_to_and_from_dbtable(newdb,
/* is_updated */ 1, s) != 0) {
if (local_lock)
unlock_schema_lk();
backout(newdb);
Expand Down Expand Up @@ -920,7 +921,8 @@ int finalize_alter_table(struct ireq *iq, struct schema_change_type *s,
}

if (iq && iq->tranddl > 1 &&
verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
verify_constraints_to_and_from_dbtable(newdb,
/* is_updated */ 1, s) != 0) {
sc_errf(s, "error verifying constraints\n");
BACKOUT;
}
Expand Down
3 changes: 2 additions & 1 deletion schemachange/sc_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,8 @@ int dryrun_int(struct schema_change_type *s, struct dbtable *db, struct dbtable
sbuf2printf(s->sb, ">Forcing blob file rebuild\n");
}

if (verify_constraints_exist(NULL, newdb, newdb, s)) {
if (verify_constraints_to_and_from_dbtable(newdb,
/* is_updated */ 1, s)) {
return -1;
}

Expand Down
30 changes: 25 additions & 5 deletions schemachange/sc_logic.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,31 @@ int do_setcompr(struct ireq *iq, const char *rec, const char *blob);
int delete_temp_table(struct ireq *iq, struct dbtable *newdb);

int verify_new_temp_sc_db(struct dbtable *p_db, struct dbtable *p_newdb, tran_type *tran);
/****** This function is inside constraints.c and it was used by schemachange.c,
* before it was declared extern in the c files I am putting the definition here
* to move a step towards a proper header file. */
int verify_constraints_exist(struct dbtable *from_db, struct dbtable *to_db,
struct dbtable *new_db, struct schema_change_type *s);


/*
* Verifies constraints that refer to `target_db`.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: This is slightly inaccurate.

Every constraint in the database will be validated, but only constraint rules referring to target_db will be validated.

Update comment to clarify

*
* `is_updated` should be 1 if `target_db` refers to the new btree for
* an existing table.
*
* Logs each failure and returns the number of failed verifications.
*/
int verify_constraints_to_and_from_dbtable(struct dbtable *target_db,
int is_updated,
struct schema_change_type *s);

/*
* Verifies constraints that originate from `source_db`
*
* `is_updated` should be 1 if `source_db` refers to the new btree for
* an existing table.
*
* Logs each failure and returns the number of failed verifications.
*/
int verify_constraints_from_dbtable(struct dbtable *source_db,
int is_updated,
struct schema_change_type *s);

int do_schema_change_tran(sc_arg_t *);
int do_schema_change_tran_thd(sc_arg_t *);
Expand Down
3 changes: 2 additions & 1 deletion schemachange/schemachange.c
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,8 @@ static int add_table_for_recovery(struct ireq *iq, struct schema_change_type *s)
newdb->instant_schema_change = s->headers && s->instant_sc;
newdb->schema_version = get_csc2_version(newdb->tablename);

if (verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
if (verify_constraints_to_and_from_dbtable(newdb,
/* is_updated */ 1, s) != 0) {
backout_schemas(newdb->tablename);
abort();
}
Expand Down