Skip to content

Commit

Permalink
Refactor verify_constraints_exist
Browse files Browse the repository at this point in the history
Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Refactor

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>

Tweak impl

Signed-off-by: Morgan Douglas <[email protected]>
  • Loading branch information
morgando committed Jan 16, 2025
1 parent faa452e commit cc513f0
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 84 deletions.
194 changes: 121 additions & 73 deletions db/constraints.c
Original file line number Diff line number Diff line change
Expand Up @@ -2141,97 +2141,145 @@ static inline int key_has_expressions_members(struct schema *key)
return 0;
}

/* Verify that the tables and keys referred to by this table's constraints all
* exist & have the correct column count. If they don't it's a bit of a show
* stopper. */
int verify_constraints_exist(struct dbtable *from_db, struct dbtable *to_db,
struct dbtable *new_db,
struct schema_change_type *s)
/*
* Helper function for `verify_constraint`
*
* Performs the following verifications
* 1. The constrained-on table exists
* 2. The constrained-on table is not a time partition
* 3. The constrained-on table has the constrained-on key
* 4. The constrained-on table has the expected number of columns
*
* If `new_target_db` is set, then verifications 2-4 are performed on
* `new_target_db` instead of the current version of the table.
*
* Logs each failure and returns the number of failed verifications.
*/
static int verify_constraint_rule(constraint_t *ct, int rule_ix,
struct dbtable *source_db,
struct dbtable *new_target_db,
struct schema *fky,
struct schema_change_type *s)
{
int ii, jj;
char keytag[MAXTAGLEN];
struct schema *bky, *fky;
int n_errors = 0;

if (!from_db) {
for (ii = 0; ii < thedb->num_dbs; ii++) {
from_db = get_newer_db(thedb->dbs[ii], new_db);
n_errors += verify_constraints_exist(
from_db, from_db == to_db ? NULL : to_db, new_db, s);
}
return n_errors;
}
struct dbtable * target_db = get_dbtable_by_name(ct->table[rule_ix]);

for (ii = 0; ii < from_db->n_constraints; ii++) {
constraint_t *ct = &from_db->constraints[ii];
if (target_db)
target_db = get_newer_db(target_db, new_target_db);
else if (strcasecmp(ct->table[rule_ix], source_db->tablename) == 0)
target_db = source_db;

if (from_db == new_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->lclkeyname);
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->lclkeyname);
}
if (!(fky = find_tag_schema(from_db, keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, 0, "foreign key not found");
if (!target_db) {
/* Referencing a non-existent table */
constraint_err(s, source_db, ct, rule_ix, "parent table not found");
return ++n_errors;
} else if (target_db->timepartition_name) {
constraint_err(s, source_db, ct, rule_ix, "A foreign key cannot refer to a time partition");
return ++n_errors;
}

char keytag[MAXTAGLEN];
struct schema *bky;
if (target_db == new_target_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->keynm[rule_ix]);
if (!(bky = find_tag_schema_by_name(ct->table[rule_ix], keytag))) {
constraint_err(s, source_db, ct, rule_ix, "parent key not found");
n_errors++;
}
if (from_db->ix_expr && key_has_expressions_members(fky) &&
(ct->flags & CT_UPD_CASCADE)) {
constraint_err(s, from_db, ct, 0, "cascading update on expression indexes is not allowed");
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->keynm[rule_ix]);
if (!(bky = find_tag_schema(target_db, keytag))) {
constraint_err(s, source_db, ct, rule_ix, "parent key not found");
n_errors++;
}
for (jj = 0; jj < ct->nrules; jj++) {
struct dbtable *rdb;
}

/* If we have a target table (to_db) only look at rules pointing
* to that table. */
if (to_db && strcasecmp(ct->table[jj], to_db->tablename) != 0)
continue;
if (constraint_key_check(fky, bky)) {
constraint_err(s, source_db, ct, rule_ix, "invalid number of columns");
n_errors++;
}

rdb = get_dbtable_by_name(ct->table[jj]);
if (rdb)
rdb = get_newer_db(rdb, new_db);
else if (strcasecmp(ct->table[jj], from_db->tablename) == 0)
rdb = from_db;
if (!rdb) {
/* Referencing a non-existent table */
constraint_err(s, from_db, ct, jj, "parent table not found");
n_errors++;
continue;
} else {
if (rdb->timepartition_name) {
constraint_err(s, from_db, ct, jj, "A foreign key cannot refer to a time partition");
n_errors++;
continue;
}
}
if (rdb == new_db) {
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->keynm[jj]);
if (!(bky = find_tag_schema_by_name(ct->table[jj], keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, jj, "parent key not found");
n_errors++;
}
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->keynm[jj]);
if (!(bky = find_tag_schema(rdb, keytag))) {
/* Referencing a nonexistent key */
constraint_err(s, from_db, ct, jj, "parent key not found");
n_errors++;
}
}
return n_errors;
}

if (constraint_key_check(fky, bky)) {
/* Invalid constraint index */
constraint_err(s, from_db, ct, jj, "invalid number of columns");
n_errors++;
}
/*
* Helper function for `verify_constraints_from_dbtable`/`verify_constraints_to_new_dbtable`
*
* Performs the following verifications
* 1. `source_db` has the constrained-on key
* 2. Constraint does not have a cascading update on expression indexes
* 3. Each of the constraint's rules is valid. See `verify_constraint_rule` for
* details on rule verification. (if `new_target_db` is provided,
* only rules referring to `new_target_db` are checked)
*
* Logs each failure and returns the number of failed verifications.
*/
static int verify_constraint(constraint_t * const ct,
struct dbtable *source_db,
struct dbtable *new_target_db,
struct schema_change_type *s)
{
int n_errors = 0;

char keytag[MAXTAGLEN];
if (source_db == new_target_db) {
// Need to do this for the case when there is a self-referencing constraint.
snprintf(keytag, sizeof(keytag), ".NEW.%s", ct->lclkeyname);
} else {
snprintf(keytag, sizeof(keytag), "%s", ct->lclkeyname);
}

struct schema *fky = find_tag_schema(source_db, keytag);
if (!fky) {
constraint_err(s, source_db, ct, 0, "foreign key not found");
n_errors++;
}
if (source_db->ix_expr && key_has_expressions_members(fky) &&
(ct->flags & CT_UPD_CASCADE)) {
constraint_err(s, source_db, ct, 0, "cascading update on expression indexes is not allowed");
n_errors++;
}
for (int jj = 0; jj < ct->nrules; jj++) {
const int constraint_rule_is_eligible_for_verification = !new_target_db
|| strcasecmp(ct->table[jj], new_target_db->tablename) == 0;
if (constraint_rule_is_eligible_for_verification) {
n_errors += verify_constraint_rule(ct, jj, source_db, new_target_db, fky, s);
}
}

return n_errors;
}

int verify_constraints_to_new_dbtable(struct dbtable *new_target_db,
struct schema_change_type *s)
{
int n_errors = 0;
for (int ii = 0; ii < thedb->num_dbs; ii++) {
// Need to call `get_newer_db` for the case when there is a self-referencing constraint.
struct dbtable * const source_db = get_newer_db(thedb->dbs[ii], new_target_db);

for (int ii = 0; ii < source_db->n_constraints; ii++) {
constraint_t * const ct = &source_db->constraints[ii];
n_errors += verify_constraint(ct, source_db, new_target_db, s);
}
}
return n_errors;
}

int verify_constraints_from_dbtable(struct dbtable *source_db,
struct schema_change_type *s)
{
int n_errors = 0;

for (int ii = 0; ii < source_db->n_constraints; ii++) {
constraint_t * const ct = &source_db->constraints[ii];
n_errors += verify_constraint(ct, source_db, NULL, s);
}

return n_errors;
}

/* creates a reverse constraint in the referenced table for each of the db's
* constraint rules, if the referenced table already has the constraint a
* duplicate is not added
Expand Down
4 changes: 2 additions & 2 deletions schemachange/sc_add_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ int add_table_to_environment(char *table, const char *csc2,
newdb->timepartition_name = timepartition_name;

if ((iq == NULL || iq->tranddl <= 1) &&
verify_constraints_exist(newdb, NULL, NULL, s) != 0) {
verify_constraints_from_dbtable(newdb, s) != 0) {
logmsg(LOGMSG_ERROR, "%s: failed to verify constraints\n", __func__);
rc = -1;
goto err;
Expand Down Expand Up @@ -249,7 +249,7 @@ int finalize_add_table(struct ireq *iq, struct schema_change_type *s,
sc_errf(s, "failed to lock comdb2_tables (%s:%d)\n", __func__, __LINE__);
return -1;
}
if (iq && iq->tranddl > 1 && verify_constraints_exist(db, NULL, NULL, s) != 0) {
if (iq && iq->tranddl > 1 && verify_constraints_from_dbtable(db, s) != 0) {
sc_errf(s, "error verifying constraints\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions schemachange/sc_alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
Pthread_mutex_unlock(&csc2_subsystem_mtx);

if ((iq == NULL || iq->tranddl <= 1) &&
verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
verify_constraints_to_new_dbtable(newdb, s) != 0) {
if (local_lock)
unlock_schema_lk();
backout(newdb);
Expand Down Expand Up @@ -920,7 +920,7 @@ int finalize_alter_table(struct ireq *iq, struct schema_change_type *s,
}

if (iq && iq->tranddl > 1 &&
verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
verify_constraints_to_new_dbtable(newdb, s) != 0) {
sc_errf(s, "error verifying constraints\n");
BACKOUT;
}
Expand Down
2 changes: 1 addition & 1 deletion schemachange/sc_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ int dryrun_int(struct schema_change_type *s, struct dbtable *db, struct dbtable
sbuf2printf(s->sb, ">Forcing blob file rebuild\n");
}

if (verify_constraints_exist(NULL, newdb, newdb, s)) {
if (verify_constraints_to_new_dbtable(newdb, s)) {
return -1;
}

Expand Down
21 changes: 16 additions & 5 deletions schemachange/sc_logic.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,22 @@ int do_setcompr(struct ireq *iq, const char *rec, const char *blob);
int delete_temp_table(struct ireq *iq, struct dbtable *newdb);

int verify_new_temp_sc_db(struct dbtable *p_db, struct dbtable *p_newdb, tran_type *tran);
/****** This function is inside constraints.c and it was used by schemachange.c,
* before it was declared extern in the c files I am putting the definition here
* to move a step towards a proper header file. */
int verify_constraints_exist(struct dbtable *from_db, struct dbtable *to_db,
struct dbtable *new_db, struct schema_change_type *s);

/*
* Verifies constraints that refer to `new_target_db`.
*
* Logs each failure and returns the number of failed verifications.
*/
int verify_constraints_to_new_dbtable(struct dbtable *new_target_db,
struct schema_change_type *s);

/*
* Verifies constraints that originate from `source_db`
*
* Logs each failure and returns the number of failed verifications.
*/
int verify_constraints_from_dbtable(struct dbtable *source_db,
struct schema_change_type *s);

int do_schema_change_tran(sc_arg_t *);
int do_schema_change_tran_thd(sc_arg_t *);
Expand Down
2 changes: 1 addition & 1 deletion schemachange/schemachange.c
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ static int add_table_for_recovery(struct ireq *iq, struct schema_change_type *s)
newdb->instant_schema_change = s->headers && s->instant_sc;
newdb->schema_version = get_csc2_version(newdb->tablename);

if (verify_constraints_exist(NULL, newdb, newdb, s) != 0) {
if (verify_constraints_to_new_dbtable(newdb, s) != 0) {
backout_schemas(newdb->tablename);
abort();
}
Expand Down

0 comments on commit cc513f0

Please sign in to comment.