Skip to content

Commit f0de6ac

Browse files
committed
Improve tracking of apiInflight metric
Signed-off-by: Neil Twigg <[email protected]>
1 parent 631340b commit f0de6ac

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

server/ipqueue.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,16 @@ func (q *ipQueue[T]) size() uint64 {
283283
}
284284

285285
// Empty the queue and consumes the notification signal if present.
286+
// Returns the number of items that were drained from the queue.
286287
// Note that this could cause a reader go routine that has been
287288
// notified that there is something in the queue (reading from queue's `ch`)
288289
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
289-
func (q *ipQueue[T]) drain() {
290+
func (q *ipQueue[T]) drain() int {
290291
if q == nil {
291-
return
292+
return 0
292293
}
293294
q.Lock()
295+
olen := len(q.elts) - q.pos
294296
q.elts, q.pos, q.sz = nil, 0, 0
295297
// Consume the signal if it was present to reduce the chance of a reader
296298
// routine to be think that there is something in the queue...
@@ -299,6 +301,7 @@ func (q *ipQueue[T]) drain() {
299301
default:
300302
}
301303
q.Unlock()
304+
return olen
302305
}
303306

304307
// Since the length of the queue goes to 0 after a pop(), it is good to

server/jetstream_api.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -894,6 +894,7 @@ retry:
894894
// If we were able to take one of the oldest items off the queue, then
895895
// retry the insert.
896896
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
897+
atomic.AddInt64(&js.apiInflight, -1)
897898
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
898899
TypedEvent: TypedEvent{
899900
Type: JSAPILimitReachedAdvisoryType,
@@ -911,7 +912,7 @@ retry:
911912
// then something is wrong for us to be both over the limit but unable to pull entries, so
912913
// throw everything away and hope we recover from it.
913914
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
914-
s.jsAPIRoutedReqs.drain()
915+
atomic.AddInt64(&js.apiInflight, -int64(s.jsAPIRoutedReqs.drain()))
915916

916917
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
917918
TypedEvent: TypedEvent{
@@ -924,7 +925,7 @@ retry:
924925
Dropped: int64(pending),
925926
})
926927
} else {
927-
atomic.StoreInt64(&js.apiInflight, int64(pending))
928+
atomic.AddInt64(&js.apiInflight, 1)
928929
}
929930
}
930931

server/raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1962,7 +1962,7 @@ runner:
19621962
// just will remove them from the central monitoring map
19631963
queues := []interface {
19641964
unregister()
1965-
drain()
1965+
drain() int
19661966
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
19671967
for _, q := range queues {
19681968
q.drain()

0 commit comments

Comments
 (0)