diff --git a/bdb/rep.c b/bdb/rep.c index 690fb75ec9..4167ab3320 100644 --- a/bdb/rep.c +++ b/bdb/rep.c @@ -68,6 +68,7 @@ int gbl_prefault_latency = 0; int gbl_long_log_truncation_warn_thresh_sec = INT_MAX; int gbl_long_log_truncation_abort_thresh_sec = INT_MAX; int gbl_debug_drop_nth_rep_message = 0; +int gbl_broadcast_newmaster = 1; extern int gbl_debug_stat4dump_loop; extern struct thdpool *gbl_udppfault_thdpool; @@ -4838,6 +4839,10 @@ void berkdb_receive_msg(void *ack_handle, void *usr_ptr, char *from_host, bdb_state->dbenv->rep_flush(bdb_state->dbenv); + if (gbl_broadcast_newmaster) { + bdb_state->dbenv->rep_newmaster(bdb_state->dbenv); + } + logmsg(LOGMSG_INFO, "USER_TYPE_LSNCMP %d %d %d %d host:%s\n", lsn_cmp.lsn.file, cur_lsn.file, lsn_cmp.lsn.offset, cur_lsn.offset, from_host); diff --git a/berkdb/build/db.h b/berkdb/build/db.h index 0c56854439..9890588612 100644 --- a/berkdb/build/db.h +++ b/berkdb/build/db.h @@ -2592,6 +2592,7 @@ struct __db_env { void *rep_handle; /* Replication handle and methods. */ int (*rep_elect) __P((DB_ENV *, int, int, u_int32_t, u_int32_t *, int *, char **)); int (*rep_flush) __P((DB_ENV *)); + int (*rep_newmaster) __P((DB_ENV *)); int (*rep_process_message) __P((DB_ENV *, DBT *, DBT *, char **, DB_LSN *, uint32_t *, uint32_t *, char **, int)); int (*rep_verify_will_recover) __P((DB_ENV *, DBT *, DBT *)); diff --git a/berkdb/rep/rep_method.c b/berkdb/rep/rep_method.c index ea8c9af119..96abf40a7f 100644 --- a/berkdb/rep/rep_method.c +++ b/berkdb/rep/rep_method.c @@ -51,6 +51,7 @@ static int __rep_elect __P((DB_ENV *, int, int, u_int32_t, u_int32_t *, int *, c static int __rep_elect_init __P((DB_ENV *, DB_LSN *, int, int, int *, u_int32_t *)); static int __rep_flush __P((DB_ENV *)); +static int __rep_newmaster __P((DB_ENV *)); static int __rep_restore_prepared __P((DB_ENV *)); static int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *)); static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t)); @@ -116,6 +117,7 @@ __rep_dbenv_create(dbenv) { dbenv->rep_elect = __rep_elect; dbenv->rep_flush = __rep_flush; + dbenv->rep_newmaster = __rep_newmaster; dbenv->rep_process_message = __rep_process_message; dbenv->rep_verify_will_recover = __rep_verify_will_recover; dbenv->rep_start = __rep_start; @@ -129,13 +131,13 @@ __rep_dbenv_create(dbenv) dbenv->set_rep_request = __rep_set_request; dbenv->set_rep_transport = __rep_set_rep_transport; dbenv->set_rep_ignore = __rep_set_ignore; - dbenv->set_log_trigger = __rep_set_log_trigger; + dbenv->set_log_trigger = __rep_set_log_trigger; dbenv->set_truncate_sc_callback = __rep_set_truncate_sc_callback; dbenv->set_rep_truncate_callback = __rep_set_rep_truncate_callback; dbenv->set_rep_recovery_cleanup = __rep_set_rep_recovery_cleanup; dbenv->rep_set_gen = __rep_set_gen_pp; dbenv->wrlock_recovery_lock = __rep_wrlock_recovery_lock; - dbenv->wrlock_recovery_blocked = __rep_wrlock_recovery_blocked; + dbenv->wrlock_recovery_blocked = __rep_wrlock_recovery_blocked; dbenv->lock_recovery_lock = __rep_lock_recovery_lock; dbenv->unlock_recovery_lock = __rep_unlock_recovery_lock; dbenv->set_check_standalone = __rep_set_check_standalone; @@ -407,8 +409,8 @@ __rep_start(dbenv, dbt, gen, flags) */ logmsg(LOGMSG_DEBUG, "%s line %d sending REP_NEWMASTER\n", __func__, __LINE__); - (void)__rep_send_message(dbenv, - db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, NULL); + (void)__rep_send_message_gen(dbenv, + db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, gen, NULL); ret = 0; if (role_chg) { ret = __txn_reset(dbenv); @@ -1640,6 +1642,8 @@ __rep_elect_init(dbenv, lsnp, nsites, priority, beginp, otally) DB_REP *db_rep; REP *rep; int ret; + u_int32_t gen; + int ismaster = 0; db_rep = dbenv->rep_handle; rep = db_rep->region; @@ -1648,14 +1652,18 @@ __rep_elect_init(dbenv, lsnp, nsites, priority, beginp, otally) /* We may miscount, as we don't hold the replication mutex here. */ rep->stat.st_elections++; + MUTEX_LOCK(dbenv, db_rep->rep_mutexp); + ismaster = F_ISSET(rep, REP_F_MASTER); + gen = rep->gen; + MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); /* If we are already a master; simply broadcast that fact and return. */ - if (F_ISSET(rep, REP_F_MASTER)) { + if (ismaster) { logmsg(LOGMSG_USER, "%s line %d sending REP_NEWMASTER\n", __func__, __LINE__); - (void)__rep_send_message(dbenv, - db_eid_broadcast, REP_NEWMASTER, lsnp, NULL, 0, NULL); + (void)__rep_send_message_gen(dbenv, + db_eid_broadcast, REP_NEWMASTER, lsnp, NULL, 0, gen, NULL); rep->stat.st_elections_won++; return (DB_REP_NEWMASTER); } @@ -1824,6 +1832,41 @@ err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0) return (ret); } +static int +__rep_newmaster(dbenv) + DB_ENV *dbenv; +{ + REP *rep; + DB_LSN lsn; + DB_REP *db_rep; + DB_LOG *dblp; + int ismaster = 0; + u_int32_t gen; + + PANIC_CHECK(dbenv); + ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_newmaster", DB_INIT_REP); + db_rep = dbenv->rep_handle; + rep = db_rep->region; + + MUTEX_LOCK(dbenv, db_rep->rep_mutexp); + ismaster = F_ISSET(rep, REP_F_MASTER); + gen = rep->gen; + MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); + if (ismaster) { + dblp = (DB_LOG *)dbenv->lg_handle; + R_LOCK(dbenv, &dblp->reginfo); + lsn = ((LOG *)dblp->reginfo.primary)->lsn; + R_UNLOCK(dbenv, &dblp->reginfo); + + logmsg(LOGMSG_DEBUG, "%s line %d sending REP_NEWMASTER\n", + __func__, __LINE__); + (void)__rep_send_message_gen(dbenv, + db_eid_broadcast, REP_NEWMASTER, &lsn, NULL, 0, gen, NULL); + } + + return 0; +} + int __rep_get_eid(dbenv, eid_out) DB_ENV *dbenv; diff --git a/berkdb/rep/rep_record.c b/berkdb/rep/rep_record.c index 36f11669a4..c33be2c9ce 100644 --- a/berkdb/rep/rep_record.c +++ b/berkdb/rep/rep_record.c @@ -1264,11 +1264,13 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n * except requests that are indicative of a new client that needs * to get in sync. */ - if ((gbl_is_physical_replicant && rp->gen < rep->log_gen) || - (rp->gen < gen && + int check_rectype = (gbl_is_physical_replicant && rp->gen < rep->log_gen) || + (!gbl_is_physical_replicant && rp->gen < gen); + + if (check_rectype && rp->rectype != REP_ALIVE_REQ && rp->rectype != REP_NEWCLIENT && - rp->rectype != REP_MASTER_REQ)) { + rp->rectype != REP_MASTER_REQ) { /* * We don't hold the rep mutex, and could miscount if we race. */ @@ -8722,6 +8724,7 @@ __rep_verify_match(dbenv, rp, savetime, online) rep_truncate_flags |= DB_REP_TRUNCATE_ONLINE; dbenv->rep_truncate_callback(dbenv, &trunclsn, rep_truncate_flags); } + __rep_set_last_locked(dbenv, &trunclsn); logmsg(LOGMSG_WARN, "skip-recovery truncate log lsn [%d:%d]\n", trunclsn.file, trunclsn.offset); diff --git a/berkdb/rep/rep_util.c b/berkdb/rep/rep_util.c index f6efb632fa..3f195fa7d1 100644 --- a/berkdb/rep/rep_util.c +++ b/berkdb/rep/rep_util.c @@ -102,22 +102,23 @@ static inline int is_logput(int type) { } /* - * __rep_send_message -- + * __rep_send_message_gen -- * This is a wrapper for sending a message. It takes care of constructing * the REP_CONTROL structure and calling the user's specified send function. * - * PUBLIC: int __rep_send_message __P((DB_ENV *, char*, + * PUBLIC: int __rep_send_message_gen __P((DB_ENV *, char*, * PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t, - * PUBLIC: void *usr_ptr)); + * PUBLIC: u_int32_t, void *usr_ptr)); */ int -__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr) +__rep_send_message_gen(dbenv, eid, rtype, lsnp, dbtp, flags, gen, usr_ptr) DB_ENV *dbenv; char *eid; u_int32_t rtype; DB_LSN *lsnp; const DBT *dbtp; u_int32_t flags; + u_int32_t gen; void *usr_ptr; { DB_REP *db_rep; @@ -158,10 +159,7 @@ __rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr) cntrl.flags = flags; cntrl.rep_version = DB_REPVERSION; cntrl.log_version = DB_LOGVERSION; - MUTEX_LOCK(dbenv, db_rep->rep_mutexp); - cntrl.gen = rep->gen; - MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); - + cntrl.gen = gen; memset(&cdbt, 0, sizeof(cdbt)); cdbt.data = &cntrl; @@ -281,6 +279,36 @@ __rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr) return (ret); } + +/* + * __rep_send_message -- + * This is a wrapper for sending a message. It takes care of constructing + * the REP_CONTROL structure and calling the user's specified send function. + * + * PUBLIC: int __rep_send_message __P((DB_ENV *, char*, + * PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t, + * PUBLIC: void *usr_ptr)); + */ +int +__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags, usr_ptr) + DB_ENV *dbenv; + char *eid; + u_int32_t rtype; + DB_LSN *lsnp; + const DBT *dbtp; + u_int32_t flags; + void *usr_ptr; +{ + u_int32_t gen; + DB_REP *db_rep = dbenv->rep_handle; + REP *rep = db_rep->region; + MUTEX_LOCK(dbenv, db_rep->rep_mutexp); + gen = rep->gen; + MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); + return __rep_send_message_gen(dbenv, eid, rtype, lsnp, dbtp, flags, gen, usr_ptr); +} + + /* * __rep_send_log_more -- * This is a wrapper for sending a REP_LOG message. diff --git a/db/db_tunables.c b/db/db_tunables.c index cb1da4b1f2..4b8d83a30b 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -345,6 +345,7 @@ extern int gbl_ufid_dbreg_test; extern int gbl_debug_add_replication_latency; extern int gbl_javasp_early_release; extern int gbl_debug_drop_nth_rep_message; +extern int gbl_broadcast_newmaster; extern int gbl_fdb_emulate_old; extern long long sampling_threshold; diff --git a/db/db_tunables.h b/db/db_tunables.h index 1fb4ad2237..15082a7fb1 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -1498,6 +1498,10 @@ REGISTER_TUNABLE("debug_drop_nth_rep_message", "Drop the Nth replication message "for testing purposes (Default: 0)", TUNABLE_INTEGER, &gbl_debug_drop_nth_rep_message, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("broadcast_newmaster", + "Broadcast new-master periodically. " + "(Default: on)", + TUNABLE_BOOLEAN, &gbl_broadcast_newmaster, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); REGISTER_TUNABLE( "max_clientstats", "Max number of client stats stored in comdb2_clientstats. (Default: 10000)",