Skip to content

Commit 3488552

Browse files
committed
Improve tracking of apiInflight metric
Signed-off-by: Neil Twigg <[email protected]>
1 parent 631340b commit 3488552

File tree

3 files changed

+9
-6
lines changed

3 files changed

+9
-6
lines changed

server/ipqueue.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,16 @@ func (q *ipQueue[T]) size() uint64 {
283283
}
284284

285285
// Empty the queue and consumes the notification signal if present.
286+
// Returns the number of items that were drained from the queue.
286287
// Note that this could cause a reader go routine that has been
287288
// notified that there is something in the queue (reading from queue's `ch`)
288289
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
289-
func (q *ipQueue[T]) drain() {
290+
func (q *ipQueue[T]) drain() int {
290291
if q == nil {
291-
return
292+
return 0
292293
}
293294
q.Lock()
295+
olen := len(q.elts) - q.pos
294296
q.elts, q.pos, q.sz = nil, 0, 0
295297
// Consume the signal if it was present to reduce the chance of a reader
296298
// routine to be think that there is something in the queue...
@@ -299,6 +301,7 @@ func (q *ipQueue[T]) drain() {
299301
default:
300302
}
301303
q.Unlock()
304+
return olen
302305
}
303306

304307
// Since the length of the queue goes to 0 after a pop(), it is good to

server/jetstream_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -888,12 +888,14 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
888888
// Check pending and warn if getting backed up.
889889
limit := atomic.LoadInt64(&js.queueLimit)
890890
retry:
891+
atomic.AddInt64(&js.apiInflight, 1)
891892
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
892893
if pending >= int(limit) {
893894
if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
894895
// If we were able to take one of the oldest items off the queue, then
895896
// retry the insert.
896897
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
898+
atomic.AddInt64(&js.apiInflight, -1)
897899
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
898900
TypedEvent: TypedEvent{
899901
Type: JSAPILimitReachedAdvisoryType,
@@ -911,7 +913,7 @@ retry:
911913
// then something is wrong for us to be both over the limit but unable to pull entries, so
912914
// throw everything away and hope we recover from it.
913915
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
914-
s.jsAPIRoutedReqs.drain()
916+
atomic.AddInt64(&js.apiInflight, -int64(s.jsAPIRoutedReqs.drain()))
915917

916918
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
917919
TypedEvent: TypedEvent{
@@ -923,8 +925,6 @@ retry:
923925
Domain: js.config.Domain,
924926
Dropped: int64(pending),
925927
})
926-
} else {
927-
atomic.StoreInt64(&js.apiInflight, int64(pending))
928928
}
929929
}
930930

server/raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1962,7 +1962,7 @@ runner:
19621962
// just will remove them from the central monitoring map
19631963
queues := []interface {
19641964
unregister()
1965-
drain()
1965+
drain() int
19661966
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
19671967
for _, q := range queues {
19681968
q.drain()

0 commit comments

Comments
 (0)