Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bdb/rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ int gbl_prefault_latency = 0;
int gbl_long_log_truncation_warn_thresh_sec = INT_MAX;
int gbl_long_log_truncation_abort_thresh_sec = INT_MAX;
int gbl_debug_drop_nth_rep_message = 0;
int gbl_broadcast_newmaster = 1;
extern int gbl_debug_stat4dump_loop;

extern struct thdpool *gbl_udppfault_thdpool;
Expand Down Expand Up @@ -4838,6 +4839,10 @@ void berkdb_receive_msg(void *ack_handle, void *usr_ptr, char *from_host,

bdb_state->dbenv->rep_flush(bdb_state->dbenv);

if (gbl_broadcast_newmaster) {
bdb_state->dbenv->rep_newmaster(bdb_state->dbenv);
}

logmsg(LOGMSG_INFO, "USER_TYPE_LSNCMP %d %d %d %d host:%s\n", lsn_cmp.lsn.file,
cur_lsn.file, lsn_cmp.lsn.offset, cur_lsn.offset, from_host);

Expand Down
1 change: 1 addition & 0 deletions berkdb/build/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,7 @@ struct __db_env {
void *rep_handle; /* Replication handle and methods. */
int (*rep_elect) __P((DB_ENV *, int, int, u_int32_t, u_int32_t *, int *, char **));
int (*rep_flush) __P((DB_ENV *));
int (*rep_newmaster) __P((DB_ENV *));
int (*rep_process_message) __P((DB_ENV *, DBT *, DBT *,
char **, DB_LSN *, uint32_t *, uint32_t *, char **, int));
int (*rep_verify_will_recover) __P((DB_ENV *, DBT *, DBT *));
Expand Down
57 changes: 50 additions & 7 deletions berkdb/rep/rep_method.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static int __rep_elect __P((DB_ENV *, int, int, u_int32_t, u_int32_t *, int *, c
static int __rep_elect_init
__P((DB_ENV *, DB_LSN *, int, int, int *, u_int32_t *));
static int __rep_flush __P((DB_ENV *));
static int __rep_newmaster __P((DB_ENV *));
static int __rep_restore_prepared __P((DB_ENV *));
static int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
Expand Down Expand Up @@ -116,6 +117,7 @@ __rep_dbenv_create(dbenv)
{
dbenv->rep_elect = __rep_elect;
dbenv->rep_flush = __rep_flush;
dbenv->rep_newmaster = __rep_newmaster;
dbenv->rep_process_message = __rep_process_message;
dbenv->rep_verify_will_recover = __rep_verify_will_recover;
dbenv->rep_start = __rep_start;
Expand All @@ -129,13 +131,13 @@ __rep_dbenv_create(dbenv)
dbenv->set_rep_request = __rep_set_request;
dbenv->set_rep_transport = __rep_set_rep_transport;
dbenv->set_rep_ignore = __rep_set_ignore;
dbenv->set_log_trigger = __rep_set_log_trigger;
dbenv->set_log_trigger = __rep_set_log_trigger;
dbenv->set_truncate_sc_callback = __rep_set_truncate_sc_callback;
dbenv->set_rep_truncate_callback = __rep_set_rep_truncate_callback;
dbenv->set_rep_recovery_cleanup = __rep_set_rep_recovery_cleanup;
dbenv->rep_set_gen = __rep_set_gen_pp;
dbenv->wrlock_recovery_lock = __rep_wrlock_recovery_lock;
dbenv->wrlock_recovery_blocked = __rep_wrlock_recovery_blocked;
dbenv->wrlock_recovery_blocked = __rep_wrlock_recovery_blocked;
dbenv->lock_recovery_lock = __rep_lock_recovery_lock;
dbenv->unlock_recovery_lock = __rep_unlock_recovery_lock;
dbenv->set_check_standalone = __rep_set_check_standalone;
Expand Down Expand Up @@ -407,8 +409,8 @@ __rep_start(dbenv, dbt, gen, flags)
*/
logmsg(LOGMSG_DEBUG, "%s line %d sending REP_NEWMASTER\n",
__func__, __LINE__);
(void)__rep_send_message(dbenv,
db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, NULL);
(void)__rep_send_message_gen(dbenv,
db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, gen, NULL);
ret = 0;
if (role_chg) {
ret = __txn_reset(dbenv);
Expand Down Expand Up @@ -1640,6 +1642,8 @@ __rep_elect_init(dbenv, lsnp, nsites, priority, beginp, otally)
DB_REP *db_rep;
REP *rep;
int ret;
u_int32_t gen;
int ismaster = 0;

db_rep = dbenv->rep_handle;
rep = db_rep->region;
Expand All @@ -1648,14 +1652,18 @@ __rep_elect_init(dbenv, lsnp, nsites, priority, beginp, otally)

/* We may miscount, as we don't hold the replication mutex here. */
rep->stat.st_elections++;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
ismaster = F_ISSET(rep, REP_F_MASTER);
gen = rep->gen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

/* If we are already a master; simply broadcast that fact and return. */
if (F_ISSET(rep, REP_F_MASTER)) {
if (ismaster) {
logmsg(LOGMSG_USER,
"%s line %d sending REP_NEWMASTER\n",
__func__, __LINE__);
(void)__rep_send_message(dbenv,
db_eid_broadcast, REP_NEWMASTER, lsnp, NULL, 0, NULL);
(void)__rep_send_message_gen(dbenv,
db_eid_broadcast, REP_NEWMASTER, lsnp, NULL, 0, gen, NULL);
rep->stat.st_elections_won++;
return (DB_REP_NEWMASTER);
}
Expand Down Expand Up @@ -1824,6 +1832,41 @@ err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
return (ret);
}

static int
__rep_newmaster(dbenv)
DB_ENV *dbenv;
{
REP *rep;
DB_LSN lsn;
DB_REP *db_rep;
DB_LOG *dblp;
int ismaster = 0;
u_int32_t gen;

PANIC_CHECK(dbenv);
ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_newmaster", DB_INIT_REP);
db_rep = dbenv->rep_handle;
rep = db_rep->region;

MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
ismaster = F_ISSET(rep, REP_F_MASTER);
gen = rep->gen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (ismaster) {
dblp = (DB_LOG *)dbenv->lg_handle;
R_LOCK(dbenv, &dblp->reginfo);
lsn = ((LOG *)dblp->reginfo.primary)->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);

logmsg(LOGMSG_DEBUG, "%s line %d sending REP_NEWMASTER\n",
__func__, __LINE__);
(void)__rep_send_message_gen(dbenv,
db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, gen, NULL);
}

return 0;
}

int
__rep_get_eid(dbenv, eid_out)
DB_ENV *dbenv;
Expand Down
9 changes: 6 additions & 3 deletions berkdb/rep/rep_record.c
Original file line number Diff line number Diff line change
Expand Up @@ -1264,11 +1264,13 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n
* except requests that are indicative of a new client that needs
* to get in sync.
*/
if ((gbl_is_physical_replicant && rp->gen < rep->log_gen) ||
(rp->gen < gen &&
int check_rectype = (gbl_is_physical_replicant && rp->gen < rep->log_gen) ||
(!gbl_is_physical_replicant && rp->gen < gen);

if (check_rectype &&
rp->rectype != REP_ALIVE_REQ &&
rp->rectype != REP_NEWCLIENT &&
rp->rectype != REP_MASTER_REQ)) {
rp->rectype != REP_MASTER_REQ) {
/*
* We don't hold the rep mutex, and could miscount if we race.
*/
Expand Down Expand Up @@ -8722,6 +8724,7 @@ __rep_verify_match(dbenv, rp, savetime, online)
rep_truncate_flags |= DB_REP_TRUNCATE_ONLINE;
dbenv->rep_truncate_callback(dbenv, &trunclsn, rep_truncate_flags);
}
__rep_set_last_locked(dbenv, &trunclsn);

logmsg(LOGMSG_WARN, "skip-recovery truncate log lsn [%d:%d]\n", trunclsn.file,
trunclsn.offset);
Expand Down
44 changes: 36 additions & 8 deletions berkdb/rep/rep_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,23 @@ static inline int is_logput(int type) {
}

/*
* __rep_send_message --
* __rep_send_message_gen --
* This is a wrapper for sending a message. It takes care of constructing
* the REP_CONTROL structure and calling the user's specified send function.
*
* PUBLIC: int __rep_send_message __P((DB_ENV *, char*,
* PUBLIC: int __rep_send_message_gen __P((DB_ENV *, char*,
* PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t,
* PUBLIC: void *usr_ptr));
* PUBLIC: u_int32_t, void *usr_ptr));
*/
int
__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr)
__rep_send_message_gen(dbenv, eid, rtype, lsnp, dbtp, flags, gen, usr_ptr)
DB_ENV *dbenv;
char *eid;
u_int32_t rtype;
DB_LSN *lsnp;
const DBT *dbtp;
u_int32_t flags;
u_int32_t gen;
void *usr_ptr;
{
DB_REP *db_rep;
Expand Down Expand Up @@ -158,10 +159,7 @@ __rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr)
cntrl.flags = flags;
cntrl.rep_version = DB_REPVERSION;
cntrl.log_version = DB_LOGVERSION;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
cntrl.gen = rep->gen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

cntrl.gen = gen;

memset(&cdbt, 0, sizeof(cdbt));
cdbt.data = &cntrl;
Expand Down Expand Up @@ -281,6 +279,36 @@ __rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr)
return (ret);
}


/*
* __rep_send_message --
* This is a wrapper for sending a message. It takes care of constructing
* the REP_CONTROL structure and calling the user's specified send function.
*
* PUBLIC: int __rep_send_message __P((DB_ENV *, char*,
* PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t,
* PUBLIC: void *usr_ptr));
*/
int
__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr)
DB_ENV *dbenv;
char *eid;
u_int32_t rtype;
DB_LSN *lsnp;
const DBT *dbtp;
u_int32_t flags;
void *usr_ptr;
{
u_int32_t gen;
DB_REP *db_rep = dbenv->rep_handle;
REP *rep = db_rep->region;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
gen = rep->gen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
return __rep_send_message_gen(dbenv, eid, rtype, lsnp, dbtp, flags, gen, usr_ptr);
}


/*
* __rep_send_log_more --
* This is a wrapper for sending a REP_LOG message.
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ extern int gbl_ufid_dbreg_test;
extern int gbl_debug_add_replication_latency;
extern int gbl_javasp_early_release;
extern int gbl_debug_drop_nth_rep_message;
extern int gbl_broadcast_newmaster;
extern int gbl_fdb_emulate_old;

extern long long sampling_threshold;
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,10 @@ REGISTER_TUNABLE("debug_drop_nth_rep_message", "Drop the Nth replication message
"for testing purposes (Default: 0)", TUNABLE_INTEGER,
&gbl_debug_drop_nth_rep_message, EXPERIMENTAL | INTERNAL, NULL,
NULL, NULL, NULL);
REGISTER_TUNABLE("broadcast_newmaster",
"Broadcast new-master periodically. "
"(Default: on)",
TUNABLE_BOOLEAN, &gbl_broadcast_newmaster, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE(
"max_clientstats",
"Max number of client stats stored in comdb2_clientstats. (Default: 10000)",
Expand Down