Skip to content

Commit 092a189

Browse files
committed
JetStream API routed queue changes (LIFO)
Signed-off-by: Neil Twigg <[email protected]>
1 parent 9f5b27a commit 092a189

File tree

3 files changed

+189
-9
lines changed

3 files changed

+189
-9
lines changed

server/ipqueue.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,43 @@ func (q *ipQueue[T]) popOne() (T, bool) {
207207
return e, true
208208
}
209209

210+
// Returns the last element from the queue, if any. See comment above
211+
// regarding calling after being notified that there is something and
212+
// the use of drain(). In short, the caller should always check the
213+
// boolean return value to ensure that the value is genuine and not a
214+
// default empty value.
215+
func (q *ipQueue[T]) popOneLast() (T, bool) {
216+
q.Lock()
217+
l := len(q.elts) - q.pos
218+
if l == 0 {
219+
q.Unlock()
220+
var empty T
221+
return empty, false
222+
}
223+
e := q.elts[len(q.elts)-1]
224+
q.elts = q.elts[:len(q.elts)-1]
225+
if l--; l > 0 {
226+
if q.calc != nil {
227+
q.sz -= q.calc(e)
228+
}
229+
// We need to re-signal
230+
select {
231+
case q.ch <- struct{}{}:
232+
default:
233+
}
234+
} else {
235+
// We have just emptied the queue, so we can reuse unless it is too big.
236+
if cap(q.elts) <= q.mrs {
237+
q.elts = q.elts[:0]
238+
} else {
239+
q.elts = nil
240+
}
241+
q.pos, q.sz = 0, 0
242+
}
243+
q.Unlock()
244+
return e, true
245+
}
246+
210247
// After a pop(), the slice can be recycled for the next push() when
211248
// a first element is added to the queue.
212249
// This will also decrement the "in progress" count with the length

server/ipqueue_test.go

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,99 @@ func TestIPQueuePopOne(t *testing.T) {
250250
q.recycle(&values)
251251
}
252252

253+
func TestIPQueuePopOneLast(t *testing.T) {
254+
s := &Server{}
255+
q := newIPQueue[int](s, "test")
256+
q.push(1)
257+
<-q.ch
258+
e, ok := q.popOneLast()
259+
if !ok {
260+
t.Fatal("Got nil")
261+
}
262+
if i := e; i != 1 {
263+
t.Fatalf("Expected 1, got %v", i)
264+
}
265+
if l := q.len(); l != 0 {
266+
t.Fatalf("Expected len to be 0, got %v", l)
267+
}
268+
// That does not affect the number of notProcessed
269+
if n := q.inProgress(); n != 0 {
270+
t.Fatalf("Expected count to be 0, got %v", n)
271+
}
272+
select {
273+
case <-q.ch:
274+
t.Fatalf("Should not have been notified of addition")
275+
default:
276+
// OK
277+
}
278+
q.push(2)
279+
q.push(3)
280+
e, ok = q.popOneLast()
281+
if !ok {
282+
t.Fatal("Got nil")
283+
}
284+
if i := e; i != 3 {
285+
t.Fatalf("Expected 3, got %v", i)
286+
}
287+
if l := q.len(); l != 1 {
288+
t.Fatalf("Expected len to be 1, got %v", l)
289+
}
290+
select {
291+
case <-q.ch:
292+
// OK
293+
default:
294+
t.Fatalf("Should have been notified that there is more")
295+
}
296+
e, ok = q.popOneLast()
297+
if !ok {
298+
t.Fatal("Got nil")
299+
}
300+
if i := e; i != 2 {
301+
t.Fatalf("Expected 2, got %v", i)
302+
}
303+
if l := q.len(); l != 0 {
304+
t.Fatalf("Expected len to be 0, got %v", l)
305+
}
306+
select {
307+
case <-q.ch:
308+
t.Fatalf("Should not have been notified that there is more")
309+
default:
310+
// OK
311+
}
312+
// Calling it again now that we know there is nothing, we
313+
// should get nil.
314+
if e, ok = q.popOneLast(); ok {
315+
t.Fatalf("Expected nil, got %v", e)
316+
}
317+
318+
q = newIPQueue[int](s, "test2")
319+
q.push(1)
320+
q.push(2)
321+
// Capture current capacity
322+
q.Lock()
323+
c := cap(q.elts)
324+
q.Unlock()
325+
e, ok = q.popOneLast()
326+
if !ok || e != 2 {
327+
t.Fatalf("Invalid value: %v", e)
328+
}
329+
if l := q.len(); l != 1 {
330+
t.Fatalf("Expected len to be 1, got %v", l)
331+
}
332+
values := q.pop()
333+
if len(values) != 1 || values[0] != 1 {
334+
t.Fatalf("Unexpected values: %v", values)
335+
}
336+
if cap(values) != c {
337+
t.Fatalf("Unexpected capacity: %v vs %v", cap(values), c)
338+
}
339+
if l := q.len(); l != 0 {
340+
t.Fatalf("Expected len to be 0, got %v", l)
341+
}
342+
// Just make sure that this is ok...
343+
q.recycle(&values)
344+
}
345+
253346
func TestIPQueueMultiProducers(t *testing.T) {
254347
s := &Server{}
255348
q := newIPQueue[int](s, "test")
@@ -382,7 +475,36 @@ func TestIPQueueDrain(t *testing.T) {
382475
}
383476
}
384477

385-
func TestIPQueueSizeCalculation(t *testing.T) {
478+
func TestIPQueueSizeCalculationPopOne(t *testing.T) {
479+
type testType = [16]byte
480+
var testValue testType
481+
482+
calc := ipqSizeCalculation[testType](func(e testType) uint64 {
483+
return uint64(len(e))
484+
})
485+
s := &Server{}
486+
q := newIPQueue[testType](s, "test", calc)
487+
488+
for i := 0; i < 10; i++ {
489+
testValue[0] = byte(i)
490+
q.push(testValue)
491+
require_Equal(t, q.len(), i+1)
492+
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
493+
}
494+
495+
for i := 10; i > 5; i-- {
496+
v, _ := q.popOne()
497+
require_Equal(t, 10-v[0], byte(i))
498+
require_Equal(t, q.len(), i-1)
499+
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
500+
}
501+
502+
q.pop()
503+
require_Equal(t, q.len(), 0)
504+
require_Equal(t, q.size(), 0)
505+
}
506+
507+
func TestIPQueueSizeCalculationPopOneLast(t *testing.T) {
386508
type testType = [16]byte
387509
var testValue testType
388510

@@ -393,13 +515,15 @@ func TestIPQueueSizeCalculation(t *testing.T) {
393515
q := newIPQueue[testType](s, "test", calc)
394516

395517
for i := 0; i < 10; i++ {
518+
testValue[0] = byte(i)
396519
q.push(testValue)
397520
require_Equal(t, q.len(), i+1)
398521
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
399522
}
400523

401524
for i := 10; i > 5; i-- {
402-
q.popOne()
525+
v, _ := q.popOneLast()
526+
require_Equal(t, v[0]+1, byte(i))
403527
require_Equal(t, q.len(), i-1)
404528
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
405529
}

server/jetstream_api.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -886,18 +886,37 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
886886
// If we are here we have received this request over a non-client connection.
887887
// We need to make sure not to block. We will send the request to a long-lived
888888
// pool of go routines.
889-
890-
// Increment inflight. Do this before queueing.
891-
atomic.AddInt64(&js.apiInflight, 1)
892-
893889
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
894890
// header from the msg body. No other references are needed.
895891
// Check pending and warn if getting backed up.
896-
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
897892
limit := atomic.LoadInt64(&js.queueLimit)
893+
retry:
894+
atomic.AddInt64(&js.apiInflight, 1)
895+
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
898896
if pending >= int(limit) {
899-
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
897+
if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
898+
// If we were able to take one of the oldest items off the queue, then
899+
// retry the insert.
900+
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
901+
atomic.AddInt64(&js.apiInflight, -1)
902+
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
903+
TypedEvent: TypedEvent{
904+
Type: JSAPILimitReachedAdvisoryType,
905+
ID: nuid.Next(),
906+
Time: time.Now().UTC(),
907+
},
908+
Server: s.Name(),
909+
Domain: js.config.Domain,
910+
Dropped: 1,
911+
})
912+
goto retry
913+
}
914+
915+
// It's likely not possible to get to this point, but if for some reason we have got here,
916+
// then something is wrong for us to be both over the limit but unable to pull entries, so
917+
// throw everything away and hope we recover from it.
900918
drained := int64(s.jsAPIRoutedReqs.drain())
919+
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", drained)
901920
atomic.AddInt64(&js.apiInflight, -drained)
902921

903922
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
@@ -929,7 +948,7 @@ func (s *Server) processJSAPIRoutedRequests() {
929948
// Only pop one item at a time here, otherwise if the system is recovering
930949
// from queue buildup, then one worker will pull off all the tasks and the
931950
// others will be starved of work.
932-
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
951+
for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() {
933952
client.pa = r.pa
934953
start := time.Now()
935954
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)

0 commit comments

Comments
 (0)