Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,16 @@ func NewBufferedSender(
sender: sender,
metrics: bsMetrics,
}
bs.queueMu.buffer = newEventQueue()
bs.notifyDataC = make(chan struct{}, 1)
bs.queueMu.buffer = newEventQueue()
bs.queueMu.perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg.Get(&settings.SV)
bs.queueMu.byStream = make(map[int64]streamStatus)
return bs
}

// sendBuffered buffers the event before sending it to the underlying
// gRPC stream. It does not block. sendBuffered will take the
// ownership of the alloc and release it if the returned error is
// non-nil. It only errors in the case of an already stopped stream.
// sendBuffered buffers the event before sending it to the underlying gRPC
// stream. It does not block. It errors in the case of a stopped sender of if
// the registration has exceeded its capacity.
func (bs *BufferedSender) sendBuffered(
ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
) error {
Expand Down Expand Up @@ -174,7 +172,12 @@ func (bs *BufferedSender) sendBuffered(
// as disconnected.
//
// The only unfortunate exception is if we get disconnected while flushing
// the catch-up scan buffer.
// the catch-up scan buffer. In this case we admit the event and stay in
// state overflowing until we actually receive the error.
//
// TODO(ssd): Given the above exception, we should perhaps just move
// directly to streamOverflowed. But, I think instead we want to remove
// that exception if possible.
if ev.Error != nil {
status.state = streamOverflowed
}
Expand Down Expand Up @@ -256,13 +259,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
if ok {
state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
if streamFound {
state.queueItems -= 1
state.queueItems--
bs.queueMu.byStream[event.ev.StreamID] = state
}
}
return event, ok
}

// addStream initializes the per-stream tracking for the given streamID.
func (bs *BufferedSender) addStream(streamID int64) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
Expand All @@ -275,6 +279,12 @@ func (bs *BufferedSender) addStream(streamID int64) {
}
}

// removeStream removes the per-stream state tracking from the sender.
//
// TODO(ssd): There may be items still in the queue when removeStream is called.
// We'd like to solve this by removing this as a possibility. But this is OK
// since we will eventually process the events and the client knows to ignore
// them.
func (bs *BufferedSender) removeStream(streamID int64) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
Expand Down Expand Up @@ -308,10 +318,7 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
MaxRetries: 50,
}
for re := retry.StartWithCtx(ctx, opts); re.Next(); {
bs.queueMu.Lock()
caughtUp := bs.queueMu.buffer.len() == 0 // nolint:deferunlockcheck
bs.queueMu.Unlock()
if caughtUp {
if bs.len() == 0 {
return nil
}
}
Expand Down
Loading