@@ -2156,7 +2156,7 @@ func (n *raft) processAppendEntries() {
21562156 aes := n .entry .pop ()
21572157 if canProcess {
21582158 for _ , ae := range aes {
2159- n .processAppendEntry (ae , ae . sub )
2159+ n .processAppendEntry (ae )
21602160 }
21612161 }
21622162 n .entry .recycle (& aes )
@@ -3457,13 +3457,27 @@ func (n *raft) recoverAppendEntry(ae *appendEntry) {
34573457
34583458// processAppendEntry will process an appendEntry. This is called from
34593459// processAppendEntries when there are new entries to be committed.
3460- func (n * raft ) processAppendEntry (ae * appendEntry , sub * subscription ) {
3460+ func (n * raft ) processAppendEntry (ae * appendEntry ) {
3461+ var ar * appendEntryResponse
3462+
34613463 // Make a copy of the reply subject, as ae may return
34623464 // to its pool as part of processAppendEntryLocked
34633465 subject := ae .reply
34643466
34653467 n .Lock ()
3466- ar := n .processAppendEntryLocked (ae , sub )
3468+ isNew := ae .sub != nil && ae .sub == n .aesub
3469+ isCatchup := ! isNew && n .catchup != nil && ae .sub == n .catchup .sub
3470+
3471+ // Process the appendEntry if it is a new proposal, or if we are
3472+ // catching up. Ignore messages coming from old catchup subs when
3473+ // ae.lterm <= 0. Older versions of the server do not send the leader
3474+ // term when catching up. Old catchups from newer subs can be rejected
3475+ // later by checking that the appendEntry is on the correct term.
3476+ if isNew || isCatchup || ae .lterm > 0 {
3477+ ar = n .processAppendEntryLocked (ae , isNew )
3478+ } else {
3479+ n .debug ("processAppendEntry ignore entry from old subscription" )
3480+ }
34673481 n .Unlock ()
34683482
34693483 if ar != nil {
@@ -3473,23 +3487,25 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34733487 }
34743488}
34753489
3476- // Process the given appendEntry. Optionally returns a appendEntryResponse.
3477- // The caller is responsible for sending out the response and return it to
3478- // its pool.
3490+ // Process the given appendEntry. Parameter isNew indicates whether the
3491+ // appendEntry is comes from a new proposal or from a catchup request.
3492+ // Optionally returns a appendEntryResponse. The caller is responsible
3493+ // for sending out the response and return it to its pool.
34793494// Lock should be held.
3480- func (n * raft ) processAppendEntryLocked (ae * appendEntry , sub * subscription ) * appendEntryResponse {
3495+ func (n * raft ) processAppendEntryLocked (ae * appendEntry , isNew bool ) * appendEntryResponse {
34813496 // Don't reset here if we have been asked to assume leader position.
34823497 if ! n .lxfer {
34833498 n .resetElectionTimeout ()
34843499 }
34853500
3486- // Just return if closed or we had previous write error, or invalid sub
3487- if n .State () == Closed || n .werr != nil || sub == nil {
3501+ // Just return if closed or we had previous write error.
3502+ if n .State () == Closed || n .werr != nil {
34883503 return nil
34893504 }
34903505
3491- // Grab term from append entry. But if leader explicitly defined its term, use that instead.
3492- // This is required during catchup if the leader catches us up on older items from previous terms.
3506+ // Grab term from append entry. But if leader explicitly defined its term,
3507+ // use that instead. This is required during catchup if the leader catches
3508+ // us up on older items from previous terms.
34933509 // While still allowing us to confirm they're matching our highest known term.
34943510 lterm := ae .term
34953511 if ae .lterm != 0 {
@@ -3544,12 +3560,6 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35443560 }
35453561 }
35463562
3547- // Catching up state.
3548- catchingUp := n .catchup != nil
3549- // Is this a new entry? New entries will be delivered on the append entry
3550- // sub, rather than a catch-up sub.
3551- isNew := sub == n .aesub
3552-
35533563 // Track leader directly
35543564 if isNew && ae .leader != noLeader {
35553565 if ps := n .peers [ae .leader ]; ps != nil {
@@ -3559,14 +3569,6 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35593569 }
35603570 }
35613571
3562- // If we are/were catching up ignore old catchup subs, but only if catching up from an older server
3563- // that doesn't send the leader term when catching up. We can reject old catchups from newer subs
3564- // later, just by checking the append entry is on the correct term.
3565- if ! isNew && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
3566- n .debug ("AppendEntry ignoring old entry from previous catchup" )
3567- return nil
3568- }
3569-
35703572 // If this term is greater than ours.
35713573 if lterm > n .term {
35723574 n .term = lterm
@@ -3590,12 +3592,10 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35903592 }
35913593
35923594 // Check state if we are catching up.
3593- if catchingUp {
3594- if cs := n . catchup ; cs != nil && n . pterm >= cs . cterm && n .pindex >= cs .cindex {
3595+ if n . catchup != nil {
3596+ if n . pterm >= n . catchup . cterm && n .pindex >= n . catchup .cindex {
35953597 // If we are here we are good, so if we have a catchup pending we can cancel.
35963598 n .cancelCatchup ()
3597- // Reset our notion of catching up.
3598- catchingUp = false
35993599 } else if isNew {
36003600 // Check to see if we are stalled. If so recreate our catchup state and resend response.
36013601 if n .catchupStalled () {
@@ -3682,7 +3682,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
36823682 // Check if we are catching up. If we are here we know the leader did not have all of the entries
36833683 // so make sure this is a snapshot entry. If it is not start the catchup process again since it
36843684 // means we may have missed additional messages.
3685- if catchingUp {
3685+ if n . catchup != nil {
36863686 // This means we already entered into a catchup state but what the leader sent us did not match what we expected.
36873687 // Snapshots and peerstate will always be together when a leader is catching us up in this fashion.
36883688 if len (ae .entries ) != 2 || ae .entries [0 ].Type != EntrySnapshot || ae .entries [1 ].Type != EntryPeerState {
0 commit comments