Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4034f7e
update deleted
cpegeric May 16, 2025
f224924
Merge branch 'main' into s3fifo-race-main
mergify[bot] May 16, 2025
dd044ff
cleanup
cpegeric May 16, 2025
dcd5490
Merge branch 's3fifo-race-main' of github.com:cpegeric/matrixone into…
cpegeric May 16, 2025
9e97aee
update
cpegeric May 16, 2025
15dbb34
update
cpegeric May 16, 2025
12ced87
add tests
cpegeric May 16, 2025
0331f9c
fix sca
cpegeric May 16, 2025
cbf8647
check double free
cpegeric May 16, 2025
9ae40fe
postfn protected by shardmap
cpegeric May 19, 2025
3f30dd3
update
cpegeric May 19, 2025
8ab612d
isDeleted protected by shardmap mutex
cpegeric May 19, 2025
c6338e7
use mutex instead
cpegeric May 19, 2025
d2fcc57
update
cpegeric May 19, 2025
ac7f439
use RLock
cpegeric May 19, 2025
a7ebf4b
protect Slice()
cpegeric May 19, 2025
b7847b6
check buffer deallocated
cpegeric May 20, 2025
a420ff5
add tests and comments
cpegeric May 20, 2025
dbc16ff
comments
cpegeric May 20, 2025
43ec47b
fix sca
cpegeric May 20, 2025
ee52cc7
panic
cpegeric May 20, 2025
f1de9d7
fix sca
cpegeric May 20, 2025
60f0513
DeletePaths add setDeleted to avoid multiple postEvict
cpegeric May 20, 2025
e9262aa
merge
cpegeric May 21, 2025
80f132a
merge fix
cpegeric May 21, 2025
72f6421
add test for double free and bug fix looping
cpegeric May 21, 2025
98b2724
merge fix
cpegeric Jul 10, 2025
663cf96
retain/release inside cache
cpegeric Jul 10, 2025
eda779e
update
cpegeric Jul 10, 2025
745f6c3
Merge branch 'main' into s3fifo-reborn-refcnt
mergify[bot] Jul 10, 2025
f9217dd
delete paths
cpegeric Jul 10, 2025
3e24a6b
Merge branch 's3fifo-reborn-refcnt' of github.com:cpegeric/matrixone …
cpegeric Jul 10, 2025
66eb809
item mutex protect everything
cpegeric Jul 11, 2025
1011890
update comment
cpegeric Jul 11, 2025
084c620
update
cpegeric Jul 11, 2025
536e323
bug fix
cpegeric Jul 14, 2025
7c5850b
bug fix
cpegeric Jul 14, 2025
560175b
comments
cpegeric Jul 14, 2025
235ab0b
clean up bytes
cpegeric Jul 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/mo-service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (c *Config) setDefaultValue() error {

func (c *Config) initMetaCache() {
if c.MetaCache.MemoryCapacity > 0 {
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity))
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (c *ServiceConfig) setDefaultValue() error {

func (c *ServiceConfig) initMetaCache() {
if c.MetaCache.MemoryCapacity > 0 {
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity))
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo)
}
}

Expand Down
59 changes: 40 additions & 19 deletions pkg/fileservice/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,75 @@ package fileservice

import (
"context"
"sync/atomic"
"sync"

"github.com/matrixorigin/matrixone/pkg/common/malloc"
"github.com/matrixorigin/matrixone/pkg/fileservice/fscache"
)

type Bytes struct {
mu sync.Mutex
bytes []byte
deallocator malloc.Deallocator
deallocated uint32
_refs atomic.Int32
refs *atomic.Int32
refs int32
}

func (b *Bytes) Size() int64 {
b.mu.Lock()
defer b.mu.Unlock()
if b.bytes == nil {
panic("fileservice.Bytes.Size() buffer already deallocated")
}
return int64(len(b.bytes))
}

func (b *Bytes) Bytes() []byte {
b.mu.Lock()
defer b.mu.Unlock()
if b.bytes == nil {
panic("fileservice.Bytes.Bytes() buffer already deallocated")
}
return b.bytes
}

func (b *Bytes) Slice(length int) fscache.Data {
b.mu.Lock()
defer b.mu.Unlock()
if b.bytes == nil {
panic("fileservice.Bytes.Slice() buffer already deallocated")
}
b.bytes = b.bytes[:length]
return b
}

func (b *Bytes) Retain() {
if b.refs != nil {
b.refs.Add(1)
b.mu.Lock()
defer b.mu.Unlock()

if b.bytes == nil {
panic("fileservice.Bytes.Retain() buffer already deallocated")
}

b.refs += 1
}

func (b *Bytes) Release() {
if b.refs != nil {
if n := b.refs.Add(-1); n == 0 {
if b.deallocator != nil &&
atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) {
b.deallocator.Deallocate(malloc.NoHints)
}
}
} else {
if b.deallocator != nil &&
atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) {
func (b *Bytes) Release() bool {
b.mu.Lock()
defer b.mu.Unlock()

if b.bytes == nil {
panic("fileservice.Bytes.Release() double free")
}

b.refs -= 1
if b.refs == 0 {
if b.deallocator != nil {
b.deallocator.Deallocate(malloc.NoHints)
b.bytes = nil
return true
}
}
return false
}

type bytesAllocator struct {
Expand All @@ -79,9 +101,8 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache
bytes := &Bytes{
bytes: slice,
deallocator: dec,
refs: 1,
}
bytes._refs.Store(1)
bytes.refs = &bytes._refs
return bytes
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/fileservice/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,28 @@ func TestBytes(t *testing.T) {
bs := &Bytes{
bytes: bytes,
deallocator: deallocator,
refs: 1,
}
bs.Release()
})
}

func TestBytesPanic(t *testing.T) {
bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints)
assert.Nil(t, err)
bs := &Bytes{
bytes: bytes,
deallocator: deallocator,
refs: 1,
}

released := bs.Release()
assert.Equal(t, released, true)

assert.Panics(t, func() { bs.Release() }, "Bytes.Release panic()")
assert.Panics(t, func() { bs.Size() }, "Bytes.Size panic()")
assert.Panics(t, func() { bs.Bytes() }, "Bytes.Bytes panic()")
assert.Panics(t, func() { bs.Slice(0) }, "Bytes.Slice panic()")
assert.Panics(t, func() { bs.Retain() }, "Bytes.Retain panic()")

}
1 change: 1 addition & 0 deletions pkg/fileservice/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type CacheConfig struct {
RemoteCacheEnabled bool `toml:"remote-cache-enabled"`
RPC morpc.Config `toml:"rpc"`
CheckOverlaps bool `toml:"check-overlaps"`
DisableS3Fifo bool `toml:"disable-s3fifo"`

QueryClient client.QueryClient `json:"-"`
KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/fileservice/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_readCache(t *testing.T) {
slowCacheReadThreshold = time.Second

size := int64(128)
m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "")
m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false)
defer m.Close(ctx)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
Expand Down
2 changes: 2 additions & 0 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewDiskCache(
asyncLoad bool,
cacheDataAllocator CacheDataAllocator,
name string,
disable_s3fifo bool,
) (ret *DiskCache, err error) {

err = os.MkdirAll(path, 0755)
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewDiskCache(
)
}
},
disable_s3fifo,
),
}
ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex))
Expand Down
24 changes: 14 additions & 10 deletions pkg/fileservice/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestDiskCache(t *testing.T) {
}

// new
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -143,7 +143,7 @@ func TestDiskCache(t *testing.T) {
testRead(cache)

// new cache instance and read
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -152,7 +152,7 @@ func TestDiskCache(t *testing.T) {
assert.Equal(t, 1, numWritten)

// new cache instance and update
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -178,7 +178,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
})
ctx = OnDiskCacheEvict(ctx, nil) // for coverage

cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -243,7 +243,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
func TestDiskCacheFileCache(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -303,7 +303,7 @@ func TestDiskCacheDirSize(t *testing.T) {

dir := t.TempDir()
capacity := 1 << 20
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -366,6 +366,7 @@ func benchmarkDiskCacheWriteThenRead(
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -461,6 +462,7 @@ func benchmarkDiskCacheReadRandomOffsetAtLargeFile(
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -532,6 +534,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) {
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -603,7 +606,7 @@ func TestDiskCacheClearFiles(t *testing.T) {
assert.Nil(t, err)
numFiles := len(files)

_, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
_, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)

files, err = filepath.Glob(filepath.Join(dir, "*"))
Expand All @@ -617,7 +620,7 @@ func TestDiskCacheClearFiles(t *testing.T) {
func TestDiskCacheBadWrite(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)

written, err := cache.writeFile(
Expand Down Expand Up @@ -652,6 +655,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) {
false,
nil,
"test",
false,
)
assert.Nil(t, err)
defer cache.Close(ctx)
Expand Down Expand Up @@ -684,7 +688,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) {

func TestDiskCacheSetFromFile(t *testing.T) {
ctx := context.Background()
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "")
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "", false)
require.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -709,7 +713,7 @@ func TestDiskCacheSetFromFile(t *testing.T) {

func TestDiskCacheQuotaExceeded(t *testing.T) {
ctx := context.Background()
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "")
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "", false)
require.Nil(t, err)
defer cache.Close(ctx)

Expand Down
Loading
Loading