Skip to content

Commit 434742d

Browse files
HayoungOh5soonkuk
authored andcommitted
fix: reduce memory by avoiding clone in Iter and adapt callers
1 parent edbec33 commit 434742d

File tree

4 files changed

+148
-143
lines changed

4 files changed

+148
-143
lines changed

isaac/database/center.go

Lines changed: 44 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -777,85 +777,74 @@ func (db *Center) cleanRemoved(limit int) error {
777777

778778
func loadTemp(
779779
st *leveldbstorage.Storage,
780-
height base.Height,
780+
h base.Height,
781781
encs *encoder.Encoders,
782782
enc encoder.Encoder,
783783
) (isaac.TempDatabase, error) {
784+
784785
r := &leveldbutil.Range{
785-
Start: emptyPrefixStoragePrefixByHeight(leveldbLabelBlockWrite, height), //nolint:gomnd //...
786-
Limit: emptyPrefixStoragePrefixByHeight(leveldbLabelBlockWrite, height+1), //nolint:gomnd //...
786+
Start: emptyPrefixStoragePrefixByHeight(leveldbLabelBlockWrite, h),
787+
Limit: emptyPrefixStoragePrefixByHeight(leveldbLabelBlockWrite, h+1),
788+
}
789+
790+
const delBufCap = 128
791+
var delBuf = make([][]byte, 0, delBufCap)
792+
793+
flushDeletes := func() error {
794+
if len(delBuf) == 0 {
795+
return nil
796+
}
797+
for _, p := range delBuf {
798+
if err := leveldbstorage.RemoveByPrefix(st, p); err != nil {
799+
return err
800+
}
801+
}
802+
delBuf = delBuf[:0]
803+
return nil
787804
}
788805

789-
var lastprefix []byte
790-
var prefixes [][]byte
806+
var lastPrefix []byte
807+
var found isaac.TempDatabase
791808

792809
if err := st.Iter(
793810
r,
794-
func(key, _ []byte) (bool, error) {
795-
k, err := prefixStoragePrefixFromKey(key)
796-
if err != nil {
811+
func(k, _ []byte) (bool, error) {
812+
prefix, err := prefixStoragePrefixFromKey(k)
813+
if err != nil || bytes.Equal(prefix, lastPrefix) {
797814
return true, nil
798815
}
816+
lastPrefix = prefix
799817

800-
if bytes.Equal(k, lastprefix) {
818+
if found != nil {
819+
delBuf = append(delBuf, prefix)
820+
if len(delBuf) >= delBufCap {
821+
return false, flushDeletes()
822+
}
801823
return true, nil
802824
}
803825

804-
lastprefix = k
826+
temp, err := NewTempLeveldbFromPrefix(st, prefix, encs, enc)
827+
if err != nil {
828+
delBuf = append(delBuf, prefix)
829+
return true, nil
830+
}
805831

806-
prefixes = append(prefixes, k)
832+
isMerged, err := temp.isMerged()
833+
if err != nil || !isMerged {
834+
delBuf = append(delBuf, prefix)
835+
return true, nil
836+
}
807837

838+
found = temp
808839
return true, nil
809840
},
810841
false,
811842
); err != nil {
812843
return nil, err
813844
}
814845

815-
if len(prefixes) < 1 {
816-
return nil, nil
817-
}
818-
819-
var useless [][]byte
820-
821-
var found isaac.TempDatabase
822-
823-
for i := range prefixes {
824-
prefix := prefixes[i]
825-
826-
if found != nil {
827-
useless = append(useless, prefix)
828-
829-
continue
830-
}
831-
832-
temp, err := NewTempLeveldbFromPrefix(st, prefix, encs, enc)
833-
if err != nil {
834-
useless = append(useless, prefix)
835-
836-
continue
837-
}
838-
839-
switch ismerged, err := temp.isMerged(); {
840-
case err != nil:
841-
useless = append(useless, prefix)
842-
843-
continue
844-
case !ismerged:
845-
useless = append(useless, prefix)
846-
847-
continue
848-
default:
849-
found = temp
850-
}
851-
}
852-
853-
if len(useless) > 0 {
854-
for i := range useless {
855-
if err := leveldbstorage.RemoveByPrefix(st, useless[i]); err != nil {
856-
return nil, err
857-
}
858-
}
846+
if err := flushDeletes(); err != nil {
847+
return nil, err
859848
}
860849

861850
return found, nil

isaac/database/pool.go

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -982,74 +982,90 @@ func (db *TempPool) cleanByHeight(
982982
deep int,
983983
keyf func(*leveldbstorage.PrefixStorageBatch, []byte, []byte),
984984
) (int, error) {
985+
985986
pst, err := db.st()
986987
if err != nil {
987988
return 0, err
988989
}
989990

990-
top := base.NilHeight
991-
992-
var keys [][3]interface{}
993-
994-
_ = pst.Iter(
991+
var top base.Height
992+
err = pst.Iter(
995993
leveldbutil.BytesPrefix(prefix[:]),
996-
func(key, b []byte) (bool, error) {
997-
i, err := heightFromKey(key, prefix)
998-
if err != nil {
999-
keys = append(keys, [3]interface{}{key, b, nil})
1000-
1001-
return true, nil
1002-
}
1003-
1004-
if i > top {
1005-
top = i
994+
func(k, _ []byte) (bool, error) {
995+
h, e := heightFromKey(k, prefix)
996+
if e != nil {
997+
return false, e
1006998
}
1007-
1008-
keys = append(keys, [3]interface{}{key, b, i})
1009-
1010-
return true, nil
999+
top = h
1000+
return false, nil
10111001
},
10121002
false,
10131003
)
1014-
1015-
height := top
1016-
1017-
switch {
1018-
case len(keys) < 1:
1019-
return 0, nil
1020-
case top-3 < base.GenesisHeight:
1004+
if err != nil {
1005+
return 0, err
1006+
}
1007+
if top == base.NilHeight {
10211008
return 0, nil
1022-
default:
1023-
for range make([]int, deep) {
1024-
height = height.SafePrev()
1009+
}
1010+
1011+
cutoff := top
1012+
for i := 0; i < deep; i++ {
1013+
cutoff = cutoff.SafePrev()
1014+
if cutoff < base.GenesisHeight {
1015+
return 0, nil
10251016
}
10261017
}
10271018

1019+
const recLimit = 10_000 // Record count threshold (tunable)
1020+
const byteLimit = 4 << 20 // 4MiB threshold (tunable)
1021+
10281022
batch := pst.NewBatch()
10291023
defer batch.Reset()
10301024

1031-
var removed int
1025+
var removed, recCnt, byteCnt int
10321026

1033-
for i := range keys {
1034-
key, b, j := keys[i][0].([]byte), keys[i][1].([]byte), keys[i][2] //nolint:forcetypeassert //...
1035-
if j != nil && j.(base.Height) > height { //nolint:forcetypeassert //...
1036-
continue
1037-
}
1027+
err = pst.Iter(
1028+
leveldbutil.BytesPrefix(prefix[:]),
1029+
func(k, v []byte) (bool, error) {
10381030

1039-
batch.Delete(key)
1031+
h, e := heightFromKey(k, prefix)
1032+
if e != nil || h > cutoff {
1033+
return true, e
1034+
}
10401035

1041-
if keyf != nil {
1042-
keyf(batch, key, b)
1043-
}
1036+
batch.Delete(k)
1037+
1038+
if keyf != nil {
1039+
keyf(batch, k, v)
1040+
}
1041+
1042+
recCnt++
1043+
byteCnt += len(k) + len(v)
10441044

1045-
removed++
1045+
if recCnt >= recLimit || byteCnt >= byteLimit {
1046+
if err := pst.Batch(batch, nil); err != nil {
1047+
return false, err
1048+
}
1049+
removed += recCnt
1050+
batch.Reset()
1051+
recCnt, byteCnt = 0, 0
1052+
}
1053+
return true, nil
1054+
},
1055+
true,
1056+
)
1057+
if err != nil {
1058+
return removed, err
10461059
}
10471060

1048-
if batch.Len() < 1 {
1049-
return removed, nil
1061+
if recCnt > 0 {
1062+
if err := pst.Batch(batch, nil); err != nil {
1063+
return removed, err
1064+
}
1065+
removed += recCnt
10501066
}
10511067

1052-
return removed, pst.Batch(batch, nil)
1068+
return removed, nil
10531069
}
10541070

10551071
func (db *TempPool) opFromCache(h util.Hash) (base.Operation, bool) {

storage/leveldb/db.go

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package leveldbstorage
22

33
import (
4-
"bytes"
54
"context"
65
"sync"
76

@@ -106,47 +105,32 @@ func (st *Storage) Exists(key []byte) (bool, error) {
106105

107106
func (st *Storage) Iter(
108107
r *leveldbutil.Range,
109-
callback func(key []byte, raw []byte) (bool, error),
110-
sort bool, // NOTE if true, ascend order
108+
callback func(key, val []byte) (bool, error),
109+
ascend bool,
111110
) error {
112111
db, err := st.db()
113112
if err != nil {
114113
return err
115114
}
116-
117115
iter := db.NewIterator(r, nil)
118116
defer iter.Release()
119117

120-
var seek func() bool
121-
var next func() bool
122-
if sort {
123-
seek = iter.First
124-
next = iter.Next
125-
} else {
126-
seek = iter.Last
127-
next = iter.Prev
128-
}
129-
130-
if !seek() {
131-
return nil
118+
seek, next := iter.Last, iter.Prev
119+
if ascend {
120+
seek, next = iter.First, iter.Next
132121
}
133-
134-
end:
135-
for {
136-
switch keep, err := callback(bytes.Clone(iter.Key()), bytes.Clone(iter.Value())); {
137-
case err != nil:
122+
for ok := seek(); ok; ok = next() {
123+
keep, err := callback(iter.Key(), iter.Value())
124+
if err != nil {
138125
return err
139-
case !keep:
140-
break end
141-
case !next():
142-
break end
126+
}
127+
if !keep {
128+
break
143129
}
144130
}
145-
146131
if err := iter.Error(); err != nil {
147132
return storage.ErrExec.Errorf("iter")
148133
}
149-
150134
return nil
151135
}
152136

@@ -315,51 +299,51 @@ func BatchRemove(st *Storage, r *leveldbutil.Range, limit int) (int, error) {
315299
return 0, err
316300
}
317301

318-
var removed int
319-
320-
var batch leveldb.Batch
302+
var (
303+
removed int
304+
batch leveldb.Batch
305+
)
321306
defer batch.Reset()
322307

323308
if r == nil {
324309
r = &leveldbutil.Range{}
325310
}
326-
327311
start := r.Start
328312

329313
for {
330314
r.Start = start
315+
var nextStart []byte
331316

332317
if err := st.Iter(
333318
r,
334319
func(key, _ []byte) (bool, error) {
335320
if batch.Len() == limit {
336-
start = key
337-
321+
// Copy the key only for one moment when the limit is reached
322+
nextStart = append(nextStart[:0], key...)
338323
return false, nil
339324
}
340-
341325
batch.Delete(key)
342-
343326
return true, nil
344327
},
345328
true,
346329
); err != nil {
347330
return removed, err
348331
}
349332

350-
if batch.Len() < 1 {
333+
if batch.Len() == 0 {
351334
break
352335
}
353-
354336
if err := st.Batch(&batch, nil); err != nil {
355337
return removed, err
356338
}
357-
358339
removed += batch.Len()
359-
360340
batch.Reset()
361-
}
362341

342+
if nextStart == nil {
343+
break
344+
}
345+
start = nextStart
346+
}
363347
return removed, nil
364348
}
365349

0 commit comments

Comments
 (0)