Skip to content

Commit cbc3355

Browse files
committed
Limit the number of in-memory cached blocks per store
Signed-off-by: Neil Twigg <[email protected]>
1 parent e77abce commit cbc3355

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

server/filestore.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ type fileStore struct {
205205
ttls *thw.HashWheel
206206
sdm *SDMMeta
207207
lpex time.Time // Last PurgeEx call.
208+
evict *ipQueue[*msgBlock]
208209
}
209210

210211
// Represents a message store block and its data.
@@ -313,6 +314,8 @@ const (
313314
maxBufReuse = 2 * 1024 * 1024
314315
// default cache buffer expiration
315316
defaultCacheBufferExpiration = 10 * time.Second
317+
// deault cache size before eviction
318+
defaultCacheEvictionThreshold = 32
316319
// default sync interval
317320
defaultSyncInterval = 2 * time.Minute
318321
// default idle timeout to close FDs.
@@ -420,6 +423,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
420423
qch: make(chan struct{}),
421424
fsld: make(chan struct{}),
422425
srv: fcfg.srv,
426+
evict: newIPQueue[*msgBlock](nil, _EMPTY_),
423427
}
424428

425429
// Register with access time service.
@@ -7058,6 +7062,14 @@ checkCache:
70587062
if len(buf) > 0 {
70597063
mb.cloads++
70607064
mb.startCacheExpireTimer()
7065+
7066+
// If loading this block into the cache caused us to reach the eviction
7067+
// threshold for cached blocks, evict the oldest one.
7068+
if n, _ := mb.fs.evict.push(mb); n == defaultCacheEvictionThreshold {
7069+
if emb, ok := mb.fs.evict.popOne(); ok && emb != mb {
7070+
emb.tryForceExpireCache()
7071+
}
7072+
}
70617073
}
70627074

70637075
return nil

server/ipqueue.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,17 @@ func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T
9595
},
9696
},
9797
name: name,
98-
m: &s.ipQueues,
9998
ipQueueOpts: ipQueueOpts[T]{
10099
mrs: ipQueueDefaultMaxRecycleSize,
101100
},
102101
}
102+
if s != nil {
103+
q.m = &s.ipQueues
104+
q.m.Store(name, q)
105+
}
103106
for _, o := range opts {
104107
o(&q.ipQueueOpts)
105108
}
106-
s.ipQueues.Store(name, q)
107109
return q
108110
}
109111

@@ -279,7 +281,7 @@ func (q *ipQueue[T]) inProgress() int64 {
279281
// Remove this queue from the server's map of ipQueues.
280282
// All ipQueue operations (such as push/pop/etc..) are still possible.
281283
func (q *ipQueue[T]) unregister() {
282-
if q == nil {
284+
if q == nil || q.m == nil {
283285
return
284286
}
285287
q.m.Delete(q.name)

0 commit comments

Comments
 (0)