@@ -122,18 +122,16 @@ func NewBufferedSender(
 		sender:  sender,
 		metrics: bsMetrics,
 	}
-	bs.queueMu.buffer = newEventQueue()
 	bs.notifyDataC = make(chan struct{}, 1)
 	bs.queueMu.buffer = newEventQueue()
 	bs.queueMu.perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg.Get(&settings.SV)
 	bs.queueMu.byStream = make(map[int64]streamStatus)
 	return bs
 }
 
-// sendBuffered buffers the event before sending it to the underlying
-// gRPC stream. It does not block. sendBuffered will take the
-// ownership of the alloc and release it if the returned error is
-// non-nil. It only errors in the case of an already stopped stream.
+// sendBuffered buffers the event before sending it to the underlying gRPC
+// stream. It does not block. It errors in the case of a stopped sender or if
+// the registration has exceeded its capacity.
 func (bs *BufferedSender) sendBuffered(
 	ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
 ) error {
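
For callers, this contract means any error from sendBuffered is terminal for that stream. The pre-change comment noted that sendBuffered takes ownership of alloc and releases it on a non-nil error; assuming that still holds, callers should not release it themselves. A minimal sketch of the expected call pattern, where disconnectStream is a hypothetical cleanup hook, not part of this change:

```go
// Hedged sketch of a caller; disconnectStream is hypothetical.
func publish(bs *BufferedSender, ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation) {
	if err := bs.sendBuffered(ev, alloc); err != nil {
		// The sender is stopped or this registration overflowed;
		// tear the stream down rather than retrying.
		disconnectStream(ev.StreamID, err) // hypothetical
	}
}
```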
@@ -174,7 +172,12 @@ func (bs *BufferedSender) sendBuffered(
 			// as disconnected.
 			//
 			// The only unfortunate exception is if we get disconnected while flushing
-			// the catch-up scan buffer.
+			// the catch-up scan buffer. In this case we admit the event and stay in
+			// the overflowing state until we actually receive the error.
+			//
+			// TODO(ssd): Given the above exception, we should perhaps just move
+			// directly to streamOverflowed. But I think instead we want to remove
+			// that exception if possible.
 			if ev.Error != nil {
 				status.state = streamOverflowed
 			}
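
The admission logic amounts to a small per-stream state machine. Below is a runnable toy model of the transitions described above; the state names mirror the real code, but the function is illustrative and deliberately omits the catch-up flush exception and all locking:

```go
package main

import "fmt"

// Toy model of the per-stream admission states. A sketch only.
type streamState int

const (
	streamActive streamState = iota
	streamOverflowing
	streamOverflowed
)

type stream struct {
	state      streamState
	queueItems int64
}

// admit reports whether an event is buffered. Error events are always
// admitted while overflowing so the client learns it must disconnect.
func admit(s *stream, isError bool, capacity int64) bool {
	switch s.state {
	case streamActive:
		if s.queueItems < capacity {
			s.queueItems++
			return true
		}
		// Capacity exhausted: start overflowing and fall through so an
		// error event arriving right now is still admitted.
		s.state = streamOverflowing
		fallthrough
	case streamOverflowing:
		if isError {
			s.state = streamOverflowed
			s.queueItems++
			return true
		}
		return false
	default: // streamOverflowed: nothing more is admitted.
		return false
	}
}

func main() {
	s := &stream{}
	fmt.Println(admit(s, false, 1)) // true: under capacity
	fmt.Println(admit(s, false, 1)) // false: now overflowing
	fmt.Println(admit(s, true, 1))  // true: the error is admitted
	fmt.Println(admit(s, false, 1)) // false: overflowed
}
```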
@@ -256,13 +259,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
 	if ok {
 		state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
 		if streamFound {
-			state.queueItems -= 1
+			state.queueItems--
 			bs.queueMu.byStream[event.ev.StreamID] = state
 		}
 	}
 	return event, ok
 }
 
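
The copy-out/write-back around queueItems-- is forced by Go itself: streamStatus is stored by value, and map values are not addressable. A self-contained illustration of the idiom:

```go
package main

import "fmt"

type streamStatus struct {
	queueItems int64
}

func main() {
	byStream := map[int64]streamStatus{7: {queueItems: 3}}

	// byStream[7].queueItems-- does not compile ("cannot assign to
	// struct field ... in map"): map values are not addressable.
	// Copy the struct out, mutate it, and store it back.
	state := byStream[7]
	state.queueItems--
	byStream[7] = state

	fmt.Println(byStream[7].queueItems) // 2
}
```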
+// addStream initializes the per-stream tracking for the given streamID.
 func (bs *BufferedSender) addStream(streamID int64) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
@@ -275,6 +279,12 @@ func (bs *BufferedSender) addStream(streamID int64) {
 	}
 }
 
+// removeStream removes the per-stream state tracking from the sender.
+//
+// TODO(ssd): There may be items still in the queue when removeStream is
+// called. We'd like to eliminate that possibility, but it is OK for now:
+// we will eventually process the events, and the client knows to ignore
+// them.
 func (bs *BufferedSender) removeStream(streamID int64) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
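
The TODO relies on the client dropping events for streams it has already torn down. A hedged sketch of what that client-side filtering might look like; every name here is hypothetical:

```go
// Hypothetical client-side demux: events for a StreamID with no live
// registration are dropped, which is why events left in the queue after
// removeStream are harmless.
func demux(ev *kvpb.MuxRangeFeedEvent, active map[int64]chan *kvpb.MuxRangeFeedEvent) {
	if ch, ok := active[ev.StreamID]; ok {
		ch <- ev
		return
	}
	// Stale event for a stream the client already tore down: ignore it.
}
```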
@@ -308,10 +318,7 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 		MaxRetries:     50,
 	}
 	for re := retry.StartWithCtx(ctx, opts); re.Next(); {
-		bs.queueMu.Lock()
-		caughtUp := bs.queueMu.buffer.len() == 0 // nolint:deferunlockcheck
-		bs.queueMu.Unlock()
-		if caughtUp {
+		if bs.len() == 0 {
 			return nil
 		}
 	}
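
The retry loop now goes through a len accessor rather than locking inline. The helper itself is not shown in this hunk; presumably it is shaped roughly like this:

```go
// Presumed shape of the accessor used above; the actual helper is not
// part of this hunk. It snapshots the queue length under queueMu so
// callers avoid the inline lock/unlock (and the deferunlockcheck nolint).
func (bs *BufferedSender) len() int {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()
	return bs.queueMu.buffer.len()
}
```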