Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 99 additions & 103 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
truncateAndErr(index - 1)
break
}
n.processAppendEntry(ae, nil)
n.recoverAppendEntry(ae)
// Check how much we have queued up so far to determine if we should pause.
for _, e := range ae.entries {
qsz += len(e.Data)
Expand Down Expand Up @@ -2156,7 +2156,7 @@ func (n *raft) processAppendEntries() {
aes := n.entry.pop()
if canProcess {
for _, ae := range aes {
n.processAppendEntry(ae, ae.sub)
n.processAppendEntry(ae)
}
}
n.entry.recycle(&aes)
Expand Down Expand Up @@ -3444,28 +3444,68 @@ func (n *raft) updateLeader(newLeader string) {
}
}

// processAppendEntry will process an appendEntry. This is called either
// during recovery or from processAppendEntries when there are new entries
// to be committed.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// recoverAppendEntry will process an appendEntry for recovery
func (n *raft) recoverAppendEntry(ae *appendEntry) {
n.Lock()
defer n.Unlock()

n.pterm = ae.term
n.pindex = ae.pindex + 1

n.processEntriesAndCommit(ae, false)
}

// processAppendEntry will process an appendEntry. This is called from
// processAppendEntries when there are new entries to be committed.
func (n *raft) processAppendEntry(ae *appendEntry) {
var ar *appendEntryResponse

// Make a copy of the reply subject, as ae may return
// to its pool as part of processAppendEntryLocked
subject := ae.reply

n.Lock()
isNew := ae.sub != nil && ae.sub == n.aesub
isCatchup := !isNew && n.catchup != nil && ae.sub == n.catchup.sub

// Process the appendEntry if it is a new proposal, or if we are
// catching up. Ignore messages coming from old catchup subs when
// ae.lterm <= 0. Older versions of the server do not send the leader
// term when catching up. Old catchups from newer subs can be rejected
// later by checking that the appendEntry is on the correct term.
if isNew || isCatchup || ae.lterm > 0 {
ar = n.processAppendEntryLocked(ae, isNew)
} else {
n.debug("processAppendEntry ignore entry from old subscription")
}
n.Unlock()

if ar != nil {
var scratch [appendEntryResponseLen]byte
n.sendRPC(subject, ar.reply, ar.encode(scratch[:]))
arPool.Put(ar)
}
}

// Process the given appendEntry. Parameter isNew indicates whether the
// appendEntry is comes from a new proposal or from a catchup request.
// Optionally returns a appendEntryResponse. The caller is responsible
// for sending out the response and return it to its pool.
// Lock should be held.
func (n *raft) processAppendEntryLocked(ae *appendEntry, isNew bool) *appendEntryResponse {
// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
}

// Just return if closed or we had previous write error.
if n.State() == Closed || n.werr != nil {
n.Unlock()
return
return nil
}

// Scratch buffer for responses.
var scratch [appendEntryResponseLen]byte
arbuf := scratch[:]

// Grab term from append entry. But if leader explicitly defined its term, use that instead.
// This is required during catchup if the leader catches us up on older items from previous terms.
// Grab term from append entry. But if leader explicitly defined its term,
// use that instead. This is required during catchup if the leader catches
// us up on older items from previous terms.
// While still allowing us to confirm they're matching our highest known term.
lterm := ae.term
if ae.lterm != 0 {
Expand Down Expand Up @@ -3496,12 +3536,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
n.debug("AppendEntry ignoring old term from another leader")
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
n.Unlock()
return
return newAppendEntryResponse(n.term, n.pindex, n.id, false)
}
}

Expand All @@ -3524,12 +3560,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// Catching up state.
catchingUp := n.catchup != nil
// Is this a new entry? New entries will be delivered on the append entry
// sub, rather than a catch-up sub.
isNew := sub != nil && sub == n.aesub

// Track leader directly
if isNew && ae.leader != noLeader {
if ps := n.peers[ae.leader]; ps != nil {
Expand All @@ -3539,15 +3569,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
// later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
}

// If this term is greater than ours.
if lterm > n.term {
n.term = lterm
Expand All @@ -3559,46 +3580,33 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdownLocked(ae.leader)
}
} else if lterm < n.term && sub != nil && (isNew || ae.lterm != 0) {
} else if lterm < n.term && (isNew || ae.lterm != 0) {
// Anything that's below our expected highest term needs to be rejected.
// Unless we're replaying (sub=nil), in which case we'll always continue.
// For backward-compatibility we shouldn't reject if we're being caught up by an old server.
if !isNew {
n.debug("AppendEntry ignoring old entry from previous catchup")
n.Unlock()
return
return nil
}
n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm)
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
return
return newAppendEntryResponse(n.term, n.pindex, n.id, false)
}

// Check state if we are catching up.
if catchingUp {
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
if n.catchup != nil {
if n.pterm >= n.catchup.cterm && n.pindex >= n.catchup.cindex {
// If we are here we are good, so if we have a catchup pending we can cancel.
n.cancelCatchup()
// Reset our notion of catching up.
catchingUp = false
} else if isNew {
var ar *appendEntryResponse
var inbox string
// Check to see if we are stalled. If so recreate our catchup state and resend response.
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
}
n.Unlock()
if ar != nil {
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar.reply = inbox
return ar
}
// Ignore new while catching up or replaying.
return
return nil
}
}

Expand Down Expand Up @@ -3668,21 +3676,19 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// For example, if we got partial catchup, and then the "real-time" messages came in very delayed.
// If we reported "success" on those "real-time" messages, we'd wrongfully be providing
// quorum while not having an up-to-date log.
n.Unlock()
return
return nil
}

// Check if we are catching up. If we are here we know the leader did not have all of the entries
// so make sure this is a snapshot entry. If it is not start the catchup process again since it
// means we may have missed additional messages.
if catchingUp {
if n.catchup != nil {
// This means we already entered into a catchup state but what the leader sent us did not match what we expected.
// Snapshots and peerstate will always be together when a leader is catching us up in this fashion.
if len(ae.entries) != 2 || ae.entries[0].Type != EntrySnapshot || ae.entries[1].Type != EntryPeerState {
n.warn("Expected first catchup entry to be a snapshot and peerstate, will retry")
n.cancelCatchup()
n.Unlock()
return
return nil
}

if ps, err := decodePeerState(ae.entries[1].Data); err == nil {
Expand All @@ -3692,8 +3698,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
} else {
n.warn("Could not parse snapshot peerstate correctly")
n.cancelCatchup()
n.Unlock()
return
return nil
}

// Inherit state from appendEntry with the leader's snapshot.
Expand All @@ -3710,48 +3715,54 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Install the leader's snapshot as our own.
if err := n.installSnapshot(snap); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
return
return nil
}
n.resetInitializing()

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.Unlock()
return
return nil
}

// Setup our state for catching up.
n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
return
ar.reply = inbox
return ar
}

CONTINUE:
// Save to our WAL if we have entries.
if ae.shouldStore() {
// Only store if an original which will have sub != nil
if sub != nil {
if err := n.storeToWAL(ae); err != nil {
if err != ErrStoreClosed {
n.warn("Error storing entry to WAL: %v", err)
}
n.Unlock()
return
if err := n.storeToWAL(ae); err != nil {
if err != ErrStoreClosed {
n.warn("Error storing entry to WAL: %v", err)
}
n.cachePendingEntry(ae)
n.resetInitializing()
} else {
// This is a replay on startup so just take the appendEntry version.
n.pterm = ae.term
n.pindex = ae.pindex + 1
return nil
}
n.cachePendingEntry(ae)
n.resetInitializing()
}

// ae should no longer be used after this call as
// processEntriesAndCommit may return the appendEntry back to its pool
n.processEntriesAndCommit(ae, isNew)

// Only ever respond to new entries.
// Never respond to catchup messages, because providing quorum based on this is unsafe.
// The only way for the leader to receive "success" MUST be through this path.
if isNew {
// Success. Send our response.
return newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
}

return nil
}

// Process all entries in appendEntry and try to commit.
// Lock should be held.
func (n *raft) processEntriesAndCommit(ae *appendEntry, isNew bool) {
// Check to see if we have any related entries to process here.
for _, e := range ae.entries {
switch e.Type {
Expand Down Expand Up @@ -3789,9 +3800,9 @@ CONTINUE:
}
}

// Make a copy of these values, as the AppendEntry might be cached and returned to the pool in applyCommit.
// Copy ae.commit as the AppendEntry might be cached and returned to the
// pool in applyCommit.
aeCommit := ae.commit
aeReply := ae.reply

// Apply anything we need here.
if aeCommit > n.commit {
Expand All @@ -3806,21 +3817,6 @@ CONTINUE:
}
}
}

// Only ever respond to new entries.
// Never respond to catchup messages, because providing quorum based on this is unsafe.
// The only way for the leader to receive "success" MUST be through this path.
var ar *appendEntryResponse
if sub != nil && isNew {
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
}
n.Unlock()

// Success. Send our response.
if ar != nil {
n.sendRPC(aeReply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
}
}

// resetInitializing resets the notion of initializing.
Expand Down
Loading
Loading