From 092a189b793e93f082af1436f4e11806767a1ea5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 8 Jan 2025 10:24:53 +0000 Subject: [PATCH] JetStream API routed queue changes (LIFO) Signed-off-by: Neil Twigg --- server/ipqueue.go | 37 ++++++++++++ server/ipqueue_test.go | 128 +++++++++++++++++++++++++++++++++++++++- server/jetstream_api.go | 33 ++++++++--- 3 files changed, 189 insertions(+), 9 deletions(-) diff --git a/server/ipqueue.go b/server/ipqueue.go index 094c522ee20..69f63ef4719 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -207,6 +207,43 @@ func (q *ipQueue[T]) popOne() (T, bool) { return e, true } +// Returns the last element from the queue, if any. See comment above +// regarding calling after being notified that there is something and +// the use of drain(). In short, the caller should always check the +// boolean return value to ensure that the value is genuine and not a +// default empty value. +func (q *ipQueue[T]) popOneLast() (T, bool) { + q.Lock() + l := len(q.elts) - q.pos + if l == 0 { + q.Unlock() + var empty T + return empty, false + } + e := q.elts[len(q.elts)-1] + q.elts = q.elts[:len(q.elts)-1] + if l--; l > 0 { + if q.calc != nil { + q.sz -= q.calc(e) + } + // We need to re-signal + select { + case q.ch <- struct{}{}: + default: + } + } else { + // We have just emptied the queue, so we can reuse unless it is too big. + if cap(q.elts) <= q.mrs { + q.elts = q.elts[:0] + } else { + q.elts = nil + } + q.pos, q.sz = 0, 0 + } + q.Unlock() + return e, true +} + // After a pop(), the slice can be recycled for the next push() when // a first element is added to the queue. // This will also decrement the "in progress" count with the length diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index 8795ba705ff..1688b7976f3 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -250,6 +250,99 @@ func TestIPQueuePopOne(t *testing.T) { q.recycle(&values) } +func TestIPQueuePopOneLast(t *testing.T) { + s := &Server{} + q := newIPQueue[int](s, "test") + q.push(1) + <-q.ch + e, ok := q.popOneLast() + if !ok { + t.Fatal("Got nil") + } + if i := e; i != 1 { + t.Fatalf("Expected 1, got %v", i) + } + if l := q.len(); l != 0 { + t.Fatalf("Expected len to be 0, got %v", l) + } + // That does not affect the number of notProcessed + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } + select { + case <-q.ch: + t.Fatalf("Should not have been notified of addition") + default: + // OK + } + q.push(2) + q.push(3) + e, ok = q.popOneLast() + if !ok { + t.Fatal("Got nil") + } + if i := e; i != 3 { + t.Fatalf("Expected 3, got %v", i) + } + if l := q.len(); l != 1 { + t.Fatalf("Expected len to be 1, got %v", l) + } + select { + case <-q.ch: + // OK + default: + t.Fatalf("Should have been notified that there is more") + } + e, ok = q.popOneLast() + if !ok { + t.Fatal("Got nil") + } + if i := e; i != 2 { + t.Fatalf("Expected 2, got %v", i) + } + if l := q.len(); l != 0 { + t.Fatalf("Expected len to be 0, got %v", l) + } + select { + case <-q.ch: + t.Fatalf("Should not have been notified that there is more") + default: + // OK + } + // Calling it again now that we know there is nothing, we + // should get nil. + if e, ok = q.popOneLast(); ok { + t.Fatalf("Expected nil, got %v", e) + } + + q = newIPQueue[int](s, "test2") + q.push(1) + q.push(2) + // Capture current capacity + q.Lock() + c := cap(q.elts) + q.Unlock() + e, ok = q.popOneLast() + if !ok || e != 2 { + t.Fatalf("Invalid value: %v", e) + } + if l := q.len(); l != 1 { + t.Fatalf("Expected len to be 1, got %v", l) + } + values := q.pop() + if len(values) != 1 || values[0] != 1 { + t.Fatalf("Unexpected values: %v", values) + } + if cap(values) != c { + t.Fatalf("Unexpected capacity: %v vs %v", cap(values), c) + } + if l := q.len(); l != 0 { + t.Fatalf("Expected len to be 0, got %v", l) + } + // Just make sure that this is ok... + q.recycle(&values) +} + func TestIPQueueMultiProducers(t *testing.T) { s := &Server{} q := newIPQueue[int](s, "test") @@ -382,7 +475,36 @@ func TestIPQueueDrain(t *testing.T) { } } -func TestIPQueueSizeCalculation(t *testing.T) { +func TestIPQueueSizeCalculationPopOne(t *testing.T) { + type testType = [16]byte + var testValue testType + + calc := ipqSizeCalculation[testType](func(e testType) uint64 { + return uint64(len(e)) + }) + s := &Server{} + q := newIPQueue[testType](s, "test", calc) + + for i := 0; i < 10; i++ { + testValue[0] = byte(i) + q.push(testValue) + require_Equal(t, q.len(), i+1) + require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue))) + } + + for i := 10; i > 5; i-- { + v, _ := q.popOne() + require_Equal(t, 10-v[0], byte(i)) + require_Equal(t, q.len(), i-1) + require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue))) + } + + q.pop() + require_Equal(t, q.len(), 0) + require_Equal(t, q.size(), 0) +} + +func TestIPQueueSizeCalculationPopOneLast(t *testing.T) { type testType = [16]byte var testValue testType @@ -393,13 +515,15 @@ func TestIPQueueSizeCalculation(t *testing.T) { q := newIPQueue[testType](s, "test", calc) for i := 0; i < 10; i++ { + testValue[0] = byte(i) q.push(testValue) require_Equal(t, q.len(), i+1) require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue))) } for i := 10; i > 5; i-- { - q.popOne() + v, _ := q.popOneLast() + require_Equal(t, v[0]+1, byte(i)) require_Equal(t, q.len(), i-1) require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue))) } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index e41ccf3137d..303aaf358f2 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -886,18 +886,37 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // If we are here we have received this request over a non-client connection. // We need to make sure not to block. We will send the request to a long-lived // pool of go routines. - - // Increment inflight. Do this before queueing. - atomic.AddInt64(&js.apiInflight, 1) - // Copy the state. Note the JSAPI only uses the hdr index to piece apart the // header from the msg body. No other references are needed. // Check pending and warn if getting backed up. - pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) limit := atomic.LoadInt64(&js.queueLimit) +retry: + atomic.AddInt64(&js.apiInflight, 1) + pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) if pending >= int(limit) { - s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) + if _, ok := s.jsAPIRoutedReqs.popOne(); ok { + // If we were able to take one of the oldest items off the queue, then + // retry the insert. + s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request") + atomic.AddInt64(&js.apiInflight, -1) + s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ + TypedEvent: TypedEvent{ + Type: JSAPILimitReachedAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Server: s.Name(), + Domain: js.config.Domain, + Dropped: 1, + }) + goto retry + } + + // It's likely not possible to get to this point, but if for some reason we have got here, + // then something is wrong for us to be both over the limit but unable to pull entries, so + // throw everything away and hope we recover from it. drained := int64(s.jsAPIRoutedReqs.drain()) + s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", drained) atomic.AddInt64(&js.apiInflight, -drained) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ @@ -929,7 +948,7 @@ func (s *Server) processJSAPIRoutedRequests() { // Only pop one item at a time here, otherwise if the system is recovering // from queue buildup, then one worker will pull off all the tasks and the // others will be starved of work. - for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() { + for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() { client.pa = r.pa start := time.Now() r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)