Skip to content

Commit 3096a0c

Browse files
(2.12) Async docs
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 303118d commit 3096a0c

File tree

5 files changed

+45
-35
lines changed

5 files changed

+45
-35
lines changed

server/filestore.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4206,6 +4206,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
42064206
}
42074207

42084208
// StoreRawMsg stores a raw message with expected sequence number and timestamp.
4209+
// The ceIndex reflects the index of the entry in the WAL, used for signaling when it's persisted.
42094210
func (fs *fileStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64, ceIndex uint64) error {
42104211
fs.mu.Lock()
42114212
err := fs.storeRawMsg(subj, hdr, msg, seq, ts, ttl, ceIndex)
@@ -4288,6 +4289,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time, ceIndex uint64) {
42884289
}
42894290

42904291
// SkipMsg will use the next sequence number but not store anything.
4292+
// The ceIndex reflects the index of the entry in the WAL, used for signaling when it's persisted.
42914293
func (fs *fileStore) SkipMsg(ceIndex uint64) uint64 {
42924294
fs.mu.Lock()
42934295
defer fs.mu.Unlock()

server/raft.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ type RaftNode interface {
4444
SendSnapshot(snap []byte) error
4545
NeedSnapshot() bool
4646
Applied(index uint64) (entries uint64, bytes uint64)
47-
TrackWrite(index uint64)
48-
WritePersisted(index uint64)
47+
ApplyWritePending(index uint64)
48+
ApplyWritePersisted(index uint64)
4949
State() RaftState
5050
Size() (entries, bytes uint64)
5151
Progress() (index, commit, applied uint64)
@@ -1126,42 +1126,49 @@ func (n *raft) appliedLocked(index uint64) {
11261126
}
11271127
}
11281128

1129-
// TrackWrite signals writes need to happen for the specified index.
1130-
// Applied can still be called, but will not move up until WritePersisted
1131-
// is called, signaling writes were persisted.
1132-
func (n *raft) TrackWrite(index uint64) {
1129+
// ApplyWritePending signals writes need to happen for the specified index,
1130+
// i.e. writes are done asynchronously and might take some time for them to hit the disk.
1131+
// Applied can still be called after ApplyWritePending if there are any async writes.
1132+
// But applied will not move up until ApplyWritePersisted is called, signaling writes
1133+
// were persisted.
1134+
// If there are async writes ApplyWritePending MUST ALWAYS be called before Applied.
1135+
// This ensures Applied can be blocked from moving up until the writes are fully persisted.
1136+
// If ApplyWritePending is not called, for example when all writes are persisted by default,
1137+
// Applied will be able to freely move up.
1138+
func (n *raft) ApplyWritePending(index uint64) {
11331139
n.Lock()
11341140
defer n.Unlock()
1135-
n.trackWritesLocked(index, false)
1141+
n.applyWritesLocked(index, false)
11361142
}
11371143

1138-
// WritePersisted signals all writes up to and including the index were persisted.
1139-
// Applied automatically moves up if it was called in the meantime.
1140-
func (n *raft) WritePersisted(index uint64) {
1144+
// ApplyWritePersisted signals all writes up to and including the index were persisted,
1145+
// this can be called either before Applied or after as long as the writes were persisted.
1146+
// ApplyWritePending MUST be called before this for every index that requires persisting.
1147+
// Applied will have been called in the meantime, and it will be moved up either:
1148+
// - up to the point where we're waiting for other writes to be persisted
1149+
// - or all writes are persisted, and it can fully move up.
1150+
func (n *raft) ApplyWritePersisted(index uint64) {
11411151
n.Lock()
11421152
defer n.Unlock()
1143-
n.trackWritesLocked(index, true)
1153+
n.applyWritesLocked(index, true)
11441154
}
11451155

1146-
// trackWritesLocked keeps track of pending and persisted writes.
1147-
func (n *raft) trackWritesLocked(index uint64, persisted bool) {
1156+
// applyWritesLocked keeps track of pending and persisted writes.
1157+
func (n *raft) applyWritesLocked(index uint64, persisted bool) {
11481158
if persisted {
11491159
// Persisted moves the floor up.
1160+
// This indicates all writes up to and including this floor were persisted.
11501161
n.persistFloor = max(n.persistFloor, index)
1151-
} else if n.persistFloor > 0 {
1152-
// Track the lowest to-be-persisted index.
1153-
n.persistFloor = min(n.persistFloor, index)
1154-
} else {
1162+
n.persistFloorApplied = true
1163+
} else if n.persistFloor == 0 {
11551164
// Initializes if not set.
11561165
n.persistFloor = index
11571166
}
11581167
// Track the highest observed pending write.
11591168
n.persistHigh = max(n.persistHigh, index)
1160-
// Track whether the floor was persisted.
1161-
n.persistFloorApplied = n.persistFloorApplied || persisted
11621169

11631170
if n.persistFloorApplied {
1164-
// Move applied up partially.
1171+
// If the floor is persisted, we can move applied up if possible.
11651172
if n.persistFloor != n.persistHigh {
11661173
n.appliedLocked(min(n.persistFloor, n.pendingApplied))
11671174
return

server/raft_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2759,36 +2759,36 @@ func TestNRGPendingAppliedPermutations(t *testing.T) {
27592759
t.Run("ApplyOnDone", func(t *testing.T) {
27602760
defer reset()
27612761
// Call applied after.
2762-
n.TrackWrite(1)
2763-
n.WritePersisted(1)
2762+
n.ApplyWritePending(1)
2763+
n.ApplyWritePersisted(1)
27642764
n.Applied(10)
27652765
require_Equal(t, n.applied, 10)
27662766

27672767
// Call applied before.
2768-
n.TrackWrite(11)
2768+
n.ApplyWritePending(11)
27692769
n.Applied(20)
27702770
require_Equal(t, n.applied, 10)
2771-
n.WritePersisted(11)
2771+
n.ApplyWritePersisted(11)
27722772
require_Equal(t, n.applied, 20)
27732773
})
27742774

27752775
// Applied can't move up until all pending entries referencing the same index are applied.
27762776
t.Run("ApplyWhenNotPending", func(t *testing.T) {
27772777
defer reset()
2778-
n.TrackWrite(1)
2779-
n.TrackWrite(2)
2778+
n.ApplyWritePending(1)
2779+
n.ApplyWritePending(2)
27802780
n.Applied(3)
27812781
require_Equal(t, n.applied, 0)
2782-
n.WritePersisted(1)
2782+
n.ApplyWritePersisted(1)
27832783
require_Equal(t, n.applied, 1)
2784-
n.WritePersisted(2)
2784+
n.ApplyWritePersisted(2)
27852785
require_Equal(t, n.applied, 3)
2786-
n.TrackWrite(10)
2786+
n.ApplyWritePending(10)
27872787
n.Applied(10)
27882788
require_Equal(t, n.applied, 9)
27892789

2790-
// Apply is unblocked once the last pending is applied.
2791-
n.WritePersisted(10)
2790+
// Apply is unblocked once the last pending is persisted.
2791+
n.ApplyWritePersisted(10)
27922792
require_Equal(t, n.applied, 10)
27932793
n.Applied(11)
27942794
require_Equal(t, n.applied, 11)
@@ -2797,10 +2797,10 @@ func TestNRGPendingAppliedPermutations(t *testing.T) {
27972797
// Pending state must be reset if all is done.
27982798
t.Run("DonePendingResetsState", func(t *testing.T) {
27992799
defer reset()
2800-
n.TrackWrite(1)
2800+
n.ApplyWritePending(1)
28012801
n.Applied(1)
28022802
require_Equal(t, n.applied, 0)
2803-
n.WritePersisted(1)
2803+
n.ApplyWritePersisted(1)
28042804
require_Equal(t, n.applied, 1)
28052805
require_Equal(t, n.persistFloor, 0)
28062806
require_Equal(t, n.persistHigh, 0)

server/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type StorageUpdateHandler func(msgs, bytes int64, seq uint64, subj string)
8888
type StorageRemoveMsgHandler func(seq uint64)
8989

9090
// Used to call back into the upper layers that a write is scheduled for a given replication index.
91-
// Below handler will be called once this data is persisted.
91+
// Below StorageWritePersistedHandler will be called once this data is persisted.
9292
type StorageTrackWriteHandler func(index uint64)
9393

9494
// Used to call back into the upper layers that certain writes were persisted up to

server/stream.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
914914
}
915915
// Update store callbacks that call into the Raft node (if any).
916916
if node != nil {
917-
mset.store.RegisterStorageTrackWrites(node.TrackWrite, node.WritePersisted)
917+
mset.store.RegisterStorageTrackWrites(node.ApplyWritePending, node.ApplyWritePersisted)
918918
} else {
919919
mset.store.RegisterStorageTrackWrites(nil, nil)
920920
}
@@ -4853,6 +4853,7 @@ var (
48534853
)
48544854

48554855
// processJetStreamMsg is where we try to actually process the stream msg.
4856+
// The ceIndex reflects the index of the entry in the WAL, used for signaling when it's persisted.
48564857
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, mt *msgTrace, sourced bool, ceIndex uint64) (retErr error) {
48574858
if mt != nil {
48584859
// Only the leader/standalone will have mt!=nil. On exit, send the

0 commit comments

Comments
 (0)