diff --git a/cdb2api/cdb2api.c b/cdb2api/cdb2api.c index 3f1153b51c..6b70000181 100644 --- a/cdb2api/cdb2api.c +++ b/cdb2api/cdb2api.c @@ -492,6 +492,10 @@ static void do_init_once(void) if (min_retries) { MIN_RETRIES = atoi(min_retries); } + char *max_retries = getenv("COMDB2_CONFIG_MAX_RETRIES"); + if (max_retries) { + MAX_RETRIES = atoi(max_retries); + } } /* if sqlstr is a read stmt will return 1 otherwise return 0 diff --git a/db/db_tunables.c b/db/db_tunables.c index 1be1dc3645..eac55f767f 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -42,6 +42,7 @@ #define COMPOSITE_TUNABLE_SEP '.' extern int gbl_transactional_drop_plus_rename; +extern int gbl_enable_partitioned_table_merge_resume; extern int gbl_bulk_import_validation_werror; extern int gbl_debug_sleep_during_bulk_import; extern int gbl_waitalive_iterations; diff --git a/db/db_tunables.h b/db/db_tunables.h index 8a1ee9d44d..7397da785d 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -28,6 +28,10 @@ REGISTER_TUNABLE("abort_during_downgrade_if_scs_dont_stop", "Abort if scs don't "after starting a downgrade (default OFF)", TUNABLE_BOOLEAN, &gbl_abort_during_downgrade_if_scs_dont_stop, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("partitioned_table_merge_resume", "Toggle the ability to resume " + "partitioned table collapse SCs (default OFF)", TUNABLE_BOOLEAN, + &gbl_enable_partitioned_table_merge_resume, 0, NULL, NULL, + NULL, NULL); REGISTER_TUNABLE("abort_on_in_use_rqid", NULL, TUNABLE_BOOLEAN, &gbl_abort_on_clear_inuse_rqid, READONLY | NOARG, NULL, NULL, NULL, NULL); diff --git a/db/osqlblockproc.c b/db/osqlblockproc.c index 08e1a14a87..02e80a97dc 100644 --- a/db/osqlblockproc.c +++ b/db/osqlblockproc.c @@ -1598,7 +1598,7 @@ int resume_sc_multiddl_txn(sc_list_t *scl) init_fake_ireq(thedb, iq); /* this starts schema changeas; - * the alters have to register themselves inline, + * the alters have to register themselves inline (by incrementing gbl_sc_resume_start), * but the rest of the execution is done in parallel * waiting is done by a separate thread that will finalize * the schema change diff --git a/db/osqlcomm.c b/db/osqlcomm.c index 9472630ed6..6941b78d5c 100644 --- a/db/osqlcomm.c +++ b/db/osqlcomm.c @@ -73,6 +73,7 @@ extern int g_osql_ready; extern int gbl_goslow; extern int gbl_partial_indexes; +int gbl_enable_partitioned_table_merge_resume = 1; int gbl_master_sends_query_effects = 1; int gbl_toblock_random_deadlock_trans; int gbl_toblock_random_verify_error; @@ -6287,13 +6288,13 @@ static int _process_single_table_sc(struct ireq *iq) return rc; } -static int start_schema_change_tran_wrapper_merge(const char *tblname, +static int setup_schema_change_tran_wrapper_merge(const char *tblname, timepart_view_t **pview, timepart_sc_arg_t *arg) { + struct schema_change_type ** shard_scs = (struct schema_change_type **) arg->extra_args; struct schema_change_type *sc = arg->s; struct ireq *iq = sc->iq; - int rc; /* first shard drops partition also */ if (arg->pos & LAST_SHARD) { @@ -6312,23 +6313,56 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname, alter_sc->force_rebuild = 1; /* we are moving rows here */ /* alter only in parallel mode for live */ alter_sc->scanmode = SCAN_PARALLEL; + alter_sc->resume = sc->resume; /* link the sc */ iq->sc = alter_sc; - /** - * if view is provided, this is part of a shard walk; - * release views lock here since sc can take awhile - * - */ - if (arg->lockless) - views_unlock(); + // Release views lock if we're doing shard walk + if (arg->lockless) 
views_unlock(); + + int rc = setup_schema_change(iq, NULL); + if (rc) { + if (rc != SC_MASTER_DOWNGRADE) { + // TODO: revisit error handling for this path + iq->osql_flags |= OSQL_FLAGS_SCDONE; + } + return rc; + } - rc = start_schema_change_tran(iq, NULL); + // sets last genid + rc = setup_merge(iq, iq->sc, NULL); + + if (arg->lockless) { + *pview = timepart_reaquire_view(arg->part_name); + if (!*pview) { + logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n", + __func__, arg->part_name); + return VIEW_ERR_SC; + } + } + + shard_scs[arg->indx] = alter_sc; + return rc; +} + +static int start_schema_change_tran_wrapper_merge(const char *tblname, + timepart_view_t **pview, + timepart_sc_arg_t *arg) +{ + struct schema_change_type **shard_scs = (struct schema_change_type **) arg->extra_args; + struct ireq *iq = arg->s->iq; + iq->sc = shard_scs[arg->indx]; + + // Release views lock if we're doing a shard walk + if (arg->lockless) views_unlock(); + + int rc = launch_schema_change(iq, NULL); /* link the alter */ iq->sc->sc_next = iq->sc_pending; iq->sc_pending = iq->sc; - iq->sc->newdb = NULL; /* lose ownership, otherwise double free */ + if (iq->sc->nothrevent) { iq->sc->newdb = NULL; /* lose ownership, otherwise double free */ } if (arg->lockless) { *pview = timepart_reaquire_view(arg->part_name); @@ -6389,6 +6423,164 @@ static int _process_single_table_sc_merge(struct ireq *iq) return rc; }
+static int setup_first_shard_sc_for_merge(struct ireq *iq, struct dbtable *first_shard) { + + struct schema_change_type *sc = iq->sc; + if (!first_shard->sqlaliasname) { + sc->kind = SC_ADDTABLE; + } else { + if (gbl_enable_partitioned_table_merge_resume) { + sc->partition.type = PARTITION_NONE; + } else { + assert(sc->partition.type == PARTITION_MERGE); + // If partitioned table merge resumes are disabled, + // then we keep the type equal to PARTITION_MERGE. + // There is code later on that blocks the resume if + // the type is PARTITION_MERGE. + } + + strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename)); + } + + int rc = setup_schema_change(iq, NULL); + if (rc) { + if (rc != SC_MASTER_DOWNGRADE) iq->osql_flags |= OSQL_FLAGS_SCDONE; + return ERR_SC; + } + + return 0; +} +
+static int setup_later_shard_scs_for_merge(const char *tblname, + timepart_view_t **pview, + timepart_sc_arg_t *arg) +{ + struct schema_change_type **shard_scs = (struct schema_change_type **) arg->extra_args; + struct schema_change_type *sc = arg->s; + struct ireq *iq = sc->iq; + + /* first shard drops partition also */ + if (arg->pos & LAST_SHARD) { + sc->publish = partition_publish; + sc->unpublish = partition_unpublish; + } + + struct schema_change_type *alter_sc = clone_schemachange_type(sc); + + /* new target */ + strncpy0(alter_sc->tablename, tblname, sizeof(sc->tablename)); + /*alter_sc->usedbtablevers = sc->partition.u.mergetable.version;*/ + alter_sc->kind = SC_ALTERTABLE; + /* use the created file as target */ + alter_sc->newdb = sc->newdb; + alter_sc->force_rebuild = 1; /* we are moving rows here */ + /* alter only in parallel mode for live */ + alter_sc->scanmode = SCAN_PARALLEL; + alter_sc->resume = sc->resume; + /* link the sc */ + iq->sc = alter_sc; + + // Release views lock if we're doing a shard walk + if (arg->lockless) views_unlock(); + + int rc = setup_schema_change(iq, NULL); + if (rc) { + if (rc != SC_MASTER_DOWNGRADE) { + // TODO: revisit error handling for this path + iq->osql_flags |= OSQL_FLAGS_SCDONE; + } + return rc; + } + + // sets last genid + rc = setup_merge(iq, iq->sc, NULL); + + if (arg->lockless) { + *pview = timepart_reaquire_view(arg->part_name); + if (!*pview) { + logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n", + __func__, arg->part_name); + return VIEW_ERR_SC; + } + } + + shard_scs[arg->indx] = alter_sc; + return rc; +} +
+static int setup_scs_for_merge(struct ireq *iq, struct schema_change_type **first_shard_sc, + struct schema_change_type ***later_shard_scs, int *num_shards) { + struct schema_change_type *sc = *first_shard_sc = iq->sc; + assert(sc->kind == SC_ALTERTABLE); + + char *first_shard_name = timepart_shard_name(sc->tablename, 0, 0, NULL); + struct dbtable *first_shard = get_dbtable_by_name(first_shard_name); + free(first_shard_name); + + int rc = setup_first_shard_sc_for_merge(iq, first_shard); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: Failed to setup first shard sc\n", __func__); + return rc; + } + + if (first_shard->sqlaliasname) { + strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */ + } + + *num_shards = timepart_get_num_shards(sc->tablename); + *later_shard_scs = calloc(*num_shards, sizeof(struct schema_change_type *)); + if (!*later_shard_scs) return VIEW_ERR_MALLOC; + timepart_sc_arg_t arg = {.s = sc, .start = 1 /* no publishing */, + .extra_args = (void *) *later_shard_scs, + .check_extra_shard = first_shard->sqlaliasname ? 1 : 0}; + sc->iq = iq; + + rc = timepart_foreach_shard(setup_later_shard_scs_for_merge, &arg); + return rc; +} +
+static int launch_scs_for_merge(struct ireq *iq, struct schema_change_type *first_shard_sc, + struct schema_change_type **later_shard_scs, int num_shards) +{ + iq->sc = first_shard_sc; + int shard_ix = 0; + + do { + if (!iq->sc) { logmsg(LOGMSG_WARN, "shard %d sc is null\n", shard_ix); continue; } + + int rc = launch_schema_change(iq, NULL); + if (rc) { + if (rc != SC_MASTER_DOWNGRADE) iq->osql_flags |= OSQL_FLAGS_SCDONE; + return ERR_SC; + } + + iq->sc->sc_next = iq->sc_pending; + iq->sc_pending = iq->sc; + iq->sc = later_shard_scs[shard_ix]; + } while (++shard_ix < num_shards); + + return 0; +} +
+static int process_partitioned_table_merge(struct ireq *iq) +{ + int num_shards; + struct schema_change_type *first_shard_sc; + struct schema_change_type **later_shard_scs; + + int rc = setup_scs_for_merge(iq, &first_shard_sc, &later_shard_scs, &num_shards); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: Failed to setup scs for partition merge. rc(%d)\n", __func__, rc); + return rc; + } + + rc = launch_scs_for_merge(iq, first_shard_sc, later_shard_scs, num_shards); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: Failed to launch scs for partition merge. 
rc(%d)\n", __func__, rc); + return rc; + } + + return 0; +} + static int _process_partitioned_table_merge(struct ireq *iq) { struct schema_change_type *sc = iq->sc; @@ -6407,15 +6599,18 @@ static int _process_partitioned_table_merge(struct ireq *iq) struct dbtable *first_shard = get_dbtable_by_name(first_shard_name); free(first_shard_name); - /* we need to move data */ - sc->force_rebuild = 1; + const int latched_nothrevent = sc->nothrevent; + sc->force_rebuild = 1; /* we need to move data */ + sc->nothrevent = 1; /* we need do_add_table / do_alter_table to run first */ + sc->finalize = 0; + + // TODO: check if the first shard sc is quick enough to + // not be problematic if it blocks new master from coming up if (!first_shard->sqlaliasname) { /* * create a table with the same name as the partition */ - sc->nothrevent = 1; /* we need do_add_table to run first */ - sc->finalize = 0; /* make sure */ sc->kind = SC_ADDTABLE; rc = start_schema_change_tran(iq, NULL); @@ -6425,15 +6620,19 @@ static int _process_partitioned_table_merge(struct ireq *iq) iq->osql_flags |= OSQL_FLAGS_SCDONE; return ERR_SC; } - - iq->sc->sc_next = iq->sc_pending; - iq->sc_pending = iq->sc; } else { /* - * use the fast shard as the destination, after first altering it + * use the first shard as the destination, after first altering it */ - sc->nothrevent = 1; /* we need do_alter_table to run first */ - sc->finalize = 0; + if (gbl_enable_partitioned_table_merge_resume) { + sc->partition.type = PARTITION_NONE; + } else { + assert(sc->partition.type == PARTITION_MERGE); + // If partitioned table merge resumes are disabled, + // then we keep the type equal to PARTITION_MERGE. + // There is code later on that blocks the resume if + // the type is PARTITION_MERGE. + } strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename)); @@ -6444,25 +6643,34 @@ static int _process_partitioned_table_merge(struct ireq *iq) return ERR_SC; } - iq->sc->sc_next = iq->sc_pending; - iq->sc_pending = iq->sc; - arg.check_extra_shard = 1; strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */ arg.start = 1; /* since this is a partition drop, we do not need to set/reset arg.pos here */ } + iq->sc->sc_next = iq->sc_pending; + iq->sc_pending = iq->sc; + /* at this point we have created the future btree, launch an alter * for each of the shards of the partition */ + sc->nothrevent = latched_nothrevent; arg.s = sc; arg.s->iq = iq; arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/ if (!arg.part_name) return VIEW_ERR_MALLOC; - arg.lockless = 1; - /* note: we have already set nothrevent depending on the number of shards */ + arg.lockless = 1; + + struct schema_change_type ** shard_scs = calloc(timepart_get_num_shards(sc->tablename), + sizeof(struct schema_change_type *)); + if (!shard_scs) return VIEW_ERR_MALLOC; + arg.extra_args = (void *) shard_scs; + + rc = timepart_foreach_shard(setup_schema_change_tran_wrapper_merge, &arg); + + // TODO: Do this in a thread rc = timepart_foreach_shard(start_schema_change_tran_wrapper_merge, &arg); free(arg.part_name); @@ -6646,7 +6854,8 @@ static int _process_partition_alter_and_drop(struct ireq *iq) sc->nothrevent = nshards > gbl_dohsql_sc_max_threads; if (sc->partition.type == PARTITION_MERGE) { - return _process_partitioned_table_merge(iq); + return process_partitioned_table_merge(iq); + // return _process_partitioned_table_merge(iq); } timepart_sc_arg_t arg = {0}; diff --git a/db/views.h b/db/views.h index 6b5384cab6..376cf62f72 100644 --- 
a/db/views.h +++ b/db/views.h @@ -89,6 +89,7 @@ typedef struct timepart_sc_arg { /* output */ int pos; /* is this the first and/or the last shard */ int indx; /* currently selected shard index */ + void *extra_args; /* any extra args we want to pass to the function */ } timepart_sc_arg_t; extern int gbl_partitioned_table_enabled; diff --git a/schemachange/sc_alter_table.c b/schemachange/sc_alter_table.c index ae53c43294..74b9d68b1d 100644 --- a/schemachange/sc_alter_table.c +++ b/schemachange/sc_alter_table.c @@ -389,33 +389,23 @@ static void decrement_sc_yet_to_resume_counter() } } -int do_alter_table(struct ireq *iq, struct schema_change_type *s, - tran_type *tran) +int setup_alter(struct ireq *iq, struct schema_change_type *s, tran_type *tran) { - struct dbtable *db; int rc; int bdberr = 0; struct dbtable *newdb; int datacopy_odh = 0; int changed; - int i; char new_prefix[32]; struct scinfo scinfo; struct errstat err = {0}; - db = get_dbtable_by_name(s->tablename); + struct dbtable *db = get_dbtable_by_name(s->tablename); if (db == NULL) { sc_errf(s, "Table not found:%s\n", s->tablename); return SC_TABLE_DOESNOT_EXIST; } - - /* note for a partition merge, if we reuse the first shard (i.e. it is aliased), - * we need to alter it here instead of running do_merge_table - */ - if (s->partition.type == PARTITION_MERGE && !db->sqlaliasname) - return do_merge_table(iq, s, tran); - #ifdef DEBUG_SC logmsg(LOGMSG_INFO, "do_alter_table() %s\n", s->resume ? "resuming" : ""); #endif @@ -426,7 +416,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s, if (s->resume == SC_PREEMPT_RESUME) { newdb = db->sc_to; changed = s->schema_change; - goto convert_records; + return 0; } set_schemachange_options_tran(s, db, &scinfo, tran); @@ -591,7 +581,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s, /* set sc_genids, 0 them if we are starting a new schema change, or * restore them to their previous values if we are resuming */ - if (init_sc_genids(newdb, s)) { + if (init_sc_genids(db, newdb, s)) { sc_errf(s, "failed initilizing sc_genids\n"); delete_temp_table(iq, newdb); change_schemas_recover(s->tablename); @@ -606,8 +596,42 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s, db->sc_downgrading = 0; db->doing_conversion = 1; /* live_sc_off will unset it */ Pthread_rwlock_unlock(&db->sc_live_lk); + return 0; +} + +int do_alter_table(struct ireq *iq, struct schema_change_type *s, + tran_type *tran) +{ + int rc; + struct dbtable *newdb; + int changed; + int i; + + struct dbtable *db = get_dbtable_by_name(s->tablename); + if (db == NULL) { + sc_errf(s, "Table not found:%s\n", s->tablename); + return SC_TABLE_DOESNOT_EXIST; + } + + /* note for a partition merge, if we reuse the first shard (i.e. 
it is aliased), + * we need to alter it here instead of running do_merge_table + */ + if (s->partition.type == PARTITION_MERGE && !db->sqlaliasname) + return do_merge_table(iq, s, tran); + + // TODO: Not sure if this is a good indicator + if (!s->newdb) { + rc = setup_alter(iq, s, tran); + if (rc) return rc; + } + + if (s->resume == SC_PREEMPT_RESUME) { + newdb = db->sc_to; + changed = s->schema_change; + } else { + newdb = s->newdb; + } -convert_records: assert(db->sc_from == db && s->db == db); assert(db->sc_to == newdb && s->newdb == newdb); assert(db->doing_conversion == 1); @@ -654,7 +678,8 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s, changed == SC_CONSTRAINT_CHANGE) { if (!s->live) gbl_readonly_sc = 1; - rc = convert_all_records(db, newdb, newdb->sc_genids, s); + + rc = convert_all_records(db, newdb, db->sc_genids, s); if (rc == 1) rc = 0; } else rc = 0; @@ -725,37 +750,28 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s, return SC_OK; } -static int do_merge_table(struct ireq *iq, struct schema_change_type *s, - tran_type *tran) +// Runs after the sc is set 'running' and before it is actually +// launched. +int setup_merge(struct ireq *iq, struct schema_change_type *s, + tran_type *tran) { - struct dbtable *db; - struct dbtable *newdb; - int i; - int rc; - struct scinfo scinfo; - -#ifdef DEBUG_SC - logmsg(LOGMSG_INFO, "do_alter_table() %s\n", s->resume ? "resuming" : ""); -#endif - - gbl_sc_last_writer_time = 0; - - db = get_dbtable_by_name(s->tablename); + struct dbtable *db = get_dbtable_by_name(s->tablename); if (db == NULL) { sc_errf(s, "Table not found:%s\n", s->tablename); + if (s->resume) { decrement_sc_yet_to_resume_counter(); } return SC_TABLE_DOESNOT_EXIST; } - if (s->resume == SC_PREEMPT_RESUME) { - newdb = db->sc_to; - goto convert_records; + return 0; } - newdb = s->newdb; + struct dbtable *newdb = s->newdb; + struct scinfo scinfo; set_schemachange_options_tran(s, db, &scinfo, tran); + int rc; if ((rc = check_option_coherency(s, db, &scinfo))) return rc; sc_printf(s, "starting table merge with seed %0#16llx\n", @@ -764,6 +780,7 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, if ((iq == NULL || iq->tranddl <= 1) && db->n_rev_constraints > 0 && !self_referenced_only(db)) { sc_client_error(s, "Cannot drop a table referenced by a foreign key"); + if (s->resume) { decrement_sc_yet_to_resume_counter(); } return -1; } @@ -772,13 +789,14 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, /* ban old settings */ if (db->dbnum) { sc_client_error(s, "Cannot comdbg tables"); + if (s->resume) { decrement_sc_yet_to_resume_counter(); } return -1; } - /* set sc_genids, 0 them if we are starting a new schema change, or - * restore them to their previous values if we are resuming */ - if (init_sc_genids(newdb, s)) { - sc_client_error(s, "Failed to initialize sc_genids"); + db->sc_genids = (unsigned long long *) calloc(MAXDTASTRIPE, sizeof(unsigned long long)); + if (!db->sc_genids) { + sc_errf(s, "failed initilizing sc_genids\n"); + if (s->resume) { decrement_sc_yet_to_resume_counter(); } return -1; } @@ -790,7 +808,35 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, db->doing_conversion = 1; /* live_sc_off will unset it */ Pthread_rwlock_unlock(&db->sc_live_lk); -convert_records: + if (s->resume && IS_ALTERTABLE(s)) { + if (gbl_test_sc_resume_race && !get_stopsc(__func__, __LINE__)) { + logmsg(LOGMSG_INFO, "%s:%d sleeping 5s for sc_resume test\n", + __func__, 
__LINE__); + sleep(5); + } + decrement_sc_yet_to_resume_counter(); + } + return 0; +} + +static int do_merge_table(struct ireq *iq, struct schema_change_type *s, + tran_type *tran) +{ + int i; + int rc; + +#ifdef DEBUG_SC + logmsg(LOGMSG_INFO, "do_merge_table() %s\n", s->resume ? "resuming" : ""); +#endif + struct dbtable *db = get_dbtable_by_name(s->tablename); + if (db == NULL) { + sc_errf(s, "Table not found:%s\n", s->tablename); + if (s->resume) { decrement_sc_yet_to_resume_counter(); } + return SC_TABLE_DOESNOT_EXIST; + } + + struct dbtable *newdb = s->resume == SC_PREEMPT_RESUME ? db->sc_to : s->newdb; + assert(db->sc_from == db && s->db == db); assert(db->sc_to == newdb && s->newdb == newdb); assert(db->doing_conversion == 1); @@ -813,8 +859,10 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, struct schema *tag = find_tag_schema(newdb, ".NEW..ONDISK"); if (!tag) { struct errstat err = {0}; + Pthread_mutex_lock(&csc2_subsystem_mtx); rc = populate_db_with_alt_schema(thedb, newdb, newdb->csc2_schema, &err); + Pthread_mutex_unlock(&csc2_subsystem_mtx); if (rc) { logmsg(LOGMSG_ERROR, "%s\ncsc2: \"%s\"\n", err.errstr, newdb->csc2_schema); @@ -827,7 +875,7 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, /* skip converting records for fastinit and planned schema change * that doesn't require rebuilding anything. */ - rc = convert_all_records(db, newdb, newdb->sc_genids, s); + rc = convert_all_records(db, newdb, db->sc_genids, s); if (rc == 1) rc = 0; remove_ongoing_alter(s); @@ -866,7 +914,7 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, for (i = 0; i < gbl_dtastripe; i++) { sc_errf(s, " > [%s] stripe %2d was at 0x%016llx\n", s->tablename, - i, newdb->sc_genids[i]); + i, db->sc_genids[i]); } while (s->logical_livesc) { @@ -885,6 +933,14 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s, /* handle renaming sqlite_stat1 entries for idx */ check_for_idx_rename(s->newdb, s->db); + // All shards point to the same newdb. + // + // By setting it to NULL here, we ensure that + // all shards except for the first shard (which does not run + // do_merge_table) have newdb set to NULL. + // This avoids double-freeing it. + if (rc == SC_MASTER_DOWNGRADE) { s->newdb = NULL; } + return SC_OK; } @@ -1238,7 +1294,7 @@ int do_upgrade_table_int(struct schema_change_type *s) sc_printf(s, "Starting FULL table upgrade.\n"); } - if (init_sc_genids(db, s)) { + if (init_sc_genids(db, db, s)) { sc_errf(s, "failed initilizing sc_genids\n"); return SC_LLMETA_ERR; } diff --git a/schemachange/sc_callbacks.c b/schemachange/sc_callbacks.c index 3051115971..9292164447 100644 --- a/schemachange/sc_callbacks.c +++ b/schemachange/sc_callbacks.c @@ -259,7 +259,7 @@ int live_sc_post_update_delayed_key_adds_int(struct ireq *iq, void *trans, /* need to check where the cursor is, even tho that check was done once in * post_update */ int is_gen_gt_scptr = is_genid_right_of_stripe_pointer( - iq->usedb->handle, newgenid, usedb->sc_to->sc_genids); + iq->usedb->handle, newgenid, usedb->sc_from->sc_genids); if (is_gen_gt_scptr) { if (iq->debug) { reqprintf(iq, "live_sc_post_update_delayed_key_adds_int: skip " diff --git a/schemachange/sc_records.c b/schemachange/sc_records.c index 2ec5e71aae..32b547a3c5 100644 --- a/schemachange/sc_records.c +++ b/schemachange/sc_records.c @@ -192,7 +192,7 @@ static inline void lkcounter_check(struct convert_record_data *data, int now) * stripe. 
* If the schema change is not resuming it sets them all to zero * If success it returns 0, if failure it returns <0 */ -int init_sc_genids(struct dbtable *db, struct schema_change_type *s) +int init_sc_genids(struct dbtable *db, struct dbtable *newdb, struct schema_change_type *s) { void *rec; int orglen, bdberr, stripe; @@ -238,8 +238,8 @@ int init_sc_genids(struct dbtable *db, struct schema_change_type *s) /* get this stripe's newest genid and store it in sc_genids, * if we have been rebuilding the data files we can grab the genids * straight from there, otherwise we look in the llmeta table */ - if (is_dta_being_rebuilt(db->plan)) { - rc = bdb_find_newest_genid(db->handle, NULL, stripe, rec, &dtalen, + if (is_dta_being_rebuilt(newdb->plan)) { + rc = bdb_find_newest_genid(newdb->handle, NULL, stripe, rec, &dtalen, dtalen, &sc_genids[stripe], &ver, &bdberr); if (rc == 1) @@ -779,7 +779,7 @@ static int convert_record(struct convert_record_data *data) rc = 0; if (usellmeta && !is_dta_being_rebuilt(data->to->plan)) { int bdberr; - rc = bdb_set_high_genid_stripe(NULL, data->to->tablename, + rc = bdb_set_high_genid_stripe(NULL, data->from->tablename, data->stripe, -1ULL, &bdberr); if (rc != 0) rc = -1; // convert_record expects -1 } @@ -1027,7 +1027,7 @@ static int convert_record(struct convert_record_data *data) (data->nrecs % BDB_ATTR_GET(thedb->bdb_attr, INDEXREBUILD_SAVE_EVERY_N)) == 0) { int bdberr; - rc = bdb_set_high_genid(data->trans, data->to->tablename, genid, + rc = bdb_set_high_genid(data->trans, data->from->tablename, genid, &bdberr); if (rc != 0) { if (bdberr == BDBERR_DEADLOCK) @@ -2680,7 +2680,7 @@ static int live_sc_redo_add(struct convert_record_data *data, DB_LOGC *logc, if (!is_dta_being_rebuilt(data->to->plan)) { int bdberr; - rc = bdb_set_high_genid(data->trans, data->to->tablename, genid, + rc = bdb_set_high_genid(data->trans, data->from->tablename, genid, &bdberr); if (rc != 0) { if (bdberr == BDBERR_DEADLOCK) diff --git a/schemachange/sc_records.h b/schemachange/sc_records.h index dfc7dd3fe3..033206ab06 100644 --- a/schemachange/sc_records.h +++ b/schemachange/sc_records.h @@ -96,7 +96,7 @@ int upgrade_all_records(struct dbtable *db, unsigned long long *sc_genids, void convert_record_data_cleanup(struct convert_record_data *data); -int init_sc_genids(struct dbtable *db, struct schema_change_type *s); +int init_sc_genids(struct dbtable *db, struct dbtable *newdb, struct schema_change_type *s); void live_sc_enter_exclusive_all(bdb_state_type *, tran_type *); diff --git a/schemachange/sc_schema.c b/schemachange/sc_schema.c index 3fb9aa72cc..8d5ab19e24 100644 --- a/schemachange/sc_schema.c +++ b/schemachange/sc_schema.c @@ -615,7 +615,7 @@ void verify_schema_change_constraint(struct ireq *iq, void *trans, goto unlock; if (is_genid_right_of_stripe_pointer(usedb->handle, newgenid, - usedb->sc_to->sc_genids)) { + usedb->sc_from->sc_genids)) { goto unlock; } diff --git a/schemachange/schemachange.c b/schemachange/schemachange.c index 4481bf82f1..c43495ed39 100644 --- a/schemachange/schemachange.c +++ b/schemachange/schemachange.c @@ -42,8 +42,8 @@ const char *get_hostname_with_crc32(bdb_state_type *bdb_state, extern int gbl_test_sc_resume_race; -/* If this is successful, it increments */ -int start_schema_change_tran(struct ireq *iq, tran_type *trans) +// Does everything to prepare a schema change for launch +int setup_schema_change(struct ireq *iq, tran_type *trans) { struct schema_change_type *s = iq->sc; int maxcancelretry = 10; @@ -305,10 +305,6 @@ int 
start_schema_change_tran(struct ireq *iq, tran_type *trans) } iq->sc_seed = seed; - sc_arg_t *arg = malloc(sizeof(sc_arg_t)); - arg->trans = trans; - arg->iq = iq; - arg->sc = iq->sc; s->started = 0; if (s->resume && s->resume != SC_OSQL_RESUME && IS_ALTERTABLE(s)) { @@ -319,6 +315,19 @@ int start_schema_change_tran(struct ireq *iq, tran_type *trans) } ATOMIC_ADD32(gbl_sc_resume_start, 1); } + return 0; +} + +int launch_schema_change(struct ireq *iq, tran_type *trans) +{ + int rc = 0; + struct schema_change_type *s = iq->sc; + + sc_arg_t *arg = malloc(sizeof(sc_arg_t)); + arg->trans = trans; + arg->iq = iq; + arg->sc = iq->sc; + /* ** if s->kind == SC_PARTIALUPRECS, we're going radio silent from this point *forward @@ -388,6 +397,29 @@ int start_schema_change_tran(struct ireq *iq, tran_type *trans) return rc; } +/* Spawns a schema change + * + * resuming alters will increment gbl_sc_resume_start before + * they are spawned and then decrement it once they + * have set their last converted genid. + * + * When gbl_sc_resume_start is zero, the db knows + * that it is safe to apply new writes. + */ +int start_schema_change_tran(struct ireq *iq, tran_type *trans) +{ + int rc = setup_schema_change(iq, trans); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: Failed to setup schema change\n", __func__); + return rc; + } + rc = launch_schema_change(iq, trans); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: Failed to launch schema change\n", __func__); + return rc; + } + return 0; +} int start_schema_change(struct schema_change_type *s) { @@ -613,7 +645,7 @@ int live_sc_post_delete_int(struct ireq *iq, void *trans, } if (is_genid_right_of_stripe_pointer(iq->usedb->handle, genid, - iq->usedb->sc_to->sc_genids)) { + iq->usedb->sc_genids)) { return 0; } @@ -711,7 +743,7 @@ int live_sc_post_add_int(struct ireq *iq, void *trans, unsigned long long genid, } if (is_genid_right_of_stripe_pointer(iq->usedb->handle, genid, - iq->usedb->sc_to->sc_genids)) { + iq->usedb->sc_genids)) { return 0; } @@ -800,7 +832,7 @@ int live_sc_post_update_int(struct ireq *iq, void *trans, return 0; } - unsigned long long *sc_genids = iq->usedb->sc_to->sc_genids; + unsigned long long *sc_genids = iq->usedb->sc_genids; if (iq->debug) { reqpushprefixf(iq, "live_sc_post_update: "); } diff --git a/schemachange/schemachange.h b/schemachange/schemachange.h index a49cf10111..7a672d6802 100644 --- a/schemachange/schemachange.h +++ b/schemachange/schemachange.h @@ -418,7 +418,11 @@ typedef struct sc_list sc_list_t; */ int sc_list_create(sc_list_t *scl, void *vscs, uuid_t uuid); +int setup_merge(struct ireq *iq, struct schema_change_type *s, + tran_type *tran); size_t schemachange_packed_size(struct schema_change_type *s); +int setup_schema_change(struct ireq *, tran_type *tran); +int launch_schema_change(struct ireq *, tran_type *tran); int start_schema_change_tran(struct ireq *, tran_type *tran); int start_schema_change(struct schema_change_type *); int create_queue(struct dbenv *, char *queuename, int avgitem, int pagesize); diff --git a/tests/TODO b/tests/TODO index d3efb4f94f..45b8d4c758 100644 --- a/tests/TODO +++ b/tests/TODO @@ -83,6 +83,5 @@ rowlocks_blkseq.test sc_async_constraints.test sigstopcluster.test weighted_standing_queue.test -- failing in rhel8 + podman -sc_resume_partition.test # vim: set sw=4 ts=4 et: diff --git a/tests/sc_resume_partition.test/lrl.options b/tests/sc_resume_partition.test/lrl.options new file mode 100644 index 0000000000..a6ef418ac2 --- /dev/null +++ b/tests/sc_resume_partition.test/lrl.options @@ -0,0 +1,3 @@ 
+multitable_ddl 1 +partitioned_table_merge_resume on +test_sc_resume_race 1 diff --git a/tests/sc_resume_partition.test/mdouglas47db.failexit b/tests/sc_resume_partition.test/mdouglas47db.failexit new file mode 100644 index 0000000000..cef279cc8c --- /dev/null +++ b/tests/sc_resume_partition.test/mdouglas47db.failexit @@ -0,0 +1 @@ +Failed kill_by_pidfile: pidfile /mdouglas47db.m4.pid does not exist diff --git a/tests/sc_resume_partition.test/runit b/tests/sc_resume_partition.test/runit index 2adf67a9ab..d868a72d4a 100755 --- a/tests/sc_resume_partition.test/runit +++ b/tests/sc_resume_partition.test/runit @@ -1,60 +1,114 @@ #!/usr/bin/env bash +set -ex +dbnm=$1 +tier="local" +CDB2SQL_EXE="cdb2sql" +DBNAME=$dbnm + source ${TESTSROOTDIR}/tools/runit_common.sh source ${TESTSROOTDIR}/tools/cluster_utils.sh +source util.sh -set -ex - -[ -z "${CLUSTER}" ] && { echo "Test requires a cluster"; exit 0; } +declare -r CRASH=1 +declare -r DOWNGRADE=2 -dbnm=$1 +declare -r CREATE=10 +declare -r ALTER=11 -restart_cluster() { - set +e - for node in ${CLUSTER} ; do - kill_restart_node ${node} & - done - set -e +setup() { + local -r tbl_name=$1 num_shards=$2 num_records_per_shard=$3 op_for_creating_partitioned_table=$4 - sleep 2 + create_table ${tbl_name} ${op_for_creating_table} + insert_records_into_table ${tbl_name} ${num_shards} ${num_records_per_shard} +} - wait_for_cluster +teardown() { + local -r tbl_name=$1 + cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "drop table ${tbl_name}" } -test_partition_merge_resume() { - # Given - local starttime - starttime=$(get_timestamp 120) +do_resume() { + local -r tbl_name=$1 num_shards=$2 num_records_inserted_per_shard_during_resume=$3 resume_trigger=$4 local master - master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'` + master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm ${tier} 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'` - cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')" - cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int) partitioned by time period 'daily' retention 2 start '${starttime}'" - cdb2sql ${CDB2_OPTIONS} $dbnm default "insert into t values (1)" + COMDB2_CONFIG_MIN_RETRIES=0 \ + COMDB2_CONFIG_MAX_RETRIES=0 \ + cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "alter table ${tbl_name} partitioned by none" & + local -r waitpid=$! + sleep 2 - # When - COMDB2_CONFIG_MIN_RETRIES=1 cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t partitioned by none" & - waitpid=$! - sleep 1 - restart_cluster &> /dev/null + # cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "rebuild ${tbl_name}" & + trigger_resume ${master} ${resume_trigger} - # Then if wait ${waitpid}; then - echo "FAIL: Merge succeeded before cluster bounced. Test is buggy." + echo "FAIL: Merge succeeded before master swing. Test is buggy." 
return 1 fi + insert_records_into_table ${tbl_name} ${num_shards} ${num_records_inserted_per_shard_during_resume} + + wait_for_outstanding_scs &> /dev/null + +} + +# Verifies that resume completed correctly +verify() { + local -r tbl_name=$1 num_shards=$2 num_records_per_shard=$3 num_records_inserted_per_shard_during_resume=$4 + local timepart - timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select * from comdb2_timepartitions where name='t'") - if [[ -z ${timepart} ]]; + timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select * from comdb2_timepartitions where name='${tbl_name}'") + if [[ -n ${timepart} ]]; + then + echo "FAIL: Found time partition that should not exist" + return 1 + fi + + local -r expected_num_records=$((num_shards*num_records_per_shard+num_shards*num_records_inserted_per_shard_during_resume)) + local actual_num_records + actual_num_records=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select count(*) from ${tbl_name}") + if (( actual_num_records != expected_num_records )); then - echo "FAIL: Could not find expected time partition" + echo "FAIL: Unexpected number of records after partition merge" return 1 fi +} + +run_test() { + local -r num_shards=$1 num_records_per_shard=$2 resume_trigger=$3 op_for_creating_partition=$4 + local -r tbl_name="t" num_records_per_shard_inserted_during_resume=1 - # Cleanup - cdb2sql ${CDB2_OPTIONS} ${dbnm} default "drop table t" + setup ${tbl_name} ${num_shards} ${num_records_per_shard} ${op_for_creating_partition} + do_resume ${tbl_name} ${num_shards} ${num_records_per_shard_inserted_during_resume} ${resume_trigger} + verify ${tbl_name} ${num_shards} ${num_records_per_shard} ${num_records_per_shard_inserted_during_resume} + teardown ${tbl_name} +} + +main() { + local -r num_shards=5 + for num_records_per_shard in 10000; + do + for resume_trigger in ${CRASH}; + do + for op_for_creating_partition in ${CREATE} ${ALTER}; + do + echo "------------------------" + echo "Testing resume of a partitioned table triggered by a ${resume_trigger} " + echo "Partitioned table was created using ${op_for_creating_partition} and contains ${num_records_per_shard} " + echo "records in each of ${num_shards} shards" + run_test ${num_shards} ${num_records_per_shard} ${resume_trigger} ${op_for_creating_partition} + local rc=$? 
+ if (( rc == 0 )); then + echo "Passed test --------------" + else + echo "Failed test --------------" + exit 1 + fi + done + done + done } -test_partition_merge_resume +main diff --git a/tests/sc_resume_partition.test/util.sh b/tests/sc_resume_partition.test/util.sh new file mode 100644 index 0000000000..bcbe947afe --- /dev/null +++ b/tests/sc_resume_partition.test/util.sh @@ -0,0 +1,85 @@ +restart_cluster() { + local -r restart_delay_secs=10 + + set +e + if [ -z $CLUSTER ]; + then + kill_restart_node $1 ${restart_delay_secs} & + else + for node in ${CLUSTER} ; do + kill_restart_node ${node} ${restart_delay_secs} & + done + fi + + sleep 2 + wait_for_cluster + set -e +} + +downgrade() { + local master=$1 + cdb2sql --host ${master} ${CDB2_OPTIONS} ${dbnm} ${tier} "exec procedure sys.cmd.send('downgrade')" + + local new_master=${master} + while [[ "${new_master}" == "${master}" ]]; do + sleep 1 + new_master=$(get_master) + done +} + +num_outstanding_scs() { + cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select count(*) from comdb2_sc_status where status!='COMMITTED'" +} + +wait_for_outstanding_scs() { + local num_scs + + num_scs=$(num_outstanding_scs) + while (( num_scs > 0 )); do + sleep 1 + num_scs=$(num_outstanding_scs) + done +} + +create_table() { + local -r tbl_name=$1 + + local starttime + starttime=$(get_timestamp 120) + #cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "create table ${tbl_name}(i int, j int)" + if (( op_for_creating_partitioned_table == CREATE )); then + cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "create table ${tbl_name}(a int) partitioned by time period 'daily' retention ${num_shards} start '${starttime}'" + elif (( op_for_creating_partitioned_table == ALTER )); then + cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "create table ${tbl_name}(a int)" + cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "alter table ${tbl_name} partitioned by time period 'daily' retention ${num_shards} start '${starttime}'" + else + echo "Don't recognize op for creating partitioned table '${op_for_creating_partitioned_table}'" + fi +} + +insert_records_into_table() { + local -r tbl_name=$1 num_shards=$2 num_records_per_shard=$3 + for i in $(seq 0 1 $((num_shards-1))); + do + local shard + shard=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select shardname from comdb2_timepartshards limit 1 offset ${i}") + for (( i=1; i<=${num_records_per_shard}; i+=10000 )); do + local min="${i}" + local max="$(( ((${i} + 9999) < ${num_records_per_shard}) ? (${i} + 9999) : ${num_records_per_shard}))" + cdb2sql ${CDB2_OPTIONS} ${DBNAME} ${tier} "insert into '${shard}' select * from generate_series(${min}, ${max})" + #cdb2sql ${CDB2_OPTIONS} ${DBNAME} ${tier} "insert into '${tbl_name}' select 1, * from generate_series(${min}, ${max})" + done + done +} + +trigger_resume() { + local -r master=$1 resume_trigger=$2 + if (( resume_trigger == CRASH )); then + restart_cluster ${master} + elif (( resume_trigger == DOWNGRADE )); then + downgrade ${master} + else + echo "FAIL: expected resume_trigger to be one of: CRASH, DOWNGRADE" + return 1 + fi +} diff --git a/tests/tools/cluster_utils.sh b/tests/tools/cluster_utils.sh index f7e16a936b..728b189db7 100644 --- a/tests/tools/cluster_utils.sh +++ b/tests/tools/cluster_utils.sh @@ -254,7 +254,11 @@ function kill_restart_tertiary_node function wait_for_cluster { - for node in ${CLUSTER} ; do - waitmach ${node} - done + if [ -z "$CLUSTER" ]; then + waitmach default + else + for node in ${CLUSTER} ; do + waitmach ${node} + done + fi }
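Reviewer note (not part of the patch): the merge path above is split into a setup pass (setup_scs_for_merge prepares one schema change per shard and latches resume state and the starting genids before anything runs) and a launch pass (launch_scs_for_merge walks the first-shard sc plus the per-shard array, skipping empty slots). The standalone sketch below only illustrates that prepare-then-launch shape; every name in it is hypothetical and it deliberately uses no comdb2 types or APIs.

/*
 * Illustrative sketch only -- not part of the patch. Hypothetical names;
 * mirrors the "prepare every shard's sc, then launch them all" shape of
 * setup_scs_for_merge() / launch_scs_for_merge() without comdb2 types.
 */
#include <stdio.h>
#include <stdlib.h>

struct shard_sc {
    int shard_ix;  /* which shard this schema change targets */
    int prepared;  /* set by the setup pass */
};

/* Setup pass: allocate the first-shard sc plus an array with one slot per
 * shard; slot 0 is intentionally left empty because the first shard is
 * handled by its own sc. */
static int setup_all(struct shard_sc **first, struct shard_sc ***later, int *nshards)
{
    *nshards = 4;
    *first = calloc(1, sizeof(**first));
    *later = calloc(*nshards, sizeof(struct shard_sc *));
    if (!*first || !*later)
        return -1;
    (*first)->prepared = 1;
    for (int i = 1; i < *nshards; i++) {
        (*later)[i] = calloc(1, sizeof(struct shard_sc));
        if (!(*later)[i])
            return -1;
        (*later)[i]->shard_ix = i;
        (*later)[i]->prepared = 1;
    }
    return 0;
}

/* Launch pass: first-shard sc, then every prepared per-shard sc; empty
 * slots are skipped with a warning rather than treated as errors. */
static int launch_all(struct shard_sc *first, struct shard_sc **later, int nshards)
{
    for (int i = 0; i < nshards; i++) {
        struct shard_sc *sc = (i == 0) ? first : later[i];
        if (!sc) {
            printf("shard %d: no sc prepared, skipping\n", i);
            continue;
        }
        printf("launching sc for shard %d (prepared=%d)\n", i, sc->prepared);
    }
    return 0;
}

int main(void)
{
    struct shard_sc *first, **later;
    int nshards;
    if (setup_all(&first, &later, &nshards) || launch_all(first, later, nshards))
        return 1;
    for (int i = 0; i < nshards; i++)
        free(later[i]);
    free(later);
    free(first);
    return 0;
}

The design choice in the patch appears to be that setup must complete for every shard before any launch, so all per-shard alters can share the btree created for the first shard (alter_sc->newdb = sc->newdb) and a resumed merge can re-attach to it under the new partitioned_table_merge_resume tunable.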