2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ adhere to [Semantic Versioning](http://semver.org/spec/v2.0.0.html) starting v1.

- Remove dependency: github.com/pkg/errors (#443)
- Add public Cache.RemainingCost() method
- Implement public Cache.Iter() method
- Make tests concurrent

**Fixed**

76 changes: 43 additions & 33 deletions cache.go
@@ -25,8 +25,6 @@ var (
setBufSize = 32 * 1024
)

const itemSize = int64(unsafe.Sizeof(storeItem[any]{}))

func zeroValue[T any]() T {
var zero T
return zero
@@ -40,19 +38,19 @@ type Key = z.Key
// from as many goroutines as you want.
type Cache[K Key, V any] struct {
// storedItems is the central concurrent hashmap where key-value items are stored.
storedItems store[V]
storedItems store[K, V]
// cachePolicy determines what gets let in to the cache and what gets kicked out.
cachePolicy *defaultPolicy[V]
cachePolicy *defaultPolicy[K, V]
// getBuf is a custom ring buffer implementation that gets pushed to when
// keys are read.
getBuf *ringBuffer
// setBuf is a buffer allowing us to batch/drop Sets during times of high
// contention.
setBuf chan *Item[V]
setBuf chan *Item[K, V]
// onEvict is called for item evictions.
onEvict func(*Item[V])
onEvict func(*Item[K, V])
// onReject is called when an item is rejected via admission policy.
onReject func(*Item[V])
onReject func(*Item[K, V])
// onExit is called whenever a value goes out of scope from the cache.
onExit (func(V))
// KeyToHash function is used to customize the key hashing algorithm.
@@ -74,6 +72,8 @@ type Cache[K Key, V any] struct {
// Metrics contains a running log of important statistics like hits, misses,
// and dropped items.
Metrics *Metrics
// itemSize is the size of each item in the store.
itemSize int64
}
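For context on how the now-generic Cache[K, V] is intended to be used, a minimal sketch follows. It assumes the module import path `github.com/dgraph-io/ristretto/v2` and that `Set`, `Get`, and the `Config` fields referenced elsewhere in this diff (`NumCounters`, `MaxCost`, `BufferItems`) keep their pre-generics semantics; none of that is asserted by the change itself.

```go
package main

import (
	"fmt"

	ristretto "github.com/dgraph-io/ristretto/v2" // assumed import path
)

func main() {
	// Construct a cache with concrete type parameters. The Config values
	// below follow the usual ristretto examples and are illustrative only.
	cache, err := ristretto.NewCache(&ristretto.Config[string, int]{
		NumCounters: 1e7,     // number of keys to track frequency of
		MaxCost:     1 << 30, // maximum total cost of the cache
		BufferItems: 64,      // number of keys per Get buffer
	})
	if err != nil {
		panic(err)
	}
	defer cache.Close()

	cache.Set("answer", 42, 1) // Set is buffered and applied asynchronously
	cache.Wait()               // block until buffered writes are applied
	if v, ok := cache.Get("answer"); ok {
		fmt.Println("answer =", v)
	}
}
```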

// Config is passed to NewCache for creating new Cache instances.
@@ -127,10 +127,10 @@ type Config[K Key, V any] struct {
Metrics bool

// OnEvict is called for every eviction with the evicted item.
OnEvict func(item *Item[V])
OnEvict func(item *Item[K, V])

// OnReject is called for every rejection done via the policy.
OnReject func(item *Item[V])
OnReject func(item *Item[K, V])

// OnExit is called whenever a value is removed from cache. This can be
// used to do manual memory deallocation. Would also be called on eviction
@@ -191,14 +191,15 @@ const (
)

// Item is a full representation of what's stored in the cache for each key-value pair.
type Item[V any] struct {
flag itemFlag
Key uint64
Conflict uint64
Value V
Cost int64
Expiration time.Time
wait chan struct{}
type Item[K Key, V any] struct {
flag itemFlag
Key uint64
OriginalKey K
Conflict uint64
Value V
Cost int64
Expiration time.Time
wait chan struct{}
}
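With `Item` now parameterized over `K` and carrying `OriginalKey`, callbacks such as `OnEvict` can report the caller-supplied key rather than only its hash. A sketch under the same import-path assumption as above; whether `OriginalKey` is populated on every eviction path is not shown in this diff.

```go
package cachehooks

import (
	"log"

	ristretto "github.com/dgraph-io/ristretto/v2" // assumed import path
)

// newLoggingCache wires an OnEvict hook against the generic Item type.
func newLoggingCache() (*ristretto.Cache[string, []byte], error) {
	return ristretto.NewCache(&ristretto.Config[string, []byte]{
		NumCounters: 1e6,
		MaxCost:     1 << 20,
		BufferItems: 64,
		OnEvict: func(item *ristretto.Item[string, []byte]) {
			// OriginalKey is the caller-supplied key; Key remains the hash.
			log.Printf("evicted %q (hash=%d, cost=%d)", item.OriginalKey, item.Key, item.Cost)
		},
	})
}
```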

// NewCache returns a new Cache instance along with any configuration errors.
@@ -219,32 +220,33 @@ func NewCache[K Key, V any](config *Config[K, V]) (*Cache[K, V], error) {
case config.TtlTickerDurationInSec == 0:
config.TtlTickerDurationInSec = bucketDurationSecs
}
policy := newPolicy[V](config.NumCounters, config.MaxCost)
policy := newPolicy[K, V](config.NumCounters, config.MaxCost)
cache := &Cache[K, V]{
storedItems: newStore[V](),
storedItems: newStore[K, V](),
cachePolicy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *Item[V], setBufSize),
setBuf: make(chan *Item[K, V], setBufSize),
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
done: make(chan struct{}),
cost: config.Cost,
ignoreInternalCost: config.IgnoreInternalCost,
cleanupTicker: time.NewTicker(time.Duration(config.TtlTickerDurationInSec) * time.Second / 2),
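// With generics, unsafe.Sizeof(storeItem[K, V]{}) depends on the concrete
// K and V, so the per-item overhead is computed per cache instance here
// instead of via the former package-level itemSize constant.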
itemSize: int64(unsafe.Sizeof(storeItem[K, V]{})),
}
cache.storedItems.SetShouldUpdateFn(config.ShouldUpdate)
cache.onExit = func(val V) {
if config.OnExit != nil {
config.OnExit(val)
}
}
cache.onEvict = func(item *Item[V]) {
cache.onEvict = func(item *Item[K, V]) {
if config.OnEvict != nil {
config.OnEvict(item)
}
cache.onExit(item.Value)
}
cache.onReject = func(item *Item[V]) {
cache.onReject = func(item *Item[K, V]) {
if config.OnReject != nil {
config.OnReject(item)
}
@@ -271,7 +273,7 @@ func (c *Cache[K, V]) Wait() {
return
}
wait := make(chan struct{})
c.setBuf <- &Item[V]{wait: wait}
c.setBuf <- &Item[K, V]{wait: wait}
<-wait
}

@@ -337,13 +339,14 @@ func (c *Cache[K, V]) SetWithTTL(key K, value V, cost int64, ttl time.Duration)
}

keyHash, conflictHash := c.keyToHash(key)
i := &Item[V]{
flag: itemNew,
Key: keyHash,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
i := &Item[K, V]{
flag: itemNew,
Key: keyHash,
OriginalKey: key,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
}
// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
@@ -380,7 +383,7 @@ func (c *Cache[K, V]) Del(key K) {
// So we must push the same item to `setBuf` with the deletion flag.
// This ensures that if a set is followed by a delete, it will be
// applied in the correct order.
c.setBuf <- &Item[V]{
c.setBuf <- &Item[K, V]{
flag: itemDelete,
Key: keyHash,
Conflict: conflictHash,
Expand Down Expand Up @@ -414,6 +417,13 @@ func (c *Cache[K, V]) GetTTL(key K) (time.Duration, bool) {
return time.Until(expiration), true
}
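A sketch of the TTL round trip through the generic API, reusing the string/int cache from the construction example; the key name and 5-minute TTL are illustrative.

```go
package cacheexamples

import (
	"fmt"
	"time"

	ristretto "github.com/dgraph-io/ristretto/v2" // assumed import path
)

// ttlRoundTrip stores a value with a TTL and reads the remaining lifetime back.
func ttlRoundTrip(cache *ristretto.Cache[string, int]) {
	cache.SetWithTTL("visits", 1, 1, 5*time.Minute) // key, value, cost, ttl
	cache.Wait()                                    // flush the buffered write
	if ttl, ok := cache.GetTTL("visits"); ok {
		fmt.Printf("visits expires in %s\n", ttl)
	}
}
```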

// Iter iterates over the elements of the cache, passing each key-value pair to the callback.
// It guarantees that any key in the cache will be visited only once.
// The set of keys visited by Iter is non-deterministic.
func (c *Cache[K, V]) Iter(cb func(k K, v V) (stop bool)) {
c.storedItems.Iter(cb)
}
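A sketch of the new Iter method collecting keys with early termination. It assumes that returning true from the callback stops the iteration, as the `stop` result name suggests, and that recently buffered Sets may not yet be visible.

```go
package cacheexamples

import (
	ristretto "github.com/dgraph-io/ristretto/v2" // assumed import path
)

// collectKeys gathers up to limit keys currently visible in the cache.
func collectKeys(cache *ristretto.Cache[string, int], limit int) []string {
	keys := make([]string, 0, limit)
	cache.Iter(func(k string, v int) (stop bool) {
		keys = append(keys, k)
		return len(keys) >= limit // assumption: returning true stops Iter
	})
	return keys
}
```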

// Close stops all goroutines and closes all channels.
func (c *Cache[K, V]) Close() {
if c == nil || c.isClosed.Load() {
@@ -516,7 +526,7 @@ func (c *Cache[K, V]) processItems() {
}
}
}
onEvict := func(i *Item[V]) {
onEvict := func(i *Item[K, V]) {
if ts, has := startTs[i.Key]; has {
c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
delete(startTs, i.Key)
@@ -539,7 +549,7 @@ func (c *Cache[K, V]) processItems() {
}
if !c.ignoreInternalCost {
// Add the cost of internally storing the object.
i.Cost += itemSize
i.Cost += c.itemSize
}

switch i.flag {