diff --git a/cmd/mo-service/config.go b/cmd/mo-service/config.go index 897323c2564d7..f5169e9414009 100644 --- a/cmd/mo-service/config.go +++ b/cmd/mo-service/config.go @@ -301,7 +301,7 @@ func (c *Config) setDefaultValue() error { func (c *Config) initMetaCache() { if c.MetaCache.MemoryCapacity > 0 { - objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity)) + objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo) } } diff --git a/pkg/embed/config.go b/pkg/embed/config.go index 02763283e710b..79f0a842c65f4 100644 --- a/pkg/embed/config.go +++ b/pkg/embed/config.go @@ -279,7 +279,7 @@ func (c *ServiceConfig) setDefaultValue() error { func (c *ServiceConfig) initMetaCache() { if c.MetaCache.MemoryCapacity > 0 { - objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity)) + objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo) } } diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 57a3bbcd957b9..853db8481d67a 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -16,53 +16,75 @@ package fileservice import ( "context" - "sync/atomic" + "sync" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/fileservice/fscache" ) type Bytes struct { + mu sync.Mutex bytes []byte deallocator malloc.Deallocator - deallocated uint32 - _refs atomic.Int32 - refs *atomic.Int32 + refs int32 } func (b *Bytes) Size() int64 { + b.mu.Lock() + defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Size() buffer already deallocated") + } return int64(len(b.bytes)) } func (b *Bytes) Bytes() []byte { + b.mu.Lock() + defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Bytes() buffer already deallocated") + } return b.bytes } func (b *Bytes) Slice(length int) fscache.Data { + b.mu.Lock() + defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Slice() buffer already deallocated") + } b.bytes = b.bytes[:length] return b } func (b *Bytes) Retain() { - if b.refs != nil { - b.refs.Add(1) + b.mu.Lock() + defer b.mu.Unlock() + + if b.bytes == nil { + panic("fileservice.Bytes.Retain() buffer already deallocated") } + + b.refs += 1 } -func (b *Bytes) Release() { - if b.refs != nil { - if n := b.refs.Add(-1); n == 0 { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { - b.deallocator.Deallocate(malloc.NoHints) - } - } - } else { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { +func (b *Bytes) Release() bool { + b.mu.Lock() + defer b.mu.Unlock() + + if b.bytes == nil { + panic("fileservice.Bytes.Release() double free") + } + + b.refs -= 1 + if b.refs == 0 { + if b.deallocator != nil { b.deallocator.Deallocate(malloc.NoHints) + b.bytes = nil + return true } } + return false } type bytesAllocator struct { @@ -79,9 +101,8 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache bytes := &Bytes{ bytes: slice, deallocator: dec, + refs: 1, } - bytes._refs.Store(1) - bytes.refs = &bytes._refs return bytes } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 0859bdbb0445b..115aabec7b06d 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -28,7 +28,28 @@ func TestBytes(t *testing.T) { bs := &Bytes{ bytes: bytes, deallocator: deallocator, + refs: 1, } bs.Release() }) } + +func TestBytesPanic(t *testing.T) { + bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) + assert.Nil(t, err) + bs := 
&Bytes{ + bytes: bytes, + deallocator: deallocator, + refs: 1, + } + + released := bs.Release() + assert.Equal(t, released, true) + + assert.Panics(t, func() { bs.Release() }, "Bytes.Release panic()") + assert.Panics(t, func() { bs.Size() }, "Bytes.Size panic()") + assert.Panics(t, func() { bs.Bytes() }, "Bytes.Bytes panic()") + assert.Panics(t, func() { bs.Slice(0) }, "Bytes.Slice panic()") + assert.Panics(t, func() { bs.Retain() }, "Bytes.Retain panic()") + +} diff --git a/pkg/fileservice/cache.go b/pkg/fileservice/cache.go index f48e7cc13d1e3..1a43c5955e7c9 100644 --- a/pkg/fileservice/cache.go +++ b/pkg/fileservice/cache.go @@ -43,6 +43,7 @@ type CacheConfig struct { RemoteCacheEnabled bool `toml:"remote-cache-enabled"` RPC morpc.Config `toml:"rpc"` CheckOverlaps bool `toml:"check-overlaps"` + DisableS3Fifo bool `toml:"disable-s3fifo"` QueryClient client.QueryClient `json:"-"` KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"` diff --git a/pkg/fileservice/cache_test.go b/pkg/fileservice/cache_test.go index 4ec2bb77cfca4..4b05d3fe17e43 100644 --- a/pkg/fileservice/cache_test.go +++ b/pkg/fileservice/cache_test.go @@ -32,7 +32,7 @@ func Test_readCache(t *testing.T) { slowCacheReadThreshold = time.Second size := int64(128) - m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false) defer m.Close(ctx) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3) diff --git a/pkg/fileservice/disk_cache.go b/pkg/fileservice/disk_cache.go index cff7b09b760b7..1eca92c60b16a 100644 --- a/pkg/fileservice/disk_cache.go +++ b/pkg/fileservice/disk_cache.go @@ -60,6 +60,7 @@ func NewDiskCache( asyncLoad bool, cacheDataAllocator CacheDataAllocator, name string, + disable_s3fifo bool, ) (ret *DiskCache, err error) { err = os.MkdirAll(path, 0755) @@ -119,6 +120,7 @@ func NewDiskCache( ) } }, + disable_s3fifo, ), } ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex)) diff --git a/pkg/fileservice/disk_cache_test.go b/pkg/fileservice/disk_cache_test.go index 933238d4d3aae..abe7a63f10bdf 100644 --- a/pkg/fileservice/disk_cache_test.go +++ b/pkg/fileservice/disk_cache_test.go @@ -58,7 +58,7 @@ func TestDiskCache(t *testing.T) { } // new - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -143,7 +143,7 @@ func TestDiskCache(t *testing.T) { testRead(cache) // new cache instance and read - cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -152,7 +152,7 @@ func TestDiskCache(t *testing.T) { assert.Equal(t, 1, numWritten) // new cache instance and update - cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -178,7 +178,7 @@ func TestDiskCacheWriteAgain(t *testing.T) { }) ctx = OnDiskCacheEvict(ctx, nil) // for coverage - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -243,7 +243,7 @@ func TestDiskCacheWriteAgain(t 
*testing.T) { func TestDiskCacheFileCache(t *testing.T) { dir := t.TempDir() ctx := context.Background() - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -303,7 +303,7 @@ func TestDiskCacheDirSize(t *testing.T) { dir := t.TempDir() capacity := 1 << 20 - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -366,6 +366,7 @@ func benchmarkDiskCacheWriteThenRead( false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -461,6 +462,7 @@ func benchmarkDiskCacheReadRandomOffsetAtLargeFile( false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -532,6 +534,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) { false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -603,7 +606,7 @@ func TestDiskCacheClearFiles(t *testing.T) { assert.Nil(t, err) numFiles := len(files) - _, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + _, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) files, err = filepath.Glob(filepath.Join(dir, "*")) @@ -617,7 +620,7 @@ func TestDiskCacheClearFiles(t *testing.T) { func TestDiskCacheBadWrite(t *testing.T) { dir := t.TempDir() ctx := context.Background() - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) written, err := cache.writeFile( @@ -652,6 +655,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) { false, nil, "test", + false, ) assert.Nil(t, err) defer cache.Close(ctx) @@ -684,7 +688,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) { func TestDiskCacheSetFromFile(t *testing.T) { ctx := context.Background() - cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "") + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "", false) require.Nil(t, err) defer cache.Close(ctx) @@ -709,7 +713,7 @@ func TestDiskCacheSetFromFile(t *testing.T) { func TestDiskCacheQuotaExceeded(t *testing.T) { ctx := context.Background() - cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "") + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "", false) require.Nil(t, err) defer cache.Close(ctx) diff --git a/pkg/fileservice/fifocache/bench2_test.go b/pkg/fileservice/fifocache/bench2_test.go new file mode 100644 index 0000000000000..7370d79de4bdd --- /dev/null +++ b/pkg/fileservice/fifocache/bench2_test.go @@ -0,0 +1,133 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fifocache + +import ( + "context" + "math/rand/v2" + "runtime" + "sync" + "testing" + "time" + + "github.com/matrixorigin/matrixone/pkg/fileservice/fscache" +) + +const g_cache_size = 51200000 // 512M +const g_item_size = 128000 // 128K +const g_io_read_time = 20 * time.Microsecond + +func cache_read(ctx context.Context, cache *Cache[int64, int64], key int64) { + + _, ok := cache.Get(ctx, key) + if !ok { + // cache miss and sleep penalty as IO read + time.Sleep(g_io_read_time) + cache.Set(ctx, key, int64(0), g_item_size) + } +} + +func get_rand(start int64, end int64, r *rand.Rand, mutex *sync.Mutex) int64 { + mutex.Lock() + defer mutex.Unlock() + return start + r.Int64N(end-start) +} + +func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], startkey int64, endkey int64, r *rand.Rand, mutex *sync.Mutex) { + + ncpu := runtime.NumCPU() + var wg sync.WaitGroup + + for i := 0; i < ncpu; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for n := 0; n < b.N; n++ { + + if n%ncpu != i { + continue + } + + //fmt.Printf("start = %d, end = %d\n", startkey, endkey) + for range endkey - startkey { + key := get_rand(startkey, endkey, r, mutex) + cache_read(ctx, cache, key) + } + } + }() + } + + wg.Wait() +} + +func data_shift(b *testing.B, time int64) { + var mutex sync.Mutex + ctx := context.Background() + cache_size := g_cache_size + cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false) + r := rand.New(rand.NewPCG(1, 2)) + + offset := int64(0) + start := int64(0) + end := int64(g_cache_size) / int64(g_item_size) * time + d1 := []int64{start, end} + offset += end + d2 := []int64{offset + start, offset + end} + offset += end + d3 := []int64{offset + start, offset + end} + + b.ResetTimer() + + dataset_read(b, ctx, cache, d1[0], d1[1], r, &mutex) + dataset_read(b, ctx, cache, d2[0], d2[1], r, &mutex) + dataset_read(b, ctx, cache, d3[0], d3[1], r, &mutex) +} + +func data_readNx(b *testing.B, time int64) { + var mutex sync.Mutex + ctx := context.Background() + cache_size := g_cache_size + cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false) + start := int64(0) + end := int64(g_cache_size) / int64(g_item_size) * time + r := rand.New(rand.NewPCG(1, 2)) + + b.ResetTimer() + dataset_read(b, ctx, cache, start, end, r, &mutex) +} + +func BenchmarkSimCacheRead1x(b *testing.B) { + data_readNx(b, 1) +} + +func BenchmarkSimCacheRead1xShift(b *testing.B) { + data_shift(b, 1) +} + +func BenchmarkSimCacheRead2x(b *testing.B) { + data_readNx(b, 2) +} + +func BenchmarkSimCacheRead2xShift(b *testing.B) { + data_shift(b, 2) +} + +func BenchmarkSimCacheRead4x(b *testing.B) { + data_readNx(b, 4) +} + +func BenchmarkSimCacheRead4xShift(b *testing.B) { + data_shift(b, 4) +} diff --git a/pkg/fileservice/fifocache/bench_test.go b/pkg/fileservice/fifocache/bench_test.go index d4bce36f709cd..5168f2ce54735 100644 --- a/pkg/fileservice/fifocache/bench_test.go +++ b/pkg/fileservice/fifocache/bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkSequentialSet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() for i := 0; i < b.N; i++ { @@ -37,7 +37,7 @@ func BenchmarkSequentialSet(b *testing.B) { func BenchmarkParallelSet(b *testing.B) { ctx := 
context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -50,7 +50,7 @@ func BenchmarkParallelSet(b *testing.B) { func BenchmarkGet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 for i := 0; i < nElements; i++ { cache.Set(ctx, i, i, int64(1+i%3)) @@ -64,7 +64,7 @@ func BenchmarkGet(b *testing.B) { func BenchmarkParallelGet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 for i := 0; i < nElements; i++ { cache.Set(ctx, i, i, int64(1+i%3)) @@ -80,7 +80,7 @@ func BenchmarkParallelGet(b *testing.B) { func BenchmarkParallelGetOrSet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -97,7 +97,7 @@ func BenchmarkParallelGetOrSet(b *testing.B) { func BenchmarkParallelEvict(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go index a0aa508e015fb..da1ca664204d8 100644 --- a/pkg/fileservice/fifocache/data_cache.go +++ b/pkg/fileservice/fifocache/data_cache.go @@ -33,9 +33,10 @@ func NewDataCache( postSet func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64), postGet func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64), postEvict func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64), + disable_s3fifo bool, ) *DataCache { return &DataCache{ - fifo: New(capacity, shardCacheKey, postSet, postGet, postEvict), + fifo: New(capacity, shardCacheKey, postSet, postGet, postEvict, disable_s3fifo), } } @@ -45,6 +46,7 @@ func shardCacheKey(key fscache.CacheKey) uint64 { var hasher maphash.Hash hasher.SetSeed(seed) hasher.Write(util.UnsafeToBytes(&key.Offset)) + hasher.Write(util.UnsafeToBytes(&key.Sz)) hasher.WriteString(key.Path) return hasher.Sum64() } @@ -52,9 +54,7 @@ func shardCacheKey(key fscache.CacheKey) uint64 { var _ fscache.DataCache = new(DataCache) func (d *DataCache) Available() int64 { - d.fifo.queueLock.RLock() - defer d.fifo.queueLock.RUnlock() - ret := d.fifo.capacity() - d.fifo.used1 - d.fifo.used2 + ret := d.fifo.capacity() - d.fifo.Used() if ret < 0 { ret = 0 } @@ -66,24 +66,20 @@ func (d *DataCache) Capacity() int64 { } func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { + deletes := make([]*_CacheItem[fscache.CacheKey, fscache.Data], 0, 10) for _, path := range paths { - for i := 0; i < 
len(d.fifo.shards); i++ { - d.deletePath(ctx, i, path) - } + + key := fscache.CacheKey{Path: path} + d.fifo.htab.CompareAndDelete(key, func(key1, key2 fscache.CacheKey) bool { + return key1.Path == key2.Path + }, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) { + deletes = append(deletes, value) + }) } -} -func (d *DataCache) deletePath(ctx context.Context, shardIndex int, path string) { - shard := &d.fifo.shards[shardIndex] - shard.Lock() - defer shard.Unlock() - for key, item := range shard.values { - if key.Path == path { - delete(shard.values, key) - if d.fifo.postEvict != nil { - d.fifo.postEvict(ctx, item.key, item.value, item.size) - } - } + // FSCACHEDATA RELEASE + for _, item := range deletes { + item.MarkAsDeleted(ctx, d.fifo.postEvict) } } @@ -109,7 +105,5 @@ func (d *DataCache) Set(ctx context.Context, key query.CacheKey, value fscache.D } func (d *DataCache) Used() int64 { - d.fifo.queueLock.RLock() - defer d.fifo.queueLock.RUnlock() - return d.fifo.used1 + d.fifo.used2 + return d.fifo.Used() } diff --git a/pkg/fileservice/fifocache/data_cache_test.go b/pkg/fileservice/fifocache/data_cache_test.go index 099e3640ed71c..4ab400f184ba6 100644 --- a/pkg/fileservice/fifocache/data_cache_test.go +++ b/pkg/fileservice/fifocache/data_cache_test.go @@ -29,6 +29,7 @@ func BenchmarkEnsureNBytesAndSet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -63,7 +64,7 @@ func TestShardCacheKeyAllocs(t *testing.T) { Offset: 3, Path: strings.Repeat("abc", 42), } - if n := testing.AllocsPerRun(64, func() { + if n := testing.AllocsPerRun(64000, func() { shardCacheKey(key) }); n != 0 { t.Fatalf("should not allocate") @@ -74,6 +75,7 @@ func BenchmarkDataCacheSet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) b.ResetTimer() for i := range b.N { @@ -93,6 +95,7 @@ func BenchmarkDataCacheGet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) key := fscache.CacheKey{ Path: "foo", @@ -118,7 +121,8 @@ func (t testBytes) Bytes() []byte { return t } -func (t testBytes) Release() { +func (t testBytes) Release() bool { + return false } func (t testBytes) Retain() { diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index bb05456a1b199..3a75140222336 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -16,237 +16,256 @@ package fifocache import ( "context" - "runtime" "sync" - "sync/atomic" - - "golang.org/x/sys/cpu" "github.com/matrixorigin/matrixone/pkg/fileservice/fscache" ) -const numShards = 256 - -// Cache implements an in-memory cache with FIFO-based eviction -// it's mostly like the S3-fifo, only without the ghost queue part +// Cache implements an in-memory cache with S3-FIFO-based eviction +// All postfn is very critical. They will increment and decrement the reference counter of the cache data and deallocate the memory when reference counter is 0. +// Make sure the postfn is protected by mutex from shardmap. 
type Cache[K comparable, V any] struct { - capacity fscache.CapacityFunc - capacity1 fscache.CapacityFunc - keyShardFunc func(K) uint64 + capacity fscache.CapacityFunc + capSmall fscache.CapacityFunc postSet func(ctx context.Context, key K, value V, size int64) postGet func(ctx context.Context, key K, value V, size int64) postEvict func(ctx context.Context, key K, value V, size int64) - shards [numShards]struct { - sync.Mutex - values map[K]*_CacheItem[K, V] - _ cpu.CacheLinePad - } - - itemQueue chan *_CacheItem[K, V] + mutex sync.Mutex + htab *ShardMap[K, *_CacheItem[K, V]] - queueLock sync.RWMutex - used1 int64 - queue1 Queue[*_CacheItem[K, V]] - used2 int64 - queue2 Queue[*_CacheItem[K, V]] - - capacityCut atomic.Int64 + usedSmall int64 + small Queue[*_CacheItem[K, V]] + usedMain int64 + main Queue[*_CacheItem[K, V]] + ghost *ghost[K] + disable_s3fifo bool } type _CacheItem[K comparable, V any] struct { key K value V size int64 - count atomic.Int32 + + freq int8 + deleted bool // flag indicate item is already deleted by either hashtable or evict } -func (c *_CacheItem[K, V]) inc() { - for { - cur := c.count.Load() - if cur >= 3 { - return - } - if c.count.CompareAndSwap(cur, cur+1) { - return - } +func (c *_CacheItem[K, V]) Inc() { + if c.freq < 3 { + c.freq += 1 } } -func (c *_CacheItem[K, V]) dec() { - for { - cur := c.count.Load() - if cur <= 0 { - return - } - if c.count.CompareAndSwap(cur, cur-1) { - return - } +func (c *_CacheItem[K, V]) Dec() { + if c.freq > 0 { + c.freq -= 1 } } +func (c *_CacheItem[K, V]) GetFreq() int8 { + return c.freq +} + +func (c *_CacheItem[K, V]) IsDeleted() bool { + return c.deleted +} + +// first MarkAsDeleted will decrement the ref counter and call postfn and set deleted = true. +// After first call, MarkAsDeleted will do nothing. +func (c *_CacheItem[K, V]) MarkAsDeleted(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { + // check item is already deleted + if c.deleted { + // exit and return false which means no need to deallocate the memory + return false + } + + // set deleted = true + c.deleted = true + + // call postEvict before decrement the ref counter + if fn != nil { + fn(ctx, c.key, c.value, c.size) + } + + // decrement the ref counter + c.releaseValue() + return true +} + +func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { + if fn != nil { + fn(ctx, c.key, c.value, c.size) + } +} + +func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { + // first check item is already deleted + if c.deleted { + return false + } + + // if not deleted, increment ref counter to occupy the memory + c.retainValue() + + // value is safe to be accessed now and call postfn + if fn != nil { + fn(ctx, c.key, c.value, c.size) + } + + return true +} + +// INTERNAL: non-thread safe. +// if deleted = true, item value is already released by this Cache and is NOT valid to use it inside the Cache. +// if deleted = false, increment the reference counter of the value and it is safe to use now. +func (c *_CacheItem[K, V]) retainValue() { + cdata, ok := any(c.value).(fscache.Data) + if ok { + cdata.Retain() + } +} + +// INTERNAL: non-thread safe. +// decrement the reference counter +func (c *_CacheItem[K, V]) releaseValue() { + cdata, ok := any(c.value).(fscache.Data) + if ok { + cdata.Release() + } +} + +// assume cache size is 128K +// if cache capacity is smaller than 4G, ghost size is 100%. 
Otherwise, 50% +func estimateGhostSize(capacity int64) int { + estimate := int(capacity / int64(32806)) + if capacity > 8000000000 { // 8G + // only 50% + estimate /= 2 + } + if estimate < 10000 { + estimate = 10000 + } + return estimate +} + func New[K comparable, V any]( capacity fscache.CapacityFunc, keyShardFunc func(K) uint64, postSet func(ctx context.Context, key K, value V, size int64), postGet func(ctx context.Context, key K, value V, size int64), postEvict func(ctx context.Context, key K, value V, size int64), + disable_s3fifo bool, ) *Cache[K, V] { + + ghostsize := estimateGhostSize(capacity()) ret := &Cache[K, V]{ capacity: capacity, - capacity1: func() int64 { - return capacity() / 10 + capSmall: func() int64 { + cs := capacity() / 10 + if cs == 0 { + cs = 1 + } + return cs }, - itemQueue: make(chan *_CacheItem[K, V], runtime.GOMAXPROCS(0)*2), - queue1: *NewQueue[*_CacheItem[K, V]](), - queue2: *NewQueue[*_CacheItem[K, V]](), - keyShardFunc: keyShardFunc, - postSet: postSet, - postGet: postGet, - postEvict: postEvict, - } - for i := range ret.shards { - ret.shards[i].values = make(map[K]*_CacheItem[K, V], 1024) + small: *NewQueue[*_CacheItem[K, V]](), + main: *NewQueue[*_CacheItem[K, V]](), + ghost: newGhost[K](ghostsize), + postSet: postSet, + postGet: postGet, + postEvict: postEvict, + htab: NewShardMap[K, *_CacheItem[K, V]](keyShardFunc), + disable_s3fifo: disable_s3fifo, } return ret } -func (c *Cache[K, V]) set(ctx context.Context, key K, value V, size int64) *_CacheItem[K, V] { - shard := &c.shards[c.keyShardFunc(key)%numShards] - shard.Lock() - defer shard.Unlock() - _, ok := shard.values[key] - if ok { - // existed - return nil - } +func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { + c.mutex.Lock() + defer c.mutex.Unlock() item := &_CacheItem[K, V]{ key: key, value: value, size: size, } - shard.values[key] = item - if c.postSet != nil { - c.postSet(ctx, key, value, size) - } - return item -} + // FSCACHEDATA RETAIN + // increment the ref counter first no matter what to make sure the memory is occupied before hashtable.Set + item.Retain(ctx, nil) -func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { - if item := c.set(ctx, key, value, size); item != nil { - c.enqueue(item) - c.Evict(ctx, nil, 0) + ok := c.htab.Set(key, item, nil) + if !ok { + // existed + // FSCACHEDATA RELEASE + // decrement the ref counter if not set to release the resource + item.MarkAsDeleted(ctx, nil) + return } -} -func (c *Cache[K, V]) enqueue(item *_CacheItem[K, V]) { - if !c.queueLock.TryLock() { - // try put itemQueue - select { - case c.itemQueue <- item: - // let the queueLock holder do the job - return - default: - // block until get lock - c.queueLock.Lock() - defer c.queueLock.Unlock() - } - } else { - defer c.queueLock.Unlock() - } + // postSet + item.PostFn(ctx, c.postSet) + + // evict + c.evictAll(ctx, nil, 0) // enqueue - c.queue1.enqueue(item) - c.used1 += item.size - - // help enqueue - for { - select { - case item := <-c.itemQueue: - c.queue1.enqueue(item) - c.used1 += item.size - default: - return + if c.disable_s3fifo { + c.small.enqueue(item) + c.usedSmall += item.size + } else { + if c.ghost.contains(item.key) { + c.ghost.remove(item.key) + c.main.enqueue(item) + c.usedMain += item.size + } else { + c.small.enqueue(item) + c.usedSmall += item.size } } + } func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { - shard := &c.shards[c.keyShardFunc(key)%numShards] - shard.Lock() + c.mutex.Lock() + defer 
c.mutex.Unlock() var item *_CacheItem[K, V] - item, ok = shard.values[key] + + item, ok = c.htab.Get(key, nil) if !ok { - shard.Unlock() return } - if c.postGet != nil { - c.postGet(ctx, item.key, item.value, item.size) + + // FSCACHEDATA RETAIN + ok = item.Retain(ctx, c.postGet) + if !ok { + return item.value, false } - shard.Unlock() - item.inc() + + // increment + item.Inc() + return item.value, true } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - shard := &c.shards[c.keyShardFunc(key)%numShards] - shard.Lock() - defer shard.Unlock() - item, ok := shard.values[key] - if !ok { - return - } - delete(shard.values, key) - if c.postEvict != nil { - c.postEvict(ctx, item.key, item.value, item.size) + c.mutex.Lock() + defer c.mutex.Unlock() + item, ok := c.htab.GetAndDelete(key, nil) + + if ok { + // call Bytes.Release() to decrement the ref counter, protected by the shardmap mutex. + // item.deleted makes sure postEvict is only called once. + item.MarkAsDeleted(ctx, c.postEvict) } - // queues will be update in evict + } func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut int64) { - if done == nil { - // can be async - if c.queueLock.TryLock() { - defer c.queueLock.Unlock() - } else { - if capacityCut > 0 { - // let the holder do more evict - c.capacityCut.Add(capacityCut) - } - return - } - - } else { - if cap(done) < 1 { - panic("should be buffered chan") - } - c.queueLock.Lock() - defer c.queueLock.Unlock() - } - - var target int64 - for { - globalCapacityCut := c.capacityCut.Swap(0) - target = c.capacity() - capacityCut - globalCapacityCut - if target < 0 { - target = 0 - } - if c.used1+c.used2 <= target { - break - } - target1 := c.capacity1() - capacityCut - globalCapacityCut - if target1 < 0 { - target1 = 0 - } - if c.used1 > target1 { - c.evict1(ctx) - } else { - c.evict2(ctx) - } - } + c.mutex.Lock() + defer c.mutex.Unlock() + target := c.evictAll(ctx, done, capacityCut) if done != nil { done <- target } @@ -254,64 +273,100 @@ func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut in // ForceEvict evicts n bytes despite capacity func (c *Cache[K, V]) ForceEvict(ctx context.Context, n int64) { - capacityCut := c.capacity() - c.used() + n - c.Evict(ctx, nil, capacityCut) + c.mutex.Lock() + defer c.mutex.Unlock() + capacityCut := c.capacity() - c.usedSmall - c.usedMain + n + c.evictAll(ctx, nil, capacityCut) +} + +func (c *Cache[K, V]) Used() int64 { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.usedSmall + c.usedMain } -func (c *Cache[K, V]) used() int64 { - c.queueLock.RLock() - defer c.queueLock.RUnlock() - return c.used1 + c.used2 +func (c *Cache[K, V]) evictAll(ctx context.Context, done chan int64, capacityCut int64) int64 { + var target int64 + target = c.capacity() - capacityCut - 1 + if target <= 0 { + target = 0 + } + targetSmall := c.capSmall() - capacityCut - 1 + if targetSmall <= 0 { + targetSmall = 0 + } + + for c.usedMain+c.usedSmall > target { + if c.usedSmall > targetSmall { + c.evictSmall(ctx) + } else { + c.evictMain(ctx) + } + } + + return target + 1 } -func (c *Cache[K, V]) evict1(ctx context.Context) { - // queue 1 - for { - item, ok := c.queue1.dequeue() +func (c *Cache[K, V]) evictSmall(ctx context.Context) { + // small fifo + for c.usedSmall > 0 { + item, ok := c.small.dequeue() if !ok { // queue empty return } - if item.count.Load() > 1 { - // put queue2 - c.queue2.enqueue(item) - c.used1 -= item.size - c.used2 += item.size + + deleted := item.IsDeleted() + if deleted { + c.usedSmall -= item.size + 
return + } + + if item.GetFreq() > 1 { + // put main + c.main.enqueue(item) + c.usedSmall -= item.size + c.usedMain += item.size } else { // evict - c.deleteItem(ctx, item) - c.used1 -= item.size + c.htab.Remove(item.key) + c.usedSmall -= item.size + if !c.disable_s3fifo { + c.ghost.add(item.key) + } + // mark item as deleted and item should not be accessed again + item.MarkAsDeleted(ctx, c.postEvict) return } } } -func (c *Cache[K, V]) deleteItem(ctx context.Context, item *_CacheItem[K, V]) { - shard := &c.shards[c.keyShardFunc(item.key)%numShards] - shard.Lock() - defer shard.Unlock() - delete(shard.values, item.key) - if c.postEvict != nil { - c.postEvict(ctx, item.key, item.value, item.size) - } -} - -func (c *Cache[K, V]) evict2(ctx context.Context) { - // queue 2 - for { - item, ok := c.queue2.dequeue() +func (c *Cache[K, V]) evictMain(ctx context.Context) { + // main fifo + for c.usedMain > 0 { + item, ok := c.main.dequeue() if !ok { // empty queue - break + return } - if item.count.Load() > 0 { + + deleted := item.IsDeleted() + if deleted { + c.usedMain -= item.size + return + } + + if item.GetFreq() > 0 { // re-enqueue - c.queue2.enqueue(item) - item.dec() + item.Dec() + c.main.enqueue(item) } else { // evict - c.deleteItem(ctx, item) - c.used2 -= item.size + c.htab.Remove(item.key) + c.usedMain -= item.size + // mark item as deleted and item should not be accessed again + item.MarkAsDeleted(ctx, c.postEvict) + return } } diff --git a/pkg/fileservice/fifocache/fifo_test.go b/pkg/fileservice/fifocache/fifo_test.go index 9e5ca3fcde918..f09e84f43d68a 100644 --- a/pkg/fileservice/fifocache/fifo_test.go +++ b/pkg/fileservice/fifocache/fifo_test.go @@ -24,7 +24,7 @@ import ( func TestCacheSetGet(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, false) cache.Set(ctx, 1, 1, 1) n, ok := cache.Get(ctx, 1) @@ -42,18 +42,18 @@ func TestCacheSetGet(t *testing.T) { func TestCacheEvict(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, false) for i := 0; i < 64; i++ { cache.Set(ctx, i, i, 1) - if cache.used1+cache.used2 > cache.capacity() { - t.Fatalf("capacity %v, used1 %v used2 %v", cache.capacity(), cache.used1, cache.used2) + if cache.Used() > cache.capacity() { + t.Fatalf("capacity %v, usedSmall %v usedMain %v", cache.capacity(), cache.usedSmall, cache.usedMain) } } } func TestCacheEvict2(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(2), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(20), ShardInt[int], nil, nil, nil, false) cache.Set(ctx, 1, 1, 1) cache.Set(ctx, 2, 2, 1) @@ -76,8 +76,8 @@ func TestCacheEvict2(t *testing.T) { v, ok = cache.Get(ctx, 4) assert.True(t, ok) assert.Equal(t, 4, v) - assert.Equal(t, int64(1), cache.used1) - assert.Equal(t, int64(1), cache.used2) + assert.Equal(t, int64(4), cache.usedSmall) + assert.Equal(t, int64(0), cache.usedMain) } func TestCacheEvict3(t *testing.T) { @@ -94,12 +94,13 @@ func TestCacheEvict3(t *testing.T) { func(_ context.Context, _ int, _ bool, _ int64) { nEvict++ }, + false, ) for i := 0; i < 1024; i++ { cache.Set(ctx, i, true, 1) cache.Get(ctx, i) cache.Get(ctx, i) - assert.True(t, cache.used1+cache.used2 <= 1024) + assert.True(t, cache.Used() <= 1024) } 
assert.Equal(t, 0, nEvict) assert.Equal(t, 1024, nSet) @@ -107,11 +108,123 @@ func TestCacheEvict3(t *testing.T) { for i := 0; i < 1024; i++ { cache.Set(ctx, 10000+i, true, 1) - assert.True(t, cache.used1+cache.used2 <= 1024) + assert.True(t, cache.Used() <= 1024) } - assert.Equal(t, int64(102), cache.used1) - assert.Equal(t, int64(922), cache.used2) + assert.Equal(t, int64(102), cache.usedSmall) + assert.Equal(t, int64(922), cache.usedMain) assert.Equal(t, 1024, nEvict) assert.Equal(t, 2048, nSet) assert.Equal(t, 2048, nGet) } + +func TestCacheOneHitWonder(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(1000), ShardInt[int64], nil, nil, nil, false) + + capsmall := int64(1000) + for i := range capsmall { + cache.Set(ctx, i, i, 1) + } + assert.Equal(t, int64(0), cache.usedMain) + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) +} + +func TestCacheMoveMain(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(100), ShardInt[int64], nil, nil, nil, false) + + // fill small fifo to 90 + for i := range int64(90) { + cache.Set(ctx, 10000+i, 10000+i, 1) + } + + results := [][]int64{{0, 100}, {0, 100}, {0, 100}, {0, 100}, {0, 100}, + {20, 80}, {40, 60}, {60, 40}, {80, 20}, + {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}} + + for k := range int64(20) { + //fmt.Printf("cache set 10 items\n") + capsmall := int64(10) + for i := range capsmall { + cache.Set(ctx, k*10+i, k*10+i, 1) + } + + //fmt.Printf("increment freq 2\n") + // increment the freq + for j := range 2 { + for i := range capsmall { + _, ok := cache.Get(ctx, k*10+i) + assert.Equal(t, ok, true) + } + _ = j + } + //fmt.Printf("Add %d to %d and Try move to main\n", (k+1)*200, (k+1)*200+capsmall) + // move to main + for i := range capsmall { + cache.Set(ctx, (k+1)*200+i, (k+1)*200+i, 1) + } + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + + assert.Equal(t, results[k][0], cache.usedMain) + assert.Equal(t, results[k][1], cache.usedSmall) + } + + assert.Equal(t, int64(90), cache.usedMain) + assert.Equal(t, int64(10), cache.usedSmall) + // remove all main 0 - 99 + //fmt.Printf("remove all main\n") +} + +func TestCacheMoveGhost(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(100), ShardInt[int64], nil, nil, nil, false) + + // fill small fifo to 90 + for i := range int64(90) { + cache.Set(ctx, 10000+i, 10000+i, 1) + } + + for k := range int64(2) { + //fmt.Printf("cache set 10 items\n") + capsmall := int64(10) + for i := range capsmall { + cache.Set(ctx, k*10+i, k*10+i, 1) + } + + //fmt.Printf("increment freq 2\n") + // increment the freq + for j := range 2 { + for i := range capsmall { + _, ok := cache.Get(ctx, k*10+i) + assert.Equal(t, ok, true) + } + _ = j + } + //fmt.Printf("Add %d to %d and Try move to main\n", (k+1)*200, (k+1)*200+capsmall) + // move to main + for i := range capsmall { + cache.Set(ctx, (k+1)*200+i, (k+1)*200+i, 1) + } + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + } + + for i := 10000; i < 10020; i++ { + assert.Equal(t, cache.ghost.contains(int64(i)), true) + + } + + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + for i := 10000; i < 10020; i++ { + cache.Set(ctx, int64(i), int64(i), 1) + assert.Equal(t, cache.ghost.contains(int64(i)), false) + } + + 
assert.Equal(t, cache.usedMain, int64(20)) + assert.Equal(t, cache.usedSmall, int64(80)) + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + //assert.Equal(t, int64(10), cache.usedMain.Load()) + //assert.Equal(t, int64(10), cache.usedSmall.Load()) + + // remove all main 0 - 99 + //fmt.Printf("remove all main\n") +} diff --git a/pkg/fileservice/fifocache/ghost.go b/pkg/fileservice/fifocache/ghost.go new file mode 100644 index 0000000000000..2eac3fd699322 --- /dev/null +++ b/pkg/fileservice/fifocache/ghost.go @@ -0,0 +1,90 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fifocache + +import ( + "container/list" + "sync" +) + +// ghost implements a thread-safe ghost queue for S3-FIFO +type ghost[K comparable] struct { + mu sync.RWMutex // Use RWMutex for better read performance + capacity int + keys map[K]*list.Element + queue *list.List +} + +func newGhost[K comparable](capacity int) *ghost[K] { + return &ghost[K]{ + capacity: capacity, + keys: make(map[K]*list.Element), + queue: list.New(), + } +} + +func (g *ghost[K]) add(key K) { + g.mu.Lock() + defer g.mu.Unlock() + + if _, ok := g.keys[key]; ok { + // Key already exists, maybe move it to back if needed by specific ghost logic + // For simple ghost queue, we might just ignore or update frequency if tracked + return + } + + // Evict if capacity is reached + if g.queue.Len() >= g.capacity { + elem := g.queue.Front() + if elem != nil { + evictedKey := g.queue.Remove(elem).(K) + delete(g.keys, evictedKey) + } + } + + // Add new key + elem := g.queue.PushBack(key) + g.keys[key] = elem +} + +func (g *ghost[K]) remove(key K) { + g.mu.Lock() + defer g.mu.Unlock() + + if elem, ok := g.keys[key]; ok { + g.queue.Remove(elem) + delete(g.keys, key) + } +} + +func (g *ghost[K]) contains(key K) bool { + g.mu.RLock() + defer g.mu.RUnlock() + + _, ok := g.keys[key] + return ok +} + +/* +func (g *ghost[K]) clear() { + g.mu.Lock() + defer g.mu.Unlock() + + g.queue.Init() + for k := range g.keys { + delete(g.keys, k) + } +} +*/ diff --git a/pkg/fileservice/fifocache/queue.go b/pkg/fileservice/fifocache/queue.go index fd9431d63e977..d73f4dace6300 100644 --- a/pkg/fileservice/fifocache/queue.go +++ b/pkg/fileservice/fifocache/queue.go @@ -20,6 +20,7 @@ type Queue[T any] struct { head *queuePart[T] tail *queuePart[T] partPool sync.Pool + size int } type queuePart[T any] struct { @@ -46,9 +47,9 @@ func NewQueue[T any]() *Queue[T] { return queue } +// empty is an internal helper, assumes lock is held func (p *Queue[T]) empty() bool { - return p.head == p.tail && - p.head.begin == len(p.head.values) + return p.head == p.tail && len(p.head.values) == p.head.begin } func (p *queuePart[T]) reset() { @@ -66,6 +67,7 @@ func (p *Queue[T]) enqueue(v T) { p.head = newPart } p.head.values = append(p.head.values, v) + p.size++ } func (p *Queue[T]) dequeue() (ret T, ok bool) { @@ -76,17 +78,25 @@ func (p *Queue[T]) dequeue() (ret T, ok bool) { if p.tail.begin >= len(p.tail.values) 
{ // shrink if p.tail.next == nil { - panic("impossible") + // This should ideally not happen if empty() check passes, + // but adding a safeguard. + // Consider logging an error here if it does. + return } part := p.tail p.tail = p.tail.next - p.partPool.Put(part) + p.partPool.Put(part) // Return the old part to the pool } ret = p.tail.values[p.tail.begin] var zero T p.tail.values[p.tail.begin] = zero p.tail.begin++ + p.size-- ok = true return } + +func (p *Queue[T]) Len() int { + return p.size +} diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go new file mode 100644 index 0000000000000..d1d50f5efaf8d --- /dev/null +++ b/pkg/fileservice/fifocache/shardmap.go @@ -0,0 +1,92 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fifocache + +const numShards = 256 + +type ShardMap[K comparable, V any] struct { + values map[K]V + hashfn func(K) uint64 +} + +func NewShardMap[K comparable, V any](hashfn func(K) uint64) *ShardMap[K, V] { + m := &ShardMap[K, V]{hashfn: hashfn, + values: make(map[K]V, 1024), + } + return m +} + +func (m *ShardMap[K, V]) Set(key K, value V, postfn func(V)) bool { + + _, ok := m.values[key] + if ok { + return false + } + + m.values[key] = value + + if postfn != nil { + // call postSet protected by mutex.Lock + postfn(value) + } + return true +} + +func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) { + + v, ok := m.values[key] + if !ok { + return v, ok + } + + if postfn != nil { + // call postGet protected the mutex RLock. + postfn(v) + } + return v, ok +} + +func (m *ShardMap[K, V]) Remove(key K) { + delete(m.values, key) +} + +func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn func(V)) { + + for k, v := range m.values { + if fn(k, key) { + delete(m.values, k) + if postfn != nil { + // call postfn to let parent know the item get deleted. (protected by mutex.Lock) + postfn(v) + } + } + } +} + +func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { + v, ok := m.values[key] + if !ok { + return v, ok + } + + delete(m.values, key) + + if postfn != nil { + // call postfn to let parent know the item get deleted. (protected by mutex.Lock) + postfn(v) + } + + return v, ok +} diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go new file mode 100644 index 0000000000000..da1695d58a88f --- /dev/null +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -0,0 +1,45 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package fifocache + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestShardMap(t *testing.T) { + + m := NewShardMap[int, string](ShardInt[int]) + ok := m.Set(1, "1", func(v string) { + }) + assert.Equal(t, ok, true) + ok = m.Set(1, "1", func(v string) { + }) + assert.Equal(t, ok, false) + + v, ok := m.Get(1, func(v string) { + }) + assert.Equal(t, ok, true) + assert.Equal(t, v, "1") + + _, ok = m.GetAndDelete(0, func(v string) { + }) + assert.Equal(t, ok, false) + + _, ok = m.GetAndDelete(1, func(v string) { + }) + assert.Equal(t, ok, true) +} diff --git a/pkg/fileservice/fscache/data.go b/pkg/fileservice/fscache/data.go index 0a11fc38d4208..5761b155100cc 100644 --- a/pkg/fileservice/fscache/data.go +++ b/pkg/fileservice/fscache/data.go @@ -18,5 +18,5 @@ type Data interface { Bytes() []byte Slice(length int) Data Retain() - Release() + Release() bool } diff --git a/pkg/fileservice/io_vector.go b/pkg/fileservice/io_vector.go index 933b8e996cb84..1427aeb72f5b4 100644 --- a/pkg/fileservice/io_vector.go +++ b/pkg/fileservice/io_vector.go @@ -14,7 +14,9 @@ package fileservice -import "math" +import ( + "math" +) func (i *IOVector) allDone() bool { for _, entry := range i.Entries { @@ -28,7 +30,10 @@ func (i *IOVector) allDone() bool { func (i *IOVector) Release() { for _, entry := range i.Entries { if entry.CachedData != nil { - entry.CachedData.Release() + if entry.CachedData.Release() { + entry.CachedData = nil + entry.fromCache = nil + } } if entry.releaseData != nil { entry.releaseData() diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go index 9a146f963b08b..65713215a71f1 100644 --- a/pkg/fileservice/local_fs.go +++ b/pkg/fileservice/local_fs.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "errors" - "github.com/matrixorigin/matrixone/pkg/common/malloc" "io" "io/fs" "iter" @@ -31,6 +30,8 @@ import ( "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/common/malloc" + "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -156,6 +157,7 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error { &config.CacheCallbacks, l.perfCounterSets, l.name, + config.DisableS3Fifo, ) logutil.Info("fileservice: memory cache initialized", zap.Any("fs-name", l.name), @@ -177,6 +179,7 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error { true, l, l.name, + config.DisableS3Fifo, ) if err != nil { return err diff --git a/pkg/fileservice/local_fs_test.go b/pkg/fileservice/local_fs_test.go index 9381c681d9169..8fad00036d3d2 100644 --- a/pkg/fileservice/local_fs_test.go +++ b/pkg/fileservice/local_fs_test.go @@ -211,12 +211,14 @@ func TestLocalFSWithIOVectorCache(t *testing.T) { fscache.ConstCapacity(1<<20), nil, nil, "", + false, ) defer memCache1.Close(ctx) memCache2 := NewMemCache( fscache.ConstCapacity(1<<20), nil, nil, "", + false, ) defer memCache2.Close(ctx) caches := []IOVectorCache{memCache1, memCache2} diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go index cbf8ed45f58d0..d3c1b10119ac9 100644 --- a/pkg/fileservice/mem_cache.go +++ b/pkg/fileservice/mem_cache.go @@ -33,6 +33,7 @@ func NewMemCache( callbacks *CacheCallbacks, counterSets []*perfcounter.CounterSet, name string, + disable_s3fifo bool, ) *MemCache { inuseBytes, capacityBytes := metric.GetFsCacheBytesGauge(name, "mem") @@ -52,9 +53,6 @@ func NewMemCache( LogEvent(ctx, 
str_memory_cache_post_set_begin) defer LogEvent(ctx, str_memory_cache_post_set_end) - // retain - value.Retain() - // metrics LogEvent(ctx, str_update_metrics_begin) inuseBytes.Add(float64(size)) @@ -76,9 +74,6 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_get_begin) defer LogEvent(ctx, str_memory_cache_post_get_end) - // retain - value.Retain() - // callbacks if callbacks != nil { LogEvent(ctx, str_memory_cache_callbacks_begin) @@ -94,9 +89,6 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_evict_begin) defer LogEvent(ctx, str_memory_cache_post_evict_end) - // relaese - value.Release() - // metrics LogEvent(ctx, str_update_metrics_begin) inuseBytes.Add(float64(-size)) @@ -113,7 +105,7 @@ func NewMemCache( } } - dataCache := fifocache.NewDataCache(capacityFunc, postSetFn, postGetFn, postEvictFn) + dataCache := fifocache.NewDataCache(capacityFunc, postSetFn, postGetFn, postEvictFn, disable_s3fifo) ret := &MemCache{ cache: dataCache, @@ -209,6 +201,7 @@ func (m *MemCache) Update( } LogEvent(ctx, str_set_memory_cache_entry_begin) + // NOTE: data existed in hashtable will skip setting this cache data. At a result, reference counter does not increment m.cache.Set(ctx, key, entry.CachedData) LogEvent(ctx, str_set_memory_cache_entry_end) } diff --git a/pkg/fileservice/mem_cache_test.go b/pkg/fileservice/mem_cache_test.go index e1dfb9b2a79b9..77df8b16dec76 100644 --- a/pkg/fileservice/mem_cache_test.go +++ b/pkg/fileservice/mem_cache_test.go @@ -45,7 +45,7 @@ func TestMemCacheLeak(t *testing.T) { assert.Nil(t, err) size := int64(128) - m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false) defer m.Close(ctx) newReadVec := func() *IOVector { @@ -112,13 +112,17 @@ func TestMemCacheLeak(t *testing.T) { assert.Equal(t, int64(size)-1, m.cache.Available()) assert.Equal(t, int64(1), m.cache.Used()) + // check double free + // delete path will remove items from hashtable but items are still in queus and have reference counter 0. + m.DeletePaths(ctx, []string{"foo"}) + } // TestHighConcurrency this test is to mainly test concurrency issue in objectCache // and dataOverlap-checker. 
func TestHighConcurrency(t *testing.T) { ctx := context.Background() - m := NewMemCache(fscache.ConstCapacity(2), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(2), nil, nil, "", false) defer m.Close(ctx) n := 10 @@ -165,6 +169,7 @@ func BenchmarkMemoryCacheUpdate(b *testing.B) { nil, nil, "", + false, ) defer cache.Flush(ctx) @@ -199,6 +204,7 @@ func BenchmarkMemoryCacheRead(b *testing.B) { nil, nil, "", + false, ) defer cache.Flush(ctx) @@ -247,6 +253,7 @@ func TestMemoryCacheGlobalSizeHint(t *testing.T) { nil, nil, "test", + false, ) defer cache.Close(ctx) diff --git a/pkg/fileservice/memory_fs_test.go b/pkg/fileservice/memory_fs_test.go index ca1d89a57ff53..db2a338e860a3 100644 --- a/pkg/fileservice/memory_fs_test.go +++ b/pkg/fileservice/memory_fs_test.go @@ -62,6 +62,7 @@ func BenchmarkMemoryFSWithMemoryCache(b *testing.B) { nil, nil, "", + false, ) defer cache.Close(ctx) @@ -85,6 +86,7 @@ func BenchmarkMemoryFSWithMemoryCacheLowCapacity(b *testing.B) { cache := NewMemCache( fscache.ConstCapacity(2*1024*1024), nil, nil, "", + false, ) defer cache.Close(ctx) diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index 87c7ad8745768..5a08503959428 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -199,6 +199,7 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error { &config.CacheCallbacks, s.perfCounterSets, s.name, + config.DisableS3Fifo, ) logutil.Info("fileservice: memory cache initialized", zap.Any("fs-name", s.name), @@ -219,6 +220,7 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error { true, s, s.name, + config.DisableS3Fifo, ) if err != nil { return err diff --git a/pkg/objectio/cache.go b/pkg/objectio/cache.go index b92d1641785c6..9f482e3058d0a 100644 --- a/pkg/objectio/cache.go +++ b/pkg/objectio/cache.go @@ -48,6 +48,7 @@ const ( type CacheConfig struct { MemoryCapacity toml.ByteSize `toml:"memory-capacity"` + DisableS3Fifo bool `toml:"disable-s3fifo"` } // BlockReadStats collect blk read related cache statistics, @@ -111,16 +112,16 @@ func cacheCapacityFunc(size int64) fscache.CapacityFunc { } func init() { - metaCache = newMetaCache(cacheCapacityFunc(metaCacheSize())) + metaCache = newMetaCache(cacheCapacityFunc(metaCacheSize()), false) } -func InitMetaCache(size int64) { +func InitMetaCache(size int64, disable_s3fifo bool) { onceInit.Do(func() { - metaCache = newMetaCache(cacheCapacityFunc(size)) + metaCache = newMetaCache(cacheCapacityFunc(size), disable_s3fifo) }) } -func newMetaCache(capacity fscache.CapacityFunc) *fifocache.Cache[mataCacheKey, []byte] { +func newMetaCache(capacity fscache.CapacityFunc, disable_s3fifo bool) *fifocache.Cache[mataCacheKey, []byte] { inuseBytes, capacityBytes := metric.GetFsCacheBytesGauge("", "meta") capacityBytes.Set(float64(capacity())) return fifocache.New[mataCacheKey, []byte]( @@ -134,7 +135,8 @@ func newMetaCache(capacity fscache.CapacityFunc) *fifocache.Cache[mataCacheKey, func(_ context.Context, _ mataCacheKey, _ []byte, size int64) { // postEvict inuseBytes.Add(float64(-size)) capacityBytes.Set(float64(capacity())) - }) + }, + disable_s3fifo) } func EvictCache(ctx context.Context) (target int64) {
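
The eviction rework above replaces the old two-queue FIFO with an S3-FIFO layout: new keys enter a small probationary queue sized at roughly 10% of capacity, keys read more than once while probationary are promoted to the main queue at eviction time, and keys evicted from the small queue are remembered in a ghost list so that a re-inserted key goes straight to main. The sketch below is a minimal, single-threaded illustration of that admission/promotion flow; it uses plain container/list instead of the repository's Queue, ShardMap, and reference-counting types, and the names sketchCache and entry are illustrative only, not part of the patch.

```go
// Minimal sketch of the S3-FIFO admission/promotion flow (illustrative names,
// no locking, no reference counting, unbounded ghost list).
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key  int
	freq int // capped at 3, mirroring _CacheItem.Inc
}

type sketchCache struct {
	capacity, capSmall  int
	usedSmall, usedMain int
	small, main         *list.List
	items               map[int]*entry
	ghost               map[int]struct{}
}

func newSketchCache(capacity int) *sketchCache {
	return &sketchCache{
		capacity: capacity,
		capSmall: capacity / 10,
		small:    list.New(),
		main:     list.New(),
		items:    make(map[int]*entry),
		ghost:    make(map[int]struct{}),
	}
}

func (c *sketchCache) Get(key int) bool {
	it, ok := c.items[key]
	if ok && it.freq < 3 {
		it.freq++
	}
	return ok
}

func (c *sketchCache) Set(key int) {
	if _, ok := c.items[key]; ok {
		return // already cached
	}
	it := &entry{key: key}
	c.items[key] = it
	c.evict() // make room before enqueueing, as Cache.Set does
	if _, seen := c.ghost[key]; seen {
		delete(c.ghost, key) // a key we regretted evicting goes straight to main
		c.main.PushBack(it)
		c.usedMain++
	} else {
		c.small.PushBack(it)
		c.usedSmall++
	}
}

func (c *sketchCache) evict() {
	for c.usedSmall+c.usedMain >= c.capacity {
		if c.usedSmall > c.capSmall {
			c.evictSmall()
		} else {
			c.evictMain()
		}
	}
}

func (c *sketchCache) evictSmall() {
	it := c.small.Remove(c.small.Front()).(*entry)
	c.usedSmall--
	if it.freq > 1 {
		c.main.PushBack(it) // read at least twice while probationary: promote
		c.usedMain++
		return
	}
	delete(c.items, it.key)
	c.ghost[it.key] = struct{}{} // remember the one-hit wonder
}

func (c *sketchCache) evictMain() {
	it := c.main.Remove(c.main.Front()).(*entry)
	if it.freq > 0 {
		it.freq--
		c.main.PushBack(it) // second chance before leaving main
		return
	}
	c.usedMain--
	delete(c.items, it.key)
}

func main() {
	c := newSketchCache(10)
	for k := 0; k < 30; k++ {
		c.Set(k) // written once, never re-read
	}
	fmt.Println(c.usedSmall, c.usedMain) // 10 0: one-hit wonders never reach main
}
```

Running the sketch shows keys that are written once and never re-read cycling through the small queue and the ghost list without ever displacing main, which is the behavior TestCacheOneHitWonder and TestCacheMoveGhost assert against the real implementation.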
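
To make the ghost-queue sizing concrete, this second snippet reproduces estimateGhostSize exactly as defined in the patch and evaluates it at a few capacities; the chosen capacities and the approximate results in the comment are illustrative only, and note that the halving threshold in the code is the 8 GB check.

```go
// estimateGhostSize, copied from the patch, evaluated at a few illustrative capacities.
package main

import "fmt"

func estimateGhostSize(capacity int64) int {
	estimate := int(capacity / int64(32806))
	if capacity > 8000000000 { // 8G
		// only 50%
		estimate /= 2
	}
	if estimate < 10000 {
		estimate = 10000
	}
	return estimate
}

func main() {
	for _, capacity := range []int64{512 << 20, 4 << 30, 16 << 30} {
		// roughly 16k, 131k, and 262k ghost entries respectively
		fmt.Println(capacity, estimateGhostSize(capacity))
	}
}
```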