Skip to content

Commit f4ee997

Browse files
committed
Fix bugs in partition merge async resume
Signed-off-by: Morgan Douglas <[email protected]>
1 parent b899f4c commit f4ee997

File tree

8 files changed

+93
-32
lines changed

8 files changed

+93
-32
lines changed

cdb2api/cdb2api.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ static void do_init_once(void)
492492
if (min_retries) {
493493
MIN_RETRIES = atoi(min_retries);
494494
}
495+
char *max_retries = getenv("COMDB2_CONFIG_MAX_RETRIES");
496+
if (max_retries) {
497+
MAX_RETRIES = atoi(max_retries);
498+
}
495499
}
496500

497501
/* if sqlstr is a read stmt will return 1 otherwise return 0

db/db_tunables.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
/* Separator for composite tunable components. */
4242
#define COMPOSITE_TUNABLE_SEP '.'
4343

44+
extern int gbl_enable_partitioned_table_merge_resume;
4445
extern int gbl_bulk_import_validation_werror;
4546
extern int gbl_debug_sleep_during_bulk_import;
4647
extern int gbl_waitalive_iterations;

db/db_tunables.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
at multiple places.
2525
*/
2626

27+
REGISTER_TUNABLE("partitioned_table_merge_resume", "Toggle the ability to resume "
28+
"partitioned table collapse SCs (default OFF)", TUNABLE_BOOLEAN,
29+
&gbl_enable_partitioned_table_merge_resume, 0, NULL, NULL,
30+
NULL, NULL);
2731
REGISTER_TUNABLE("abort_on_in_use_rqid", NULL, TUNABLE_BOOLEAN,
2832
&gbl_abort_on_clear_inuse_rqid, READONLY | NOARG, NULL, NULL,
2933
NULL, NULL);

db/osqlcomm.c

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ extern int g_osql_ready;
7373
extern int gbl_goslow;
7474
extern int gbl_partial_indexes;
7575

76+
int gbl_enable_partitioned_table_merge_resume = 0;
7677
int gbl_master_sends_query_effects = 1;
7778
int gbl_toblock_random_deadlock_trans;
7879
int gbl_toblock_random_verify_error;
@@ -6312,6 +6313,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
63126313
alter_sc->force_rebuild = 1; /* we are moving rows here */
63136314
/* alter only in parallel mode for live */
63146315
alter_sc->scanmode = SCAN_PARALLEL;
6316+
alter_sc->resume = sc->resume;
63156317
/* link the sc */
63166318
iq->sc = alter_sc;
63176319

@@ -6328,7 +6330,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
63286330
/* link the alter */
63296331
iq->sc->sc_next = iq->sc_pending;
63306332
iq->sc_pending = iq->sc;
6331-
iq->sc->newdb = NULL; /* lose ownership, otherwise double free */
6333+
if (alter_sc->nothrevent) { iq->sc->newdb = NULL; /* lose ownership, otherwise double free */ }
63326334

63336335
if (arg->lockless) {
63346336
*pview = timepart_reaquire_view(arg->part_name);
@@ -6407,15 +6409,16 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64076409
struct dbtable *first_shard = get_dbtable_by_name(first_shard_name);
64086410
free(first_shard_name);
64096411

6410-
/* we need to move data */
6411-
sc->force_rebuild = 1;
6412+
const int latched_nothrevent = sc->nothrevent;
6413+
6414+
sc->force_rebuild = 1; /* we need to move data */
6415+
sc->nothrevent = 1; /* we need do_add_table / do_alter_table to run first */
6416+
sc->finalize = 0;
64126417

64136418
if (!first_shard->sqlaliasname) {
64146419
/*
64156420
* create a table with the same name as the partition
64166421
*/
6417-
sc->nothrevent = 1; /* we need do_add_table to run first */
6418-
sc->finalize = 0; /* make sure */
64196422
sc->kind = SC_ADDTABLE;
64206423

64216424
rc = start_schema_change_tran(iq, NULL);
@@ -6430,10 +6433,17 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64306433
iq->sc_pending = iq->sc;
64316434
} else {
64326435
/*
6433-
* use the fast shard as the destination, after first altering it
6436+
* use the first shard as the destination, after first altering it
64346437
*/
6435-
sc->nothrevent = 1; /* we need do_alter_table to run first */
6436-
sc->finalize = 0;
6438+
if (gbl_enable_partitioned_table_merge_resume) {
6439+
sc->partition.type = PARTITION_NONE;
6440+
} else {
6441+
assert(sc->partition.type == PARTITION_MERGE);
6442+
// If partitioned table merge resumes are disabled,
6443+
// then we keep the type equal to PARTITION_MERGE.
6444+
// There is code later on that blocks the resume if
6445+
// the type is PARTITION_MERGE.
6446+
}
64376447

64386448
strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename));
64396449

@@ -6456,6 +6466,7 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64566466
/* at this point we have created the future btree, launch an alter
64576467
* for each of the shards of the partition
64586468
*/
6469+
sc->nothrevent = latched_nothrevent;
64596470
arg.s = sc;
64606471
arg.s->iq = iq;
64616472
arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/

schemachange/sc_alter_table.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -775,13 +775,6 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
775775
return -1;
776776
}
777777

778-
/* set sc_genids, 0 them if we are starting a new schema change, or
779-
* restore them to their previous values if we are resuming */
780-
if (init_sc_genids(newdb, s)) {
781-
sc_client_error(s, "Failed to initialize sc_genids");
782-
return -1;
783-
}
784-
785778
Pthread_rwlock_wrlock(&db->sc_live_lk);
786779
db->sc_from = s->db = db;
787780
db->sc_to = s->newdb = newdb;
@@ -794,6 +787,9 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
794787
assert(db->sc_from == db && s->db == db);
795788
assert(db->sc_to == newdb && s->newdb == newdb);
796789
assert(db->doing_conversion == 1);
790+
if (s->resume && IS_ALTERTABLE(s)) {
791+
decrement_sc_yet_to_resume_counter();
792+
}
797793
MEMORY_SYNC;
798794

799795
if (get_stopsc(__func__, __LINE__)) {
@@ -813,8 +809,10 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
813809
struct schema *tag = find_tag_schema(newdb, ".NEW..ONDISK");
814810
if (!tag) {
815811
struct errstat err = {0};
812+
Pthread_mutex_lock(&csc2_subsystem_mtx);
816813
rc =
817814
populate_db_with_alt_schema(thedb, newdb, newdb->csc2_schema, &err);
815+
Pthread_mutex_unlock(&csc2_subsystem_mtx);
818816
if (rc) {
819817
logmsg(LOGMSG_ERROR, "%s\ncsc2: \"%s\"\n", err.errstr,
820818
newdb->csc2_schema);
@@ -825,9 +823,12 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
825823

826824
add_ongoing_alter(s);
827825

826+
unsigned long long sc_genids[MAXDTASTRIPE];
827+
memset(sc_genids, 0, MAXDTASTRIPE);
828+
828829
/* skip converting records for fastinit and planned schema change
829830
* that doesn't require rebuilding anything. */
830-
rc = convert_all_records(db, newdb, newdb->sc_genids, s);
831+
rc = convert_all_records(db, newdb, sc_genids, s);
831832
if (rc == 1) rc = 0;
832833

833834
remove_ongoing_alter(s);

tests/TODO

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,5 @@ rowlocks_blkseq.test
8383
sc_async_constraints.test
8484
sigstopcluster.test
8585
weighted_standing_queue.test -- failing in rhel8 + podman
86-
sc_resume_partition.test
8786

8887
# vim: set sw=4 ts=4 et:
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
multitable_ddl 1
2+
partitioned_table_merge_resume on

tests/sc_resume_partition.test/runit

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,62 @@ source ${TESTSROOTDIR}/tools/cluster_utils.sh
55

66
set -ex
77

8-
[ -z "${CLUSTER}" ] && { echo "Test requires a cluster"; exit 0; }
9-
108
dbnm=$1
119

1210
restart_cluster() {
11+
local -r restart_delay_secs=10
12+
1313
set +e
14-
for node in ${CLUSTER} ; do
15-
kill_restart_node ${node} &
16-
done
17-
set -e
14+
if [ -z $CLUSTER ];
15+
then
16+
kill_restart_node $1 &
17+
else
18+
for node in ${CLUSTER} ; do
19+
kill_restart_node ${node} &
20+
done
21+
fi
1822

1923
sleep 2
20-
2124
wait_for_cluster
25+
set -e
26+
}
27+
28+
# Print the number of rows in comdb2_sc_status whose status is not 'COMMITTED'.
# Reads the database name from ${dbnm} and client options from ${CDB2_OPTIONS}.
num_outstanding_scs() {
    local -r query="select count(*) from comdb2_sc_status where status!='COMMITTED'"

    cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "${query}"
}
31+
32+
# Block until num_outstanding_scs reports zero, polling once per second.
# There is no timeout here: if the count never reaches zero this keeps polling.
wait_for_outstanding_scs() {
    local num_scs

    num_scs=$(num_outstanding_scs)
    while (( num_scs > 0 )); do
        sleep 1
        # Refresh the count on every pass; without this the loop condition
        # is evaluated against a stale value and can never become false.
        num_scs=$(num_outstanding_scs)
    done
}
2340

2441
test_partition_merge_resume() {
2542
# Given
43+
local -r num_shards=$1
2644
local starttime
2745
starttime=$(get_timestamp 120)
2846
local master
2947
master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'`
3048

3149
cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')"
32-
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int) partitioned by time period 'daily' retention 2 start '${starttime}'"
33-
cdb2sql ${CDB2_OPTIONS} $dbnm default "insert into t values (1)"
50+
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int) partitioned by time period 'daily' retention ${num_shards} start '${starttime}'"
51+
52+
for i in $(seq 0 1 $((num_shards-1)));
53+
do
54+
local shard
55+
shard=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select shardname from comdb2_timepartshards limit 1 offset ${i}")
56+
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "insert into '${shard}' values(1)"
57+
done
3458

3559
# When
36-
COMDB2_CONFIG_MIN_RETRIES=1 cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t partitioned by none" &
37-
waitpid=$!
60+
COMDB2_CONFIG_MIN_RETRIES=0 COMDB2_CONFIG_MAX_RETRIES=0 cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t partitioned by none" &
61+
local -r waitpid=$!
3862
sleep 1
39-
restart_cluster &> /dev/null
63+
restart_cluster $master &> /dev/null
4064

4165
# Then
4266
if wait ${waitpid};
@@ -45,16 +69,31 @@ test_partition_merge_resume() {
4569
return 1
4670
fi
4771

72+
wait_for_outstanding_scs
73+
4874
local timepart
4975
timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select * from comdb2_timepartitions where name='t'")
50-
if [[ -z ${timepart} ]];
76+
if [[ -n ${timepart} ]];
5177
then
52-
echo "FAIL: Could not find expected time partition"
78+
echo "FAIL: Found time partition that should not exist"
79+
return 1
80+
fi
81+
82+
local num_records
83+
num_records=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select count(*) from t")
84+
if (( num_records != num_shards ));
85+
then
86+
echo "FAIL: Unexpected number of records after partition merge"
5387
return 1
5488
fi
5589

5690
# Cleanup
5791
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "drop table t"
5892
}
5993

60-
test_partition_merge_resume
94+
# Script entry point: runs test_partition_merge_resume with a shard count of 5.
main() {
    local -r num_shards=5

    test_partition_merge_resume "${num_shards}"
}
98+
99+
main

0 commit comments

Comments
 (0)