Skip to content

Commit 98bac1f

Browse files
committed
Refactor processAppendEntry
Remove sub parameter from processAppendEntry, appendEntry already stores sub. Move all subscription pointer comparisons from processAppendEntryLocked to processAppendEntry. processAppendEntryLocked takes now a boolean instead of a subscription pointer. Signed-off-by: Daniele Sciascia <[email protected]>
1 parent acb54f1 commit 98bac1f

File tree

2 files changed

+274
-153
lines changed

2 files changed

+274
-153
lines changed

server/raft.go

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,7 +2156,7 @@ func (n *raft) processAppendEntries() {
21562156
aes := n.entry.pop()
21572157
if canProcess {
21582158
for _, ae := range aes {
2159-
n.processAppendEntry(ae, ae.sub)
2159+
n.processAppendEntry(ae)
21602160
}
21612161
}
21622162
n.entry.recycle(&aes)
@@ -3457,13 +3457,27 @@ func (n *raft) recoverAppendEntry(ae *appendEntry) {
34573457

34583458
// processAppendEntry will process an appendEntry. This is called from
34593459
// processAppendEntries when there are new entries to be committed.
3460-
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
3460+
func (n *raft) processAppendEntry(ae *appendEntry) {
3461+
var ar *appendEntryResponse
3462+
34613463
// Make a copy of the reply subject, as ae may return
34623464
// to its pool as part of processAppendEntryLocked
34633465
subject := ae.reply
34643466

34653467
n.Lock()
3466-
ar := n.processAppendEntryLocked(ae, sub)
3468+
isNew := ae.sub != nil && ae.sub == n.aesub
3469+
isCatchup := !isNew && n.catchup != nil && ae.sub == n.catchup.sub
3470+
3471+
// Process the appendEntry if it is a new proposal, or if we are
3472+
// catching up. Ignore messages coming from old catchup subs when
3473+
// ae.lterm <= 0. Older versions of the server do not send the leader
3474+
// term when catching up. Old catchups from newer subs can be rejected
3475+
// later by checking that the appendEntry is on the correct term.
3476+
if isNew || isCatchup || ae.lterm > 0 {
3477+
ar = n.processAppendEntryLocked(ae, isNew)
3478+
} else {
3479+
n.debug("processAppendEntry ignore entry from old subscription")
3480+
}
34673481
n.Unlock()
34683482

34693483
if ar != nil {
@@ -3473,23 +3487,25 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34733487
}
34743488
}
34753489

3476-
// Process the given appendEntry. Optionally returns a appendEntryResponse.
3477-
// The caller is responsible for sending out the response and return it to
3478-
// its pool.
3490+
// Process the given appendEntry. Parameter isNew indicates whether the
3491+
// appendEntry is comes from a new proposal or from a catchup request.
3492+
// Optionally returns a appendEntryResponse. The caller is responsible
3493+
// for sending out the response and return it to its pool.
34793494
// Lock should be held.
3480-
func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *appendEntryResponse {
3495+
func (n *raft) processAppendEntryLocked(ae *appendEntry, isNew bool) *appendEntryResponse {
34813496
// Don't reset here if we have been asked to assume leader position.
34823497
if !n.lxfer {
34833498
n.resetElectionTimeout()
34843499
}
34853500

3486-
// Just return if closed or we had previous write error, or invalid sub
3487-
if n.State() == Closed || n.werr != nil || sub == nil {
3501+
// Just return if closed or we had previous write error
3502+
if n.State() == Closed || n.werr != nil {
34883503
return nil
34893504
}
34903505

3491-
// Grab term from append entry. But if leader explicitly defined its term, use that instead.
3492-
// This is required during catchup if the leader catches us up on older items from previous terms.
3506+
// Grab term from append entry. But if leader explicitly defined its term,
3507+
// use that instead. This is required during catchup if the leader catches
3508+
// us up on older items from previous terms.
34933509
// While still allowing us to confirm they're matching our highest known term.
34943510
lterm := ae.term
34953511
if ae.lterm != 0 {
@@ -3544,12 +3560,6 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35443560
}
35453561
}
35463562

3547-
// Catching up state.
3548-
catchingUp := n.catchup != nil
3549-
// Is this a new entry? New entries will be delivered on the append entry
3550-
// sub, rather than a catch-up sub.
3551-
isNew := sub == n.aesub
3552-
35533563
// Track leader directly
35543564
if isNew && ae.leader != noLeader {
35553565
if ps := n.peers[ae.leader]; ps != nil {
@@ -3559,14 +3569,6 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35593569
}
35603570
}
35613571

3562-
// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
3563-
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
3564-
// later, just by checking the append entry is on the correct term.
3565-
if !isNew && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) {
3566-
n.debug("AppendEntry ignoring old entry from previous catchup")
3567-
return nil
3568-
}
3569-
35703572
// If this term is greater than ours.
35713573
if lterm > n.term {
35723574
n.term = lterm
@@ -3590,12 +3592,10 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35903592
}
35913593

35923594
// Check state if we are catching up.
3593-
if catchingUp {
3594-
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
3595+
if n.catchup != nil {
3596+
if n.pterm >= n.catchup.cterm && n.pindex >= n.catchup.cindex {
35953597
// If we are here we are good, so if we have a catchup pending we can cancel.
35963598
n.cancelCatchup()
3597-
// Reset our notion of catching up.
3598-
catchingUp = false
35993599
} else if isNew {
36003600
// Check to see if we are stalled. If so recreate our catchup state and resend response.
36013601
if n.catchupStalled() {
@@ -3682,7 +3682,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
36823682
// Check if we are catching up. If we are here we know the leader did not have all of the entries
36833683
// so make sure this is a snapshot entry. If it is not start the catchup process again since it
36843684
// means we may have missed additional messages.
3685-
if catchingUp {
3685+
if n.catchup != nil {
36863686
// This means we already entered into a catchup state but what the leader sent us did not match what we expected.
36873687
// Snapshots and peerstate will always be together when a leader is catching us up in this fashion.
36883688
if len(ae.entries) != 2 || ae.entries[0].Type != EntrySnapshot || ae.entries[1].Type != EntryPeerState {

0 commit comments

Comments
 (0)