From 4ee4606c92d1474ab92d35cc48a27100d0a2d5e2 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Mon, 13 Oct 2025 11:26:01 +0100 Subject: [PATCH] rangefeed: remove duplicate buffer initialization in buffered sender While here, I also updated some comments and added a few TODOs. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/buffered_sender.go | 29 ++++++++++++-------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go index 6d049c6ae724..b63b418fc157 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go @@ -122,7 +122,6 @@ func NewBufferedSender( sender: sender, metrics: bsMetrics, } - bs.queueMu.buffer = newEventQueue() bs.notifyDataC = make(chan struct{}, 1) bs.queueMu.buffer = newEventQueue() bs.queueMu.perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg.Get(&settings.SV) @@ -130,10 +129,9 @@ func NewBufferedSender( return bs } -// sendBuffered buffers the event before sending it to the underlying -// gRPC stream. It does not block. sendBuffered will take the -// ownership of the alloc and release it if the returned error is -// non-nil. It only errors in the case of an already stopped stream. +// sendBuffered buffers the event before sending it to the underlying gRPC +// stream. It does not block. It errors in the case of a stopped sender of if +// the registration has exceeded its capacity. func (bs *BufferedSender) sendBuffered( ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation, ) error { @@ -174,7 +172,12 @@ func (bs *BufferedSender) sendBuffered( // as disconnected. // // The only unfortunate exception is if we get disconnected while flushing - // the catch-up scan buffer. + // the catch-up scan buffer. In this case we admit the event and stay in + // state overflowing until we actually receive the error. + // + // TODO(ssd): Given the above exception, we should perhaps just move + // directly to streamOverflowed. But, I think instead we want to remove + // that exception if possible. if ev.Error != nil { status.state = streamOverflowed } @@ -256,13 +259,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) { if ok { state, streamFound := bs.queueMu.byStream[event.ev.StreamID] if streamFound { - state.queueItems -= 1 + state.queueItems-- bs.queueMu.byStream[event.ev.StreamID] = state } } return event, ok } +// addStream initializes the per-stream tracking for the given streamID. func (bs *BufferedSender) addStream(streamID int64) { bs.queueMu.Lock() defer bs.queueMu.Unlock() @@ -275,6 +279,12 @@ func (bs *BufferedSender) addStream(streamID int64) { } } +// removeStream removes the per-stream state tracking from the sender. +// +// TODO(ssd): There may be items still in the queue when removeStream is called. +// We'd like to solve this by removing this as a possibility. But this is OK +// since we will eventually process the events and the client knows to ignore +// them. func (bs *BufferedSender) removeStream(streamID int64) { bs.queueMu.Lock() defer bs.queueMu.Unlock() @@ -308,10 +318,7 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error { MaxRetries: 50, } for re := retry.StartWithCtx(ctx, opts); re.Next(); { - bs.queueMu.Lock() - caughtUp := bs.queueMu.buffer.len() == 0 // nolint:deferunlockcheck - bs.queueMu.Unlock() - if caughtUp { + if bs.len() == 0 { return nil } }