Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdb2api/cdb2api.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ static void do_init_once(void)
if (min_retries) {
MIN_RETRIES = atoi(min_retries);
}
char *max_retries = getenv("COMDB2_CONFIG_MAX_RETRIES");
if (max_retries) {
MAX_RETRIES = atoi(max_retries);
}
}

/* if sqlstr is a read stmt will return 1 otherwise return 0
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#define COMPOSITE_TUNABLE_SEP '.'

extern int gbl_transactional_drop_plus_rename;
extern int gbl_enable_partitioned_table_merge_resume;
extern int gbl_bulk_import_validation_werror;
extern int gbl_debug_sleep_during_bulk_import;
extern int gbl_waitalive_iterations;
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ REGISTER_TUNABLE("abort_during_downgrade_if_scs_dont_stop", "Abort if scs don't
"after starting a downgrade (default OFF)", TUNABLE_BOOLEAN,
&gbl_abort_during_downgrade_if_scs_dont_stop, 0, NULL, NULL,
NULL, NULL);
/* Boolean tunable bound to gbl_enable_partitioned_table_merge_resume.
 * NOTE(review): the help text below advertises "default OFF", but the variable
 * is initialised to 1 where it is defined in db/osqlcomm.c -- reconcile the
 * description and the initial value before merging. */
REGISTER_TUNABLE("partitioned_table_merge_resume", "Toggle the ability to resume "
"partitioned table collapse SCs (default OFF)", TUNABLE_BOOLEAN,
&gbl_enable_partitioned_table_merge_resume, 0, NULL, NULL,
NULL, NULL);
REGISTER_TUNABLE("abort_on_in_use_rqid", NULL, TUNABLE_BOOLEAN,
&gbl_abort_on_clear_inuse_rqid, READONLY | NOARG, NULL, NULL,
NULL, NULL);
Expand Down
2 changes: 1 addition & 1 deletion db/osqlblockproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1598,7 +1598,7 @@ int resume_sc_multiddl_txn(sc_list_t *scl)
init_fake_ireq(thedb, iq);

/* this starts schema changes;
* the alters have to register themselves inline,
* the alters have to register themselves inline (by incrementing gbl_sc_resume_start),
* but the rest of the execution is done in parallel
* waiting is done by a separate thread that will finalize
* the schema change
Expand Down
263 changes: 236 additions & 27 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ extern int g_osql_ready;
extern int gbl_goslow;
extern int gbl_partial_indexes;

int gbl_enable_partitioned_table_merge_resume = 1;
int gbl_master_sends_query_effects = 1;
int gbl_toblock_random_deadlock_trans;
int gbl_toblock_random_verify_error;
Expand Down Expand Up @@ -6287,13 +6288,13 @@ static int _process_single_table_sc(struct ireq *iq)
return rc;
}

static int start_schema_change_tran_wrapper_merge(const char *tblname,
static int setup_schema_change_tran_wrapper_merge(const char *tblname,
timepart_view_t **pview,
timepart_sc_arg_t *arg)
{
struct schema_change_type ** shard_scs = (struct schema_change_type **) arg->extra_args;
struct schema_change_type *sc = arg->s;
struct ireq *iq = sc->iq;
int rc;

/* first shard drops partition also */
if (arg->pos & LAST_SHARD) {
Expand All @@ -6312,23 +6313,56 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
alter_sc->force_rebuild = 1; /* we are moving rows here */
/* alter only in parallel mode for live */
alter_sc->scanmode = SCAN_PARALLEL;
alter_sc->resume = sc->resume;
/* link the sc */
iq->sc = alter_sc;

/**
* if view is provided, this is part of a shard walk;
* release views lock here since sc can take awhile
*
*/
if (arg->lockless)
views_unlock();
// Release views lock if we're doing shard walk
if (arg->lockless) views_unlock();

int rc = setup_schema_change(iq, NULL);
if (rc) {
if (rc != SC_MASTER_DOWNGRADE) {
// ? TODO

iq->osql_flags |= OSQL_FLAGS_SCDONE;
}
return rc;
}

rc = start_schema_change_tran(iq, NULL);
// sets last genid
rc = setup_merge(iq, iq->sc, NULL);

if (arg->lockless) {
*pview = timepart_reaquire_view(arg->part_name);
if (!pview) {
logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n",
__func__, arg->part_name);
return VIEW_ERR_SC;
}
}

shard_scs[arg->indx] = alter_sc;
return rc;
}

static int start_schema_change_tran_wrapper_merge(const char *tblname,
timepart_view_t **pview,
timepart_sc_arg_t *arg)
{
struct schema_change_type ** shard_scs = (struct schema_change_type **) arg->extra_args;
struct ireq *iq = arg->s->iq;
iq->sc = shard_scs[arg->indx];

// Release views lock if we're doing a shard walk
if (arg->lockless) views_unlock();

int rc = launch_schema_change(iq, NULL);

/* link the alter */
iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
iq->sc->newdb = NULL; /* lose ownership, otherwise double free */
if (iq->sc->nothrevent) { iq->sc->newdb = NULL; /* lose ownership, otherwise double free */ }

if (arg->lockless) {
*pview = timepart_reaquire_view(arg->part_name);
Expand Down Expand Up @@ -6389,6 +6423,164 @@ static int _process_single_table_sc_merge(struct ireq *iq)
return rc;
}

/*
 * Prepare (but do not launch) the schema change that produces the merge
 * destination table.
 *
 * iq->sc is the merge request. When the first shard has no sql alias the
 * request is turned into an SC_ADDTABLE. Otherwise the request is retargeted at
 * the first shard's table name, and its partition type is cleared only when
 * merge resume is enabled.
 *
 * Returns 0 on success. Any setup_schema_change() failure is reported as
 * ERR_SC; OSQL_FLAGS_SCDONE is raised on iq unless the failure was
 * SC_MASTER_DOWNGRADE.
 */
static int setup_first_shard_sc_for_merge(struct ireq *iq, struct dbtable *first_shard)
{
    struct schema_change_type *sc = iq->sc;

    if (!first_shard->sqlaliasname) {
        sc->kind = SC_ADDTABLE;
    } else {
        if (gbl_enable_partitioned_table_merge_resume) {
            sc->partition.type = PARTITION_NONE;
        } else {
            // If partitioned table merge resumes are disabled,
            // then we keep the type equal to PARTITION_MERGE.
            // There is code later on that blocks the resume if
            // the type is PARTITION_MERGE.
            assert(sc->partition.type == PARTITION_MERGE);
        }

        /* Use the same bounded-copy helper as the other shard-name copies in
         * this file; plain strncpy() leaves the destination unterminated when
         * the source fills the buffer. */
        strncpy0(sc->tablename, first_shard->tablename, sizeof(sc->tablename));
    }

    int rc = setup_schema_change(iq, NULL);
    if (rc) {
        if (rc != SC_MASTER_DOWNGRADE) {
            iq->osql_flags |= OSQL_FLAGS_SCDONE;
        }
        return ERR_SC;
    }

    return 0;
}

/*
 * Shard-walk callback: clone the merge request into an SC_ALTERTABLE aimed at
 * shard `tblname`, run setup_schema_change() and setup_merge() on it, and
 * record the clone in the array passed through arg->extra_args at slot
 * arg->indx.
 *
 * tblname  name of the shard to alter
 * pview    in/out view handle; refreshed through timepart_reaquire_view() when
 *          arg->lockless is set, because the views lock is dropped while the
 *          schema change is being set up
 * arg      walk state: arg->s is the merge request (its iq must be set),
 *          arg->extra_args is a struct schema_change_type * array
 *
 * Returns the setup_schema_change() error if that step fails, VIEW_ERR_SC if
 * the view disappeared while unlocked, otherwise the setup_merge() result.
 */
static int setup_later_shard_scs_for_merge(const char *tblname,
                                           timepart_view_t **pview,
                                           timepart_sc_arg_t *arg)
{
    struct schema_change_type **shard_scs = (struct schema_change_type **)arg->extra_args;
    struct schema_change_type *sc = arg->s;
    struct ireq *iq = sc->iq;

    /* the shard flagged LAST_SHARD also gets the partition publish hooks;
     * they are set on the template so the clone below inherits them */
    if (arg->pos & LAST_SHARD) {
        sc->publish = partition_publish;
        sc->unpublish = partition_unpublish;
    }

    struct schema_change_type *alter_sc = clone_schemachange_type(sc);

    /* new target */
    strncpy0(alter_sc->tablename, tblname, sizeof(sc->tablename));
    alter_sc->kind = SC_ALTERTABLE;
    /* use the created file as target */
    alter_sc->newdb = sc->newdb;
    alter_sc->force_rebuild = 1; /* we are moving rows here */
    /* alter only in parallel mode for live */
    alter_sc->scanmode = SCAN_PARALLEL;
    alter_sc->resume = sc->resume;
    /* link the sc */
    iq->sc = alter_sc;

    // Release views lock if we're doing shard walk
    if (arg->lockless) {
        views_unlock();
    }

    int rc = setup_schema_change(iq, NULL);
    if (rc) {
        if (rc != SC_MASTER_DOWNGRADE) {
            // ? TODO
            iq->osql_flags |= OSQL_FLAGS_SCDONE;
        }
        return rc;
    }

    // sets last genid
    rc = setup_merge(iq, iq->sc, NULL);

    if (arg->lockless) {
        *pview = timepart_reaquire_view(arg->part_name);
        /* Test the re-acquired view, not the out-parameter itself: pview was
         * just dereferenced, so `!pview` could never detect a dropped view. */
        if (!*pview) {
            logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n",
                   __func__, arg->part_name);
            return VIEW_ERR_SC;
        }
    }

    shard_scs[arg->indx] = alter_sc;
    return rc;
}

static int setup_scs_for_merge(struct ireq *iq, struct schema_change_type ** first_shard_sc,
struct schema_change_type **later_shard_scs) {
struct schema_change_type *sc = *first_shard_sc = iq->sc;
assert(sc->kind == SC_ALTERTABLE);

char *first_shard_name = timepart_shard_name(sc->tablename, 0, 0, NULL);
struct dbtable *first_shard = get_dbtable_by_name(first_shard_name);
free(first_shard_name);

int rc = setup_first_shard_sc_for_merge(iq, first_shard);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Failed to setup first shard sc\n", __func__);
return rc;
}


if (first_shard->sqlaliasname) {
strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */
}

*later_shard_scs = calloc(timepart_get_num_shards(sc->tablename), sizeof(struct schema_change_type *));
if (*later_shard_scs) return VIEW_ERR_MALLOC;
timepart_sc_arg_t arg = {.s = sc, .start = 1 /* no publishing */,
.extra_args = (void *) later_shard_scs
.check_extra_shard = first_shard->sqlaliasname ? 1 : 0};
s->iq = iq;

rc = timepart_foreach_shard(setup_later_shard_scs_for_merge, &arg);
}

static int launch_scs_for_merge(struct ireq *iq, struct schema_change_type *first_shard_sc,
struct schema_change_type **later_shard_scs)
{

iq->sc = first_shard_sc;
int shard_ix = 0;

do {
if (!iq->sc) { logmsg(LOGMSG_WARN, "shard %d sc is null\n", shard_ix); continue; }

int rc = launch_schema_change(iq, NULL);
if (rc) {
if (rc != SC_MASTER_DOWNGRADE) iq->osql_flags |= OSQL_FLAGS_SCDONE;
return ERR_SC;
}

iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc
iq->sc = later_shard_scs[shard_ix]
} while (++shard_ix < num_shards);
}

static int process_partitioned_table_merge(struct ireq *iq)
{
int num_shards;
struct schema_change_type *first_shard_sc, *later_shard_scs;

int rc = setup_scs_for_merge(iq, &first_shard_sc, &later_shard_scs, &num_shards);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Failed to setup scs for partition merge. rc(%d)\n", __func__, rc);
return rc;
}

rc = launch_scs_for_merge(iq);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Failed to launch scs for partition merge. rc(%d)\n", __func__, rc);
return rc;
}

return 0;
}

static int _process_partitioned_table_merge(struct ireq *iq)
{
struct schema_change_type *sc = iq->sc;
Expand All @@ -6407,15 +6599,18 @@ static int _process_partitioned_table_merge(struct ireq *iq)
struct dbtable *first_shard = get_dbtable_by_name(first_shard_name);
free(first_shard_name);

/* we need to move data */
sc->force_rebuild = 1;
const int latched_nothrevent = sc->nothrevent;

sc->force_rebuild = 1; /* we need to move data */
sc->nothrevent = 1; /* we need do_add_table / do_alter_table to run first */
sc->finalize = 0;

// TODO: check if the first shard sc is quick enough to
// not be problematic if it blocks new master from coming up
if (!first_shard->sqlaliasname) {
/*
* create a table with the same name as the partition
*/
sc->nothrevent = 1; /* we need do_add_table to run first */
sc->finalize = 0; /* make sure */
sc->kind = SC_ADDTABLE;

rc = start_schema_change_tran(iq, NULL);
Expand All @@ -6425,15 +6620,19 @@ static int _process_partitioned_table_merge(struct ireq *iq)
iq->osql_flags |= OSQL_FLAGS_SCDONE;
return ERR_SC;
}

iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
} else {
/*
* use the fast shard as the destination, after first altering it
* use the first shard as the destination, after first altering it
*/
sc->nothrevent = 1; /* we need do_alter_table to run first */
sc->finalize = 0;
if (gbl_enable_partitioned_table_merge_resume) {
sc->partition.type = PARTITION_NONE;
} else {
assert(sc->partition.type == PARTITION_MERGE);
// If partitioned table merge resumes are disabled,
// then we keep the type equal to PARTITION_MERGE.
// There is code later on that blocks the resume if
// the type is PARTITION_MERGE.
}

strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename));

Expand All @@ -6444,25 +6643,34 @@ static int _process_partitioned_table_merge(struct ireq *iq)
return ERR_SC;
}

iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;

arg.check_extra_shard = 1;
strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */
arg.start = 1;
/* since this is a partition drop, we do not need to set/reset arg.pos here */
}

iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;

/* at this point we have created the future btree, launch an alter
* for each of the shards of the partition
*/
sc->nothrevent = latched_nothrevent;
arg.s = sc;
arg.s->iq = iq;
arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/
if (!arg.part_name)
return VIEW_ERR_MALLOC;
arg.lockless = 1;
/* note: we have already set nothrevent depending on the number of shards */
arg.lockless = 1;

struct schema_change_type ** shard_scs = calloc(timepart_get_num_shards(sc->tablename),
sizeof(struct schema_change_type *));
if (!shard_scs) return VIEW_ERR_MALLOC;
arg.extra_args = (void *) shard_scs;

rc = timepart_foreach_shard(setup_schema_change_tran_wrapper_merge, &arg);

// TODO: Do this in a thread
rc = timepart_foreach_shard(start_schema_change_tran_wrapper_merge, &arg);
free(arg.part_name);

Expand Down Expand Up @@ -6646,7 +6854,8 @@ static int _process_partition_alter_and_drop(struct ireq *iq)
sc->nothrevent = nshards > gbl_dohsql_sc_max_threads;

if (sc->partition.type == PARTITION_MERGE) {
return _process_partitioned_table_merge(iq);
return process_partitioned_table_merge(iq);
// return _process_partitioned_table_merge(iq);
}

timepart_sc_arg_t arg = {0};
Expand Down
Loading