diff --git a/server/raft.go b/server/raft.go index 0191827e8e8..784c7ef093e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -514,7 +514,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel truncateAndErr(index - 1) break } - n.processAppendEntry(ae, nil) + n.recoverAppendEntry(ae) // Check how much we have queued up so far to determine if we should pause. for _, e := range ae.entries { qsz += len(e.Data) @@ -2156,7 +2156,7 @@ func (n *raft) processAppendEntries() { aes := n.entry.pop() if canProcess { for _, ae := range aes { - n.processAppendEntry(ae, ae.sub) + n.processAppendEntry(ae) } } n.entry.recycle(&aes) @@ -3444,11 +3444,55 @@ func (n *raft) updateLeader(newLeader string) { } } -// processAppendEntry will process an appendEntry. This is called either -// during recovery or from processAppendEntries when there are new entries -// to be committed. -func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { +// recoverAppendEntry will process an appendEntry for recovery +func (n *raft) recoverAppendEntry(ae *appendEntry) { n.Lock() + defer n.Unlock() + + n.pterm = ae.term + n.pindex = ae.pindex + 1 + + n.processEntriesAndCommit(ae, false) +} + +// processAppendEntry will process an appendEntry. This is called from +// processAppendEntries when there are new entries to be committed. +func (n *raft) processAppendEntry(ae *appendEntry) { + var ar *appendEntryResponse + + // Make a copy of the reply subject, as ae may return + // to its pool as part of processAppendEntryLocked + subject := ae.reply + + n.Lock() + isNew := ae.sub != nil && ae.sub == n.aesub + isCatchup := !isNew && n.catchup != nil && ae.sub == n.catchup.sub + + // Process the appendEntry if it is a new proposal, or if we are + // catching up. Ignore messages coming from old catchup subs when + // ae.lterm <= 0. Older versions of the server do not send the leader + // term when catching up. 
Old catchups from newer subs can be rejected + // later by checking that the appendEntry is on the correct term. + if isNew || isCatchup || ae.lterm > 0 { + ar = n.processAppendEntryLocked(ae, isNew) + } else { + n.debug("processAppendEntry ignore entry from old subscription") + } + n.Unlock() + + if ar != nil { + var scratch [appendEntryResponseLen]byte + n.sendRPC(subject, ar.reply, ar.encode(scratch[:])) + arPool.Put(ar) + } +} + +// Process the given appendEntry. Parameter isNew indicates whether the +// appendEntry comes from a new proposal or from a catchup request. +// Optionally returns an appendEntryResponse. The caller is responsible +// for sending out the response and returning it to its pool. +// Lock should be held. +func (n *raft) processAppendEntryLocked(ae *appendEntry, isNew bool) *appendEntryResponse { // Don't reset here if we have been asked to assume leader position. if !n.lxfer { n.resetElectionTimeout() @@ -3456,16 +3500,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Just return if closed or we had previous write error. if n.State() == Closed || n.werr != nil { - n.Unlock() - return + return nil } - // Scratch buffer for responses. - var scratch [appendEntryResponseLen]byte - arbuf := scratch[:] - - // Grab term from append entry. But if leader explicitly defined its term, use that instead. - // This is required during catchup if the leader catches us up on older items from previous terms. + // Grab term from append entry. But if leader explicitly defined its term, + // use that instead. This is required during catchup if the leader catches + // us up on older items from previous terms. // While still allowing us to confirm they're matching our highest known term. lterm := ae.term if ae.lterm != 0 { @@ -3496,12 +3536,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.stepdownLocked(ae.leader) } else { // Let them know we are the leader.
- ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) n.debug("AppendEntry ignoring old term from another leader") - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - arPool.Put(ar) - n.Unlock() - return + return newAppendEntryResponse(n.term, n.pindex, n.id, false) } } @@ -3524,12 +3560,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // Catching up state. - catchingUp := n.catchup != nil - // Is this a new entry? New entries will be delivered on the append entry - // sub, rather than a catch-up sub. - isNew := sub != nil && sub == n.aesub - // Track leader directly if isNew && ae.leader != noLeader { if ps := n.peers[ae.leader]; ps != nil { @@ -3539,15 +3569,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // If we are/were catching up ignore old catchup subs, but only if catching up from an older server - // that doesn't send the leader term when catching up. We can reject old catchups from newer subs - // later, just by checking the append entry is on the correct term. - if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) { - n.Unlock() - n.debug("AppendEntry ignoring old entry from previous catchup") - return - } - // If this term is greater than ours. if lterm > n.term { n.term = lterm @@ -3559,46 +3580,33 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) n.stepdownLocked(ae.leader) } - } else if lterm < n.term && sub != nil && (isNew || ae.lterm != 0) { + } else if lterm < n.term && (isNew || ae.lterm != 0) { // Anything that's below our expected highest term needs to be rejected. - // Unless we're replaying (sub=nil), in which case we'll always continue. // For backward-compatibility we shouldn't reject if we're being caught up by an old server. 
if !isNew { n.debug("AppendEntry ignoring old entry from previous catchup") - n.Unlock() - return + return nil } n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm) - ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) - n.Unlock() - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - arPool.Put(ar) - return + return newAppendEntryResponse(n.term, n.pindex, n.id, false) } // Check state if we are catching up. - if catchingUp { - if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { + if n.catchup != nil { + if n.pterm >= n.catchup.cterm && n.pindex >= n.catchup.cindex { // If we are here we are good, so if we have a catchup pending we can cancel. n.cancelCatchup() - // Reset our notion of catching up. - catchingUp = false } else if isNew { - var ar *appendEntryResponse - var inbox string // Check to see if we are stalled. If so recreate our catchup state and resend response. if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") - inbox = n.createCatchup(ae) - ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) - } - n.Unlock() - if ar != nil { - n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) - arPool.Put(ar) + inbox := n.createCatchup(ae) + ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + ar.reply = inbox + return ar } // Ignore new while catching up or replaying. - return + return nil } } @@ -3668,21 +3676,19 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // For example, if we got partial catchup, and then the "real-time" messages came in very delayed. // If we reported "success" on those "real-time" messages, we'd wrongfully be providing // quorum while not having an up-to-date log. - n.Unlock() - return + return nil } // Check if we are catching up. If we are here we know the leader did not have all of the entries // so make sure this is a snapshot entry. 
If it is not start the catchup process again since it // means we may have missed additional messages. - if catchingUp { + if n.catchup != nil { // This means we already entered into a catchup state but what the leader sent us did not match what we expected. // Snapshots and peerstate will always be together when a leader is catching us up in this fashion. if len(ae.entries) != 2 || ae.entries[0].Type != EntrySnapshot || ae.entries[1].Type != EntryPeerState { n.warn("Expected first catchup entry to be a snapshot and peerstate, will retry") n.cancelCatchup() - n.Unlock() - return + return nil } if ps, err := decodePeerState(ae.entries[1].Data); err == nil { @@ -3692,8 +3698,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.warn("Could not parse snapshot peerstate correctly") n.cancelCatchup() - n.Unlock() - return + return nil } // Inherit state from appendEntry with the leader's snapshot. @@ -3710,48 +3715,54 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Install the leader's snapshot as our own. if err := n.installSnapshot(snap); err != nil { n.setWriteErrLocked(err) - n.Unlock() - return + return nil } n.resetInitializing() // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) - n.Unlock() - return + return nil } // Setup our state for catching up. n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) inbox := n.createCatchup(ae) ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) - n.Unlock() - n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) - arPool.Put(ar) - return + ar.reply = inbox + return ar } CONTINUE: // Save to our WAL if we have entries. 
if ae.shouldStore() { - // Only store if an original which will have sub != nil - if sub != nil { - if err := n.storeToWAL(ae); err != nil { - if err != ErrStoreClosed { - n.warn("Error storing entry to WAL: %v", err) - } - n.Unlock() - return + if err := n.storeToWAL(ae); err != nil { + if err != ErrStoreClosed { + n.warn("Error storing entry to WAL: %v", err) } - n.cachePendingEntry(ae) - n.resetInitializing() - } else { - // This is a replay on startup so just take the appendEntry version. - n.pterm = ae.term - n.pindex = ae.pindex + 1 + return nil } + n.cachePendingEntry(ae) + n.resetInitializing() } + // ae should no longer be used after this call as + // processEntriesAndCommit may return the appendEntry back to its pool + n.processEntriesAndCommit(ae, isNew) + + // Only ever respond to new entries. + // Never respond to catchup messages, because providing quorum based on this is unsafe. + // The only way for the leader to receive "success" MUST be through this path. + if isNew { + // Success. Send our response. + return newAppendEntryResponse(n.pterm, n.pindex, n.id, true) + } + + return nil +} + +// Process all entries in appendEntry and try to commit. +// Lock should be held. +func (n *raft) processEntriesAndCommit(ae *appendEntry, isNew bool) { // Check to see if we have any related entries to process here. for _, e := range ae.entries { switch e.Type { @@ -3789,9 +3800,9 @@ CONTINUE: } } - // Make a copy of these values, as the AppendEntry might be cached and returned to the pool in applyCommit. + // Copy ae.commit as the AppendEntry might be cached and returned to the + // pool in applyCommit. aeCommit := ae.commit - aeReply := ae.reply // Apply anything we need here. if aeCommit > n.commit { @@ -3806,21 +3817,6 @@ CONTINUE: } } } - - // Only ever respond to new entries. - // Never respond to catchup messages, because providing quorum based on this is unsafe. - // The only way for the leader to receive "success" MUST be through this path. 
- var ar *appendEntryResponse - if sub != nil && isNew { - ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true) - } - n.Unlock() - - // Success. Send our response. - if ar != nil { - n.sendRPC(aeReply, _EMPTY_, ar.encode(arbuf)) - arPool.Put(ar) - } } // resetInitializing resets the notion of initializing. diff --git a/server/raft_test.go b/server/raft_test.go index 4d97367dd29..017f2a6cab4 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -822,7 +822,8 @@ func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) { // the term. Give it to the follower in candidate state. ae := newAppendEntry(leader.id, 6, leader.commit, leader.pterm, leader.pindex, nil) follower.switchToCandidate() - follower.processAppendEntry(ae, nil) + ae.sub = follower.aesub + follower.processAppendEntry(ae) // The candidate must not have reverted back to term 6. require_NotEqual(t, follower.term, 6) @@ -1174,7 +1175,8 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { for _, f := range rg { if f.node().ID() != l.ID() { fn := f.node().(*raft) - fn.processAppendEntry(ae, fn.aesub) + ae.sub = fn.aesub + fn.processAppendEntry(ae) require_Equal(t, fn.term, 20) // Follower's term gets upped as expected. } } @@ -1184,13 +1186,14 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { for _, f := range rg { if f.node().ID() != l.ID() { fn := f.node().(*raft) - fn.processAppendEntry(ae, fn.aesub) + ae.sub = fn.aesub + fn.processAppendEntry(ae) require_Equal(t, fn.term, 20) // Follower should reject and the term stays the same. fn.Lock() fn.resetWAL() fn.Unlock() - fn.processAppendEntry(ae, fn.aesub) + fn.processAppendEntry(ae) require_Equal(t, fn.term, 20) // Follower should reject again, even after reset, term stays the same. 
} } @@ -1257,25 +1260,29 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) // Initial case is simple, just store the entry. - n.processAppendEntry(aeInitial, n.aesub) + aeInitial.sub = n.aesub + n.processAppendEntry(aeInitial) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) // We get one entry that has quorum (but we don't know that yet), so it stays uncommitted for a bit. - n.processAppendEntry(aeUncommitted, n.aesub) + aeUncommitted.sub = n.aesub + n.processAppendEntry(aeUncommitted) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // We get one entry that has NO quorum (but we don't know that yet). - n.processAppendEntry(aeNoQuorum, n.aesub) + aeNoQuorum.sub = n.aesub + n.processAppendEntry(aeNoQuorum) require_Equal(t, n.wal.State().Msgs, 3) entry, err = n.loadEntry(3) require_NoError(t, err) @@ -1283,12 +1290,14 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { // We've just had a leader election, and we missed one message from the previous leader. // We should truncate the last message. - n.processAppendEntry(aeCatchupTrigger, n.aesub) + aeCatchupTrigger.sub = n.aesub + n.processAppendEntry(aeCatchupTrigger) require_Equal(t, n.wal.State().Msgs, 2) require_True(t, n.catchup == nil) // We get a heartbeat that prompts us to catchup. 
- n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_Equal(t, n.wal.State().Msgs, 2) require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) @@ -1296,12 +1305,14 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { require_Equal(t, n.catchup.pindex, 2) // n.pindex // We now notice the leader indicated a different entry at the (no quorum) index, should save that. - n.processAppendEntry(aeMissed, n.catchup.sub) + aeMissed.sub = n.catchup.sub + n.processAppendEntry(aeMissed) require_Equal(t, n.wal.State().Msgs, 3) require_True(t, n.catchup != nil) // We now get the entry that initially triggered us to catchup, it should be added. - n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + aeCatchupTrigger.sub = n.catchup.sub + n.processAppendEntry(aeCatchupTrigger) require_Equal(t, n.wal.State().Msgs, 4) require_True(t, n.catchup != nil) entry, err = n.loadEntry(4) @@ -1309,7 +1320,8 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { require_Equal(t, entry.leader, nats1) // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). - n.processAppendEntry(aeHeartbeat3, n.aesub) + aeHeartbeat3.sub = n.aesub + n.processAppendEntry(aeHeartbeat3) require_Equal(t, n.commit, 4) require_True(t, n.catchup == nil) } @@ -1339,25 +1351,29 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) // Initial case is simple, just store the entry. - n.processAppendEntry(aeInitial, n.aesub) + aeInitial.sub = n.aesub + n.processAppendEntry(aeInitial) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Heartbeat, makes sure commit moves up. 
- n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) // We get one entry that has NO quorum (but we don't know that yet). - n.processAppendEntry(aeNoQuorum1, n.aesub) + aeNoQuorum1.sub = n.aesub + n.processAppendEntry(aeNoQuorum1) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // We get another entry that has NO quorum (but we don't know that yet). - n.processAppendEntry(aeNoQuorum2, n.aesub) + aeNoQuorum2.sub = n.aesub + n.processAppendEntry(aeNoQuorum2) require_Equal(t, n.wal.State().Msgs, 3) entry, err = n.loadEntry(3) require_NoError(t, err) @@ -1365,12 +1381,14 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { // We've just had a leader election, and we missed messages from the previous leader. // We should truncate the last message. - n.processAppendEntry(aeCatchupTrigger, n.aesub) + aeCatchupTrigger.sub = n.aesub + n.processAppendEntry(aeCatchupTrigger) require_Equal(t, n.wal.State().Msgs, 2) require_True(t, n.catchup == nil) // We get a heartbeat that prompts us to catchup. - n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_Equal(t, n.wal.State().Msgs, 2) require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) @@ -1378,12 +1396,14 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { require_Equal(t, n.catchup.pindex, 2) // n.pindex // We now notice the leader indicated a different entry at the (no quorum) index. We should truncate again. - n.processAppendEntry(aeMissed2, n.catchup.sub) + aeMissed2.sub = n.catchup.sub + n.processAppendEntry(aeMissed2) require_Equal(t, n.wal.State().Msgs, 1) require_True(t, n.catchup == nil) // We get a heartbeat that prompts us to catchup. 
- n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_Equal(t, n.wal.State().Msgs, 1) require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) @@ -1391,16 +1411,19 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { require_Equal(t, n.catchup.pindex, 1) // n.pindex // We now get caught up with the missed messages. - n.processAppendEntry(aeMissed1, n.catchup.sub) + aeMissed1.sub = n.catchup.sub + n.processAppendEntry(aeMissed1) require_Equal(t, n.wal.State().Msgs, 2) require_True(t, n.catchup != nil) - n.processAppendEntry(aeMissed2, n.catchup.sub) + aeMissed2.sub = n.catchup.sub + n.processAppendEntry(aeMissed2) require_Equal(t, n.wal.State().Msgs, 3) require_True(t, n.catchup != nil) // We now get the entry that initially triggered us to catchup, it should be added. - n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + aeCatchupTrigger.sub = n.catchup.sub + n.processAppendEntry(aeCatchupTrigger) require_Equal(t, n.wal.State().Msgs, 4) require_True(t, n.catchup != nil) entry, err = n.loadEntry(4) @@ -1408,7 +1431,8 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { require_Equal(t, entry.leader, nats1) // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). - n.processAppendEntry(aeHeartbeat3, n.aesub) + aeHeartbeat3.sub = n.aesub + n.processAppendEntry(aeHeartbeat3) require_Equal(t, n.commit, 4) require_True(t, n.catchup == nil) } @@ -1431,36 +1455,42 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Initial case is simple, just store the entry. 
- n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Deliver a message. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat1, n.aesub) + aeHeartbeat1.sub = n.aesub + n.processAppendEntry(aeHeartbeat1) require_Equal(t, n.commit, 2) // Deliver another message. - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.wal.State().Msgs, 3) entry, err = n.loadEntry(3) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.commit, 2) // Heartbeat, makes sure we commit. - n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_Equal(t, n.commit, 3) } @@ -1479,34 +1509,39 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) // Heartbeat, triggers catchup. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex // First catchup message has the incorrect pterm, stop catchup and re-trigger later with the correct pterm. 
- n.processAppendEntry(aeMsg, n.catchup.sub) + aeMsg.sub = n.catchup.sub + n.processAppendEntry(aeMsg) require_True(t, n.catchup == nil) require_Equal(t, n.pterm, 1) require_Equal(t, n.pindex, 0) // Heartbeat, triggers catchup. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 1) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex // Now we get the message again and can continue to store it. - n.processAppendEntry(aeMsg, n.catchup.sub) + aeMsg.sub = n.catchup.sub + n.processAppendEntry(aeMsg) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Now heartbeat is able to commit the entry. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) } @@ -1525,14 +1560,16 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) // Initial case is simple, just store the entry. - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) require_NoError(t, err) require_Equal(t, entry.leader, nats0) // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) require_Equal(t, n.pterm, 1) @@ -1579,7 +1616,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) // Simply receive first message. 
- n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.commit, 0) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) @@ -1587,7 +1625,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { require_Equal(t, entry.leader, nats1) // Receive second message, which commits the first message. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.commit, 1) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) @@ -1631,7 +1670,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { // Receive heartbeat from new leader, should not lose commits. n.stepdown(noLeader) - n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_Equal(t, n.wal.State().Msgs, 0) require_Equal(t, n.commit, 2) require_Equal(t, n.applied, 2) @@ -1653,19 +1693,22 @@ func TestNRGIgnoreDoubleSnapshot(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Simply receive first message. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 0) // Heartbeat moves commit up. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) // Manually call back down to applied. n.Applied(1) // Second message just for upping the pterm. 
- n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 1) require_Equal(t, n.pterm, 2) @@ -1705,13 +1748,15 @@ func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) { aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) // Switch follower into catchup. - n.processAppendEntry(aeTriggerCatchup, n.aesub) + aeTriggerCatchup.sub = n.aesub + n.processAppendEntry(aeTriggerCatchup) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex // Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async. - n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub) + aeCatchupSnapshot.sub = n.catchup.sub + n.processAppendEntry(aeCatchupSnapshot) require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 1) @@ -1745,17 +1790,20 @@ func TestNRGDontSwitchToCandidateWithMultipleInflightSnapshots(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) // Simulate snapshots being sent to us. 
- n.processAppendEntry(aeSnapshot1, n.aesub) + aeSnapshot1.sub = n.aesub + n.processAppendEntry(aeSnapshot1) require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 0) require_Equal(t, n.applied, 0) - n.processAppendEntry(aeSnapshot2, n.aesub) + aeSnapshot2.sub = n.aesub + n.processAppendEntry(aeSnapshot2) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 1) require_Equal(t, n.applied, 0) - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 2) require_Equal(t, n.applied, 0) @@ -1821,14 +1869,15 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T) aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) // Truncate to simulate we missed one message and need to catchup. - n.processAppendEntry(aeCatchupTrigger, n.aesub) + aeCatchupTrigger.sub = n.aesub + n.processAppendEntry(aeCatchupTrigger) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex // Process first message as part of the catchup. - catchupSub := n.catchup.sub - n.processAppendEntry(aeMsg1, catchupSub) + aeMsg1.sub = n.catchup.sub + n.processAppendEntry(aeMsg1) require_True(t, n.catchup != nil) // Receiving a vote request should cancel our catchup. @@ -1873,7 +1922,8 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) // Simply receive first message. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.commit, 0) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) @@ -1881,7 +1931,8 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { require_Equal(t, entry.leader, nats0) // Receive second message, which commits the first message. 
- n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.commit, 1) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) @@ -1890,7 +1941,8 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { // We receive an entry from another leader, should truncate down to commit / remove the second message. // After doing so, we should also be able to immediately store the message after. - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.commit, 1) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) @@ -1898,7 +1950,8 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { require_Equal(t, entry.leader, nats1) // Heartbeat moves commit up. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 2) } @@ -1933,7 +1986,8 @@ func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) { aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Simply receive first message. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.commit, 0) require_Equal(t, n.wal.State().Msgs, 1) entry, err := n.loadEntry(1) @@ -1941,7 +1995,8 @@ func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) { require_Equal(t, entry.leader, nats0) // Receive second message, which commits the first message. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.commit, 1) require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) @@ -1951,7 +2006,8 @@ func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) { // We receive an entry from another leader, should truncate down to commit / remove the second message. 
// But, truncation fails so should register that and not change pindex/pterm. bindex, bterm := n.pindex, n.pterm - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Error(t, n.werr, errors.New("test: truncate always fails")) require_Equal(t, bindex, n.pindex) require_Equal(t, bterm, n.pterm) @@ -1994,12 +2050,14 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) // Simply receive first message. - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 0) // Heartbeat moves commit up. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.commit, 1) // Manually call back down to applied, and then snapshot. @@ -2041,7 +2099,8 @@ func TestNRGHealthCheckWaitForCatchup(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Switch follower into catchup. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex @@ -2049,12 +2108,14 @@ func TestNRGHealthCheckWaitForCatchup(t *testing.T) { require_Equal(t, n.catchup.cindex, aeHeartbeat.pindex) // Catchup first message. - n.processAppendEntry(aeMsg1, n.catchup.sub) + aeMsg1.sub = n.catchup.sub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) require_False(t, n.Healthy()) // Catchup second message. 
- n.processAppendEntry(aeMsg2, n.catchup.sub) + aeMsg2.sub = n.catchup.sub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 1) require_False(t, n.Healthy()) @@ -2065,14 +2126,16 @@ func TestNRGHealthCheckWaitForCatchup(t *testing.T) { require_False(t, n.Healthy()) // Catchup third message. - n.processAppendEntry(aeMsg3, n.catchup.sub) + aeMsg3.sub = n.catchup.sub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) require_Equal(t, n.commit, 2) n.Applied(2) require_False(t, n.Healthy()) // Heartbeat stops catchup. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_True(t, n.catchup == nil) require_Equal(t, n.pindex, 3) require_Equal(t, n.commit, 3) @@ -2101,7 +2164,8 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Switch follower into catchup. - n.processAppendEntry(aeHeartbeat1, n.aesub) + aeHeartbeat1.sub = n.aesub + n.processAppendEntry(aeHeartbeat1) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex @@ -2109,24 +2173,28 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) // Catchup first message. - n.processAppendEntry(aeMsg1, n.catchup.sub) + aeMsg1.sub = n.catchup.sub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) require_False(t, n.Healthy()) // We miss this message, since we're catching up. - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_True(t, n.catchup != nil) require_Equal(t, n.pindex, 1) require_False(t, n.Healthy()) // We also miss the heartbeat, since we're catching up. 
- n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_True(t, n.catchup != nil) require_Equal(t, n.pindex, 1) require_False(t, n.Healthy()) // Catchup second message, this will stop catchup. - n.processAppendEntry(aeMsg2, n.catchup.sub) + aeMsg2.sub = n.catchup.sub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 1) n.Applied(1) @@ -2138,7 +2206,8 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) // We now get a 'future' heartbeat, should restart catchup. - n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 1) // n.pterm require_Equal(t, n.catchup.pindex, 2) // n.pindex @@ -2147,14 +2216,16 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { require_False(t, n.Healthy()) // Catchup third message. - n.processAppendEntry(aeMsg3, n.catchup.sub) + aeMsg3.sub = n.catchup.sub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) require_Equal(t, n.commit, 2) n.Applied(2) require_False(t, n.Healthy()) // Heartbeat stops catchup. - n.processAppendEntry(aeHeartbeat2, n.aesub) + aeHeartbeat2.sub = n.aesub + n.processAppendEntry(aeHeartbeat2) require_True(t, n.catchup == nil) require_Equal(t, n.pindex, 3) require_Equal(t, n.commit, 3) @@ -2181,12 +2252,14 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) // Process first message. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) require_False(t, n.Healthy()) // Process second message, moves commit up. 
- n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_False(t, n.Healthy()) @@ -2200,7 +2273,8 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { require_True(t, n.Healthy()) // Heartbeat marks second message to be committed. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) require_Equal(t, n.pindex, 2) require_False(t, n.Healthy()) @@ -2229,7 +2303,8 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true} // Process first message. - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, 1) require_Equal(t, n.aflr, 0) @@ -2382,7 +2457,8 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) { test.switchNode(n) } - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) select { case isLeader := <-n.LeadChangeC(): @@ -2424,7 +2500,8 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply}) // Simulate we missed all messages up to this point. - n.processAppendEntry(aeCatchupTrigger, n.aesub) + aeCatchupTrigger.sub = n.aesub + n.processAppendEntry(aeCatchupTrigger) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex @@ -2440,16 +2517,19 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { require_True(t, strings.HasPrefix(msg.Reply, "$NRG.CR")) // Should NEVER respond to catchup messages. 
- n.processAppendEntry(aeMissedMsg, n.catchup.sub) + aeMissedMsg.sub = n.catchup.sub + n.processAppendEntry(aeMissedMsg) _, err = sub.NextMsg(time.Second) require_Error(t, err, nats.ErrTimeout) - n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + aeCatchupTrigger.sub = n.catchup.sub + n.processAppendEntry(aeCatchupTrigger) _, err = sub.NextMsg(time.Second) require_Error(t, err, nats.ErrTimeout) // Now we've received all messages, stop catchup, and respond success to new message. - n.processAppendEntry(aeHeartbeat, n.aesub) + aeHeartbeat.sub = n.aesub + n.processAppendEntry(aeHeartbeat) msg, err = sub.NextMsg(time.Second) require_NoError(t, err) ar = n.decodeAppendEntryResponse(msg.Data) @@ -2503,7 +2583,8 @@ func TestNRGRejectNewAppendEntryFromPreviousLeader(t *testing.T) { // Accept first message because it equals our term. n.term = 1 - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pterm, 1) require_Equal(t, n.pindex, 1) @@ -2511,7 +2592,8 @@ func TestNRGRejectNewAppendEntryFromPreviousLeader(t *testing.T) { require_NoError(t, n.processVoteRequest(&voteRequest{term: 5, lastTerm: 1, lastIndex: 2})) // Must reject entry from a previous term. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pterm, 1) require_Equal(t, n.pindex, 1) } @@ -2533,7 +2615,8 @@ func TestNRGRejectAppendEntryDuringCatchupFromPreviousLeader(t *testing.T) { // Accept first message because it equals our term. n.term = 1 - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 0) // n.pterm require_Equal(t, n.catchup.pindex, 0) // n.pindex @@ -2548,7 +2631,8 @@ func TestNRGRejectAppendEntryDuringCatchupFromPreviousLeader(t *testing.T) { // First catchup message is accepted. 
catchup := n.catchup - n.processAppendEntry(aeMsg1, catchup.sub) + aeMsg1.sub = catchup.sub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pterm, 1) require_Equal(t, n.pindex, 1) @@ -2564,7 +2648,8 @@ func TestNRGRejectAppendEntryDuringCatchupFromPreviousLeader(t *testing.T) { } // Now send the second catchup entry. - n.processAppendEntry(aeMsg2, nsub) + aeMsg2.sub = nsub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pterm, 1) // Under the old behavior this entry is wrongly accepted. @@ -2782,7 +2867,8 @@ func TestNRGSnapshotRecovery(t *testing.T) { aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries}) // Store one entry. - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, 1) require_Equal(t, n.commit, 1) require_Equal(t, n.applied, 0) @@ -2898,7 +2984,8 @@ func TestNRGInitializeAndScaleUp(t *testing.T) { require_False(t, vr.empty) // Processing an append entry resets scale up and puts us out of observer mode. - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, 1) require_False(t, n.initializing) require_False(t, n.scaleUp) @@ -2930,7 +3017,8 @@ func TestNRGInitializeAndScaleUp(t *testing.T) { } aeSnapshot := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) n.createCatchup(aeSnapshot) - n.processAppendEntry(aeSnapshot, n.catchup.sub) + aeSnapshot.sub = n.catchup.sub + n.processAppendEntry(aeSnapshot) require_False(t, n.initializing) require_False(t, n.scaleUp) require_False(t, n.observer) @@ -2969,7 +3057,8 @@ func TestNRGReplayOnSnapshotSameTerm(t *testing.T) { aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) // Process the first append entry. 
- n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) // Commit and apply. @@ -2985,15 +3074,18 @@ func TestNRGReplayOnSnapshotSameTerm(t *testing.T) { require_Equal(t, snap.lastIndex, 1) // Process other messages. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) // Replay the append entry that matches our snapshot. // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. // Should be recognized as a replay with the same term, marked as success, and not truncate. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 3) } @@ -3013,7 +3105,8 @@ func TestNRGReplayOnSnapshotDifferentTerm(t *testing.T) { aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries, lterm: 2}) // Process the first append entry. - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) // Commit and apply. @@ -3033,20 +3126,25 @@ func TestNRGReplayOnSnapshotDifferentTerm(t *testing.T) { n.applied = 0 // Process other messages. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) - n.processAppendEntry(aeMsg3, n.aesub) + + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) // Replay the append entry that matches our snapshot. // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. // Should be recognized as truncating back to the installed snapshot, not reset the WAL fully. 
// Since all is aligned after truncation, should also be able to apply the entry. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) // Should now also be able to apply the third entry. - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) } @@ -3129,14 +3227,16 @@ func TestNRGIgnoreEntryAfterCanceledCatchup(t *testing.T) { aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_True(t, n.catchup != nil) csub := n.catchup.sub n.cancelCatchup() // Catchup was canceled, a message on this canceled catchup should not be stored. - n.processAppendEntry(aeMsg1, csub) + aeMsg1.sub = csub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 0) } @@ -3155,12 +3255,15 @@ func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) { aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries, reply: aeReply}) // Triggers catchup. - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_True(t, n.catchup != nil) // Catchup runs partially. - n.processAppendEntry(aeMsg1, n.catchup.sub) - n.processAppendEntry(aeMsg2, n.catchup.sub) + aeMsg1.sub = n.catchup.sub + aeMsg2.sub = n.catchup.sub + n.processAppendEntry(aeMsg1) + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_Equal(t, n.commit, 1) n.Applied(1) @@ -3179,19 +3282,22 @@ func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) { // We now receive delayed "real-time" messages. // The first message needs to be a copy, because we've committed it before and returned it to the pool. 
aeMsg1Copy := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply}) - n.processAppendEntry(aeMsg1Copy, n.aesub) + aeMsg1Copy.sub = n.aesub + n.processAppendEntry(aeMsg1Copy) require_Equal(t, n.pindex, 2) // Should NOT reply "success", otherwise we would wrongfully provide quorum while not having an up-to-date log. _, err = sub.NextMsg(500 * time.Millisecond) require_Error(t, err, nats.ErrTimeout) - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) // Should NOT reply "success", otherwise we would wrongfully provide quorum while not having an up-to-date log. _, err = sub.NextMsg(500 * time.Millisecond) require_Error(t, err, nats.ErrTimeout) - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) // Should reply "success", this is the latest message. msg, err := sub.NextMsg(500 * time.Millisecond) @@ -3217,13 +3323,15 @@ func TestNRGStepdownWithHighestTermDuringCatchup(t *testing.T) { // Need to store the message, stepdown, and up term. n.switchToCandidate() require_Equal(t, n.term, 1) - n.processAppendEntry(aeMsg1, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) require_Equal(t, n.term, 10) require_Equal(t, n.pindex, 1) // Need to store the message, stepdown, and up term. n.switchToLeader() - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.term, 20) require_Equal(t, n.pindex, 2) } @@ -3255,8 +3363,10 @@ func TestNRGTruncateOnStartup(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) // Store two messages the normal way. 
- n.processAppendEntry(aeMsg1, n.aesub) - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg1.sub = n.aesub + n.processAppendEntry(aeMsg1) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) state := n.wal.State() @@ -3336,9 +3446,12 @@ func TestNRGLeaderCatchupHandling(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: entries}) - n.processAppendEntry(aeMsg1, n.aesub) - n.processAppendEntry(aeMsg2, n.aesub) - n.processAppendEntry(aeMsg3, n.aesub) + aeMsg1.sub = n.aesub + aeMsg2.sub = n.aesub + aeMsg3.sub = n.aesub + n.processAppendEntry(aeMsg1) + n.processAppendEntry(aeMsg2) + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) n.switchToLeader() @@ -3390,7 +3503,8 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { aeMsg2Fork := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) // Trigger a catchup. - n.processAppendEntry(aeMsg2, n.aesub) + aeMsg2.sub = n.aesub + n.processAppendEntry(aeMsg2) validateCatchup := func() { t.Helper() require_True(t, n.catchup != nil) @@ -3401,7 +3515,8 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { // Catchup the first missed entry. csub := n.catchup.sub - n.processAppendEntry(aeMsg1, csub) + aeMsg1.sub = csub + n.processAppendEntry(aeMsg1) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 20) @@ -3417,7 +3532,8 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { // Would previously stall the catchup and restart it with a previous leader. 
n.catchup.pindex = aeMsg1.pindex + 1 n.catchup.active = time.Time{} - n.processAppendEntry(aeMsg1Fork, n.aesub) + aeMsg1Fork.sub = n.aesub + n.processAppendEntry(aeMsg1Fork) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 20) validateCatchup() @@ -3431,19 +3547,22 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { require_Equal(t, ar.term, 20) // Would previously reset the WAL. - n.processAppendEntry(aeMsg2Fork, n.aesub) + aeMsg2Fork.sub = n.aesub + n.processAppendEntry(aeMsg2Fork) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 20) validateCatchup() // Now the catchup should continue, undisturbed by an old leader sending append entries. - n.processAppendEntry(aeMsg2, csub) + aeMsg2.sub = csub + n.processAppendEntry(aeMsg2) require_Equal(t, n.pindex, 2) require_Equal(t, n.pterm, 20) require_True(t, n.catchup == nil) // A remaining catchup entry can still be ingested, even if the catchup state itself is gone. - n.processAppendEntry(aeMsg3, csub) + aeMsg3.sub = csub + n.processAppendEntry(aeMsg3) require_Equal(t, n.pindex, 3) require_Equal(t, n.pterm, 20) } @@ -3467,7 +3586,8 @@ func TestNRGProcessed(t *testing.T) { // Store three entries. for i, aeMsg := range []*appendEntry{aeMsg1, aeMsg2, aeMsg3} { - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, uint64(i+1)) require_Equal(t, n.commit, uint64(i+1)) require_Equal(t, n.processed, 0) @@ -3496,7 +3616,8 @@ func TestNRGProcessed(t *testing.T) { // Store the remaining messages. for i, aeMsg := range []*appendEntry{aeMsg4, aeMsg5} { - n.processAppendEntry(aeMsg, n.aesub) + aeMsg.sub = n.aesub + n.processAppendEntry(aeMsg) require_Equal(t, n.pindex, uint64(i+4)) require_Equal(t, n.commit, uint64(i+4)) require_Equal(t, n.processed, 3)