Skip to content

Commit fc25ea4

Browse files
committed
Fix bugs in partition merge async resume
Signed-off-by: Morgan Douglas <[email protected]>
1 parent f42413d commit fc25ea4

File tree

11 files changed

+222
-71
lines changed

11 files changed

+222
-71
lines changed

cdb2api/cdb2api.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ static void do_init_once(void)
492492
if (min_retries) {
493493
MIN_RETRIES = atoi(min_retries);
494494
}
495+
char *max_retries = getenv("COMDB2_CONFIG_MAX_RETRIES");
496+
if (max_retries) {
497+
MAX_RETRIES = atoi(max_retries);
498+
}
495499
}
496500

497501
/* if sqlstr is a read stmt will return 1 otherwise return 0

db/db_tunables.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
/* Separator for composite tunable components. */
4242
#define COMPOSITE_TUNABLE_SEP '.'
4343

44+
extern int gbl_enable_partitioned_table_merge_resume;
4445
extern int gbl_bulk_import_validation_werror;
4546
extern int gbl_debug_sleep_during_bulk_import;
4647
extern int gbl_waitalive_iterations;

db/db_tunables.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ REGISTER_TUNABLE("abort_during_downgrade_if_scs_dont_stop", "Abort if scs don't
2828
"after starting a downgrade (default OFF)", TUNABLE_BOOLEAN,
2929
&gbl_abort_during_downgrade_if_scs_dont_stop, 0, NULL, NULL,
3030
NULL, NULL);
31+
REGISTER_TUNABLE("partitioned_table_merge_resume", "Toggle the ability to resume "
32+
"partitioned table collapse SCs (default OFF)", TUNABLE_BOOLEAN,
33+
&gbl_enable_partitioned_table_merge_resume, 0, NULL, NULL,
34+
NULL, NULL);
3135
REGISTER_TUNABLE("abort_on_in_use_rqid", NULL, TUNABLE_BOOLEAN,
3236
&gbl_abort_on_clear_inuse_rqid, READONLY | NOARG, NULL, NULL,
3337
NULL, NULL);

db/osqlcomm.c

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ extern int g_osql_ready;
7373
extern int gbl_goslow;
7474
extern int gbl_partial_indexes;
7575

76+
int gbl_enable_partitioned_table_merge_resume = 1;
7677
int gbl_master_sends_query_effects = 1;
7778
int gbl_toblock_random_deadlock_trans;
7879
int gbl_toblock_random_verify_error;
@@ -6312,6 +6313,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
63126313
alter_sc->force_rebuild = 1; /* we are moving rows here */
63136314
/* alter only in parallel mode for live */
63146315
alter_sc->scanmode = SCAN_PARALLEL;
6316+
alter_sc->resume = sc->resume;
63156317
/* link the sc */
63166318
iq->sc = alter_sc;
63176319

@@ -6328,7 +6330,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
63286330
/* link the alter */
63296331
iq->sc->sc_next = iq->sc_pending;
63306332
iq->sc_pending = iq->sc;
6331-
iq->sc->newdb = NULL; /* lose ownership, otherwise double free */
6333+
if (alter_sc->nothrevent) { iq->sc->newdb = NULL; /* lose ownership, otherwise double free */ }
63326334

63336335
if (arg->lockless) {
63346336
*pview = timepart_reaquire_view(arg->part_name);
@@ -6407,15 +6409,16 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64076409
struct dbtable *first_shard = get_dbtable_by_name(first_shard_name);
64086410
free(first_shard_name);
64096411

6410-
/* we need to move data */
6411-
sc->force_rebuild = 1;
6412+
const int latched_nothrevent = sc->nothrevent;
6413+
6414+
sc->force_rebuild = 1; /* we need to move data */
6415+
sc->nothrevent = 1; /* we need do_add_table / do_alter_table to run first */
6416+
sc->finalize = 0;
64126417

64136418
if (!first_shard->sqlaliasname) {
64146419
/*
64156420
* create a table with the same name as the partition
64166421
*/
6417-
sc->nothrevent = 1; /* we need do_add_table to run first */
6418-
sc->finalize = 0; /* make sure */
64196422
sc->kind = SC_ADDTABLE;
64206423

64216424
rc = start_schema_change_tran(iq, NULL);
@@ -6430,10 +6433,17 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64306433
iq->sc_pending = iq->sc;
64316434
} else {
64326435
/*
6433-
* use the fast shard as the destination, after first altering it
6436+
* use the first shard as the destination, after first altering it
64346437
*/
6435-
sc->nothrevent = 1; /* we need do_alter_table to run first */
6436-
sc->finalize = 0;
6438+
if (gbl_enable_partitioned_table_merge_resume) {
6439+
sc->partition.type = PARTITION_NONE;
6440+
} else {
6441+
assert(sc->partition.type == PARTITION_MERGE);
6442+
// If partitioned table merge resumes are disabled,
6443+
// then we keep the type equal to PARTITION_MERGE.
6444+
// There is code later on that blocks the resume if
6445+
// the type is PARTITION_MERGE.
6446+
}
64376447

64386448
strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename));
64396449

@@ -6456,6 +6466,7 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64566466
/* at this point we have created the future btree, launch an alter
64576467
* for each of the shards of the partition
64586468
*/
6469+
sc->nothrevent = latched_nothrevent;
64596470
arg.s = sc;
64606471
arg.s->iq = iq;
64616472
arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/

schemachange/sc_alter_table.c

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -591,13 +591,14 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
591591

592592
/* set sc_genids, 0 them if we are starting a new schema change, or
593593
* restore them to their previous values if we are resuming */
594-
if (init_sc_genids(newdb, s)) {
594+
/*if (init_sc_genids(newdb, s)) {
595595
sc_errf(s, "failed initilizing sc_genids\n");
596596
delete_temp_table(iq, newdb);
597597
change_schemas_recover(s->tablename);
598598
decrement_sc_yet_to_resume_counter();
599599
return -1;
600-
}
600+
}*/
601+
// todo: detect if this is alter or merge and init genids accordingly.
601602

602603
Pthread_rwlock_wrlock(&db->sc_live_lk);
603604
db->sc_from = s->db = db;
@@ -654,7 +655,10 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
654655
changed == SC_CONSTRAINT_CHANGE) {
655656
if (!s->live)
656657
gbl_readonly_sc = 1;
657-
rc = convert_all_records(db, newdb, newdb->sc_genids, s);
658+
659+
unsigned long long sc_genids[MAXDTASTRIPE];
660+
/* memset() takes a byte count, not an element count: clear the whole
 * array so no stripe starts from an uninitialized genid. */
memset(sc_genids, 0, sizeof(sc_genids));
661+
rc = convert_all_records(db, newdb, sc_genids, s);
658662
if (rc == 1) rc = 0;
659663
} else
660664
rc = 0;
@@ -775,13 +779,6 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
775779
return -1;
776780
}
777781

778-
/* set sc_genids, 0 them if we are starting a new schema change, or
779-
* restore them to their previous values if we are resuming */
780-
if (init_sc_genids(newdb, s)) {
781-
sc_client_error(s, "Failed to initialize sc_genids");
782-
return -1;
783-
}
784-
785782
Pthread_rwlock_wrlock(&db->sc_live_lk);
786783
db->sc_from = s->db = db;
787784
db->sc_to = s->newdb = newdb;
@@ -794,6 +791,9 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
794791
assert(db->sc_from == db && s->db == db);
795792
assert(db->sc_to == newdb && s->newdb == newdb);
796793
assert(db->doing_conversion == 1);
794+
if (s->resume && IS_ALTERTABLE(s)) {
795+
decrement_sc_yet_to_resume_counter();
796+
}
797797
MEMORY_SYNC;
798798

799799
if (get_stopsc(__func__, __LINE__)) {
@@ -813,8 +813,10 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
813813
struct schema *tag = find_tag_schema(newdb, ".NEW..ONDISK");
814814
if (!tag) {
815815
struct errstat err = {0};
816+
Pthread_mutex_lock(&csc2_subsystem_mtx);
816817
rc =
817818
populate_db_with_alt_schema(thedb, newdb, newdb->csc2_schema, &err);
819+
Pthread_mutex_unlock(&csc2_subsystem_mtx);
818820
if (rc) {
819821
logmsg(LOGMSG_ERROR, "%s\ncsc2: \"%s\"\n", err.errstr,
820822
newdb->csc2_schema);
@@ -825,9 +827,12 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
825827

826828
add_ongoing_alter(s);
827829

830+
unsigned long long sc_genids[MAXDTASTRIPE];
831+
/* memset() takes a byte count, not an element count: clear the whole
 * array so every stripe's starting genid is zero (the values are also
 * reported per stripe on failure). */
memset(sc_genids, 0, sizeof(sc_genids));
832+
828833
/* skip converting records for fastinit and planned schema change
829834
* that doesn't require rebuilding anything. */
830-
rc = convert_all_records(db, newdb, newdb->sc_genids, s);
835+
rc = convert_all_records(db, newdb, sc_genids, s);
831836
if (rc == 1) rc = 0;
832837

833838
remove_ongoing_alter(s);
@@ -866,7 +871,7 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
866871

867872
for (i = 0; i < gbl_dtastripe; i++) {
868873
sc_errf(s, " > [%s] stripe %2d was at 0x%016llx\n", s->tablename,
869-
i, newdb->sc_genids[i]);
874+
i, sc_genids[i]);
870875
}
871876

872877
while (s->logical_livesc) {
@@ -885,6 +890,14 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
885890
/* handle renaming sqlite_stat1 entries for idx */
886891
check_for_idx_rename(s->newdb, s->db);
887892

893+
// All shards point to the same newdb.
894+
//
895+
// By setting it to NULL here, we ensure that
896+
// all shards except for the first shard (which does not run
897+
// do_merge_table) have newdb set to NULL.
898+
// This avoids double-freeing it.
899+
if (rc == SC_MASTER_DOWNGRADE) { s->newdb = NULL; }
900+
888901
return SC_OK;
889902
}
890903

schemachange/sc_logic.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void comdb2_cheapstack_sym(FILE *f, char *fmt, ...);
4747

4848
extern int gbl_is_physical_replicant;
4949

50-
int gbl_multitable_ddl = 0;
50+
int gbl_multitable_ddl = 1;
5151

5252
/**** Utility functions */
5353

tests/TODO

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,5 @@ rowlocks_blkseq.test
8383
sc_async_constraints.test
8484
sigstopcluster.test
8585
weighted_standing_queue.test -- failing in rhel8 + podman
86-
sc_resume_partition.test
8786

8887
# vim: set sw=4 ts=4 et:
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
multitable_ddl 1
2+
partitioned_table_merge_resume on
Lines changed: 98 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,112 @@
11
#!/usr/bin/env bash
22

3+
set -ex
4+
dbnm=$1
5+
tier="local"
6+
CDB2SQL_EXE="cdb2sql"
7+
DBNAME=$dbnm
8+
39
source ${TESTSROOTDIR}/tools/runit_common.sh
410
source ${TESTSROOTDIR}/tools/cluster_utils.sh
11+
source util.sh
512

6-
set -ex
13+
declare -r CRASH=1
14+
declare -r DOWNGRADE=2
715

8-
[ -z "${CLUSTER}" ] && { echo "Test requires a cluster"; exit 0; }
16+
test_driver() {
17+
# Given
18+
(
19+
local -r num_shards=$1 num_records_per_shard=$2 resume_trigger=$3
920

10-
dbnm=$1
21+
local master
22+
master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm ${tier} 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'`
1123

12-
restart_cluster() {
13-
set +e
14-
for node in ${CLUSTER} ; do
15-
kill_restart_node ${node} &
16-
done
17-
set -e
24+
local -r tbl_name="t"
25+
setup_table ${tbl_name} ${num_shards} ${num_records_per_shard}
26+
27+
trap "cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} 'drop table ${tbl_name}'" EXIT
28+
29+
# cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')"
30+
31+
# When
32+
COMDB2_CONFIG_MIN_RETRIES=0 \
33+
COMDB2_CONFIG_MAX_RETRIES=0 \
34+
cdb2sql ${CDB2_OPTIONS} ${dbnm} ${tier} "alter table ${tbl_name} partitioned by none" &
35+
local -r waitpid=$!
36+
37+
sleep 5
38+
39+
if (( resume_trigger == CRASH )); then
40+
restart_cluster ${master}
41+
elif (( resume_trigger == DOWNGRADE )); then
42+
downgrade ${master}
43+
else
44+
# Report the trigger names this script actually defines (CRASH, DOWNGRADE).
echo "FAIL: expected resume_trigger to be one of: CRASH, DOWNGRADE"
45+
return 1
46+
fi
47+
48+
# Then
49+
if wait ${waitpid};
50+
then
51+
echo "FAIL: Merge succeeded before cluster bounced. Test is buggy."
52+
return 1
53+
fi
1854

19-
sleep 2
55+
wait_for_outstanding_scs &> /dev/null
2056

21-
wait_for_cluster
57+
local timepart
58+
timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select * from comdb2_timepartitions where name='${tbl_name}'")
59+
if [[ -n ${timepart} ]];
60+
then
61+
echo "FAIL: Found time partition that should not exist"
62+
return 1
63+
fi
64+
65+
local num_records
66+
num_records=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} ${tier} "select count(*) from ${tbl_name}")
67+
if (( num_records != num_shards*num_records_per_shard ));
68+
then
69+
echo "FAIL: Unexpected number of records after partition merge"
70+
return 1
71+
fi
72+
)
2273
}
2374

24-
test_partition_merge_resume() {
25-
# Given
26-
local starttime
27-
starttime=$(get_timestamp 120)
28-
local master
29-
master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'`
30-
31-
cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')"
32-
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int) partitioned by time period 'daily' retention 2 start '${starttime}'"
33-
cdb2sql ${CDB2_OPTIONS} $dbnm default "insert into t values (1)"
34-
35-
# When
36-
COMDB2_CONFIG_MIN_RETRIES=1 cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t partitioned by none" &
37-
waitpid=$!
38-
sleep 1
39-
restart_cluster &> /dev/null
40-
41-
# Then
42-
if wait ${waitpid};
43-
then
44-
echo "FAIL: Merge succeeded before cluster bounced. Test is buggy."
45-
return 1
46-
fi
47-
48-
local timepart
49-
timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select * from comdb2_timepartitions where name='t'")
50-
if [[ -z ${timepart} ]];
51-
then
52-
echo "FAIL: Could not find expected time partition"
53-
return 1
54-
fi
55-
56-
# Cleanup
57-
cdb2sql ${CDB2_OPTIONS} ${dbnm} default "drop table t"
75+
test_resume_tiny_sc_after_master_downgrade() {
76+
local -r num_shards=5 num_records_per_shard=1 resume_trigger=${DOWNGRADE}
77+
test_driver ${num_shards} ${num_records_per_shard} ${resume_trigger}
78+
}
79+
80+
test_resume_tiny_sc_after_master_crash() {
81+
local -r num_shards=5 num_records_per_shard=1 resume_trigger=${CRASH}
82+
test_driver ${num_shards} ${num_records_per_shard} ${resume_trigger}
83+
}
84+
85+
test_resume_big_sc_after_master_downgrade() {
86+
local -r num_shards=5 num_records_per_shard=10000 resume_trigger=${DOWNGRADE}
87+
test_driver ${num_shards} ${num_records_per_shard} ${resume_trigger}
88+
}
89+
90+
test_resume_big_sc_after_master_crash() {
91+
local -r num_shards=5 num_records_per_shard=10000 resume_trigger=${CRASH}
92+
test_driver ${num_shards} ${num_records_per_shard} ${resume_trigger}
93+
}
94+
95+
96+
main() {
97+
# local -r tests=$(compgen -A function | grep -oh "test_\w*")
98+
tests=test_resume_big_sc_after_master_downgrade
99+
100+
for testcase in ${tests};
101+
do
102+
echo "Running ${testcase} ----------"
103+
if ${testcase}; then
104+
echo "Passed ${testcase} ----------"
105+
else
106+
echo "Failed ${testcase} ----------"
107+
return 1
108+
fi
109+
done
58110
}
59111

60-
test_partition_merge_resume
112+
main

0 commit comments

Comments
 (0)