@@ -92,6 +92,7 @@ type DB struct {
9292 pub * publisher
9393 registry * KeyRegistry
9494 blockCache * ristretto.Cache
95+ bfCache * ristretto.Cache
9596}
9697
9798const (
@@ -234,33 +235,35 @@ func Open(opt Options) (db *DB, err error) {
234235 if err := createDirs (opt ); err != nil {
235236 return nil , err
236237 }
237- dirLockGuard , err = acquireDirectoryLock (opt .Dir , lockFile , opt .ReadOnly )
238- if err != nil {
239- return nil , err
240- }
241- defer func () {
242- if dirLockGuard != nil {
243- _ = dirLockGuard .release ()
244- }
245- }()
246- absDir , err := filepath .Abs (opt .Dir )
247- if err != nil {
248- return nil , err
249- }
250- absValueDir , err := filepath .Abs (opt .ValueDir )
251- if err != nil {
252- return nil , err
253- }
254- if absValueDir != absDir {
255- valueDirLockGuard , err = acquireDirectoryLock (opt .ValueDir , lockFile , opt .ReadOnly )
238+ if ! opt .BypassLockGuard {
239+ dirLockGuard , err = acquireDirectoryLock (opt .Dir , lockFile , opt .ReadOnly )
256240 if err != nil {
257241 return nil , err
258242 }
259243 defer func () {
260- if valueDirLockGuard != nil {
261- _ = valueDirLockGuard .release ()
244+ if dirLockGuard != nil {
245+ _ = dirLockGuard .release ()
262246 }
263247 }()
248+ absDir , err := filepath .Abs (opt .Dir )
249+ if err != nil {
250+ return nil , err
251+ }
252+ absValueDir , err := filepath .Abs (opt .ValueDir )
253+ if err != nil {
254+ return nil , err
255+ }
256+ if absValueDir != absDir {
257+ valueDirLockGuard , err = acquireDirectoryLock (opt .ValueDir , lockFile , opt .ReadOnly )
258+ if err != nil {
259+ return nil , err
260+ }
261+ defer func () {
262+ if valueDirLockGuard != nil {
263+ _ = valueDirLockGuard .release ()
264+ }
265+ }()
266+ }
264267 }
265268 }
266269
@@ -302,13 +305,28 @@ func Open(opt Options) (db *DB, err error) {
302305 }
303306 db .blockCache , err = ristretto .NewCache (& config )
304307 if err != nil {
305- return nil , errors .Wrap (err , "failed to create cache" )
308+ return nil , errors .Wrap (err , "failed to create data cache" )
309+ }
310+ }
311+
312+ if opt .MaxBfCacheSize > 0 {
313+ config := ristretto.Config {
314+ // Use 5% of cache memory for storing counters.
315+ NumCounters : int64 (float64 (opt .MaxBfCacheSize ) * 0.05 * 2 ),
316+ MaxCost : int64 (float64 (opt .MaxBfCacheSize ) * 0.95 ),
317+ BufferItems : 64 ,
318+ Metrics : true ,
319+ }
320+ db .blockCache , err = ristretto .NewCache (& config )
321+ if err != nil {
322+ return nil , errors .Wrap (err , "failed to create bf cache" )
306323 }
307324 }
308325
309326 if db .opt .InMemory {
310327 db .opt .SyncWrites = false
311- db .opt .ValueThreshold = maxValueThreshold
328+ // If badger is running in memory mode, push everything into the LSM Tree.
329+ db .opt .ValueThreshold = math .MaxInt32
312330 }
313331 krOpt := KeyRegistryOptions {
314332 ReadOnly : opt .ReadOnly ,
@@ -331,6 +349,9 @@ func Open(opt Options) (db *DB, err error) {
331349 return nil , err
332350 }
333351
352+ // Initialize vlog struct.
353+ db .vlog .init (db )
354+
334355 if ! opt .ReadOnly {
335356 db .closers .compactors = y .NewCloser (1 )
336357 db .lc .startCompact (db .closers .compactors )
@@ -387,14 +408,22 @@ func Open(opt Options) (db *DB, err error) {
387408 return db , nil
388409}
389410
390- // CacheMetrics returns the metrics for the underlying cache.
391- func (db * DB ) CacheMetrics () * ristretto.Metrics {
411+ // DataCacheMetrics returns the metrics for the underlying data cache.
412+ func (db * DB ) DataCacheMetrics () * ristretto.Metrics {
392413 if db .blockCache != nil {
393414 return db .blockCache .Metrics
394415 }
395416 return nil
396417}
397418
419+ // BfCacheMetrics returns the metrics for the underlying bloom filter cache.
420+ func (db * DB ) BfCacheMetrics () * ristretto.Metrics {
421+ if db .bfCache != nil {
422+ return db .bfCache .Metrics
423+ }
424+ return nil
425+ }
426+
398427// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
399428// disk. Calling DB.Close() multiple times would still only close the DB once.
400429func (db * DB ) Close () error {
@@ -484,6 +513,7 @@ func (db *DB) close() (err error) {
484513 db .closers .updateSize .SignalAndWait ()
485514 db .orc .Stop ()
486515 db .blockCache .Close ()
516+ db .bfCache .Close ()
487517
488518 db .elog .Finish ()
489519 if db .opt .InMemory {
@@ -951,6 +981,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
951981 bopts .DataKey = dk
952982 // Builder does not need cache but the same options are used for opening table.
953983 bopts .Cache = db .blockCache
984+ bopts .BfCache = db .bfCache
954985 tableData := buildL0Table (ft , bopts )
955986
956987 fileID := db .lc .reserveFileID ()
@@ -1509,6 +1540,7 @@ func (db *DB) dropAll() (func(), error) {
15091540 db .lc .nextFileID = 1
15101541 db .opt .Infof ("Deleted %d value log files. DropAll done.\n " , num )
15111542 db .blockCache .Clear ()
1543+ db .bfCache .Clear ()
15121544
15131545 return resume , nil
15141546}
@@ -1576,9 +1608,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, prefixes
15761608 if cb == nil {
15771609 return ErrNilCallback
15781610 }
1579- if len (prefixes ) == 0 {
1580- return ErrNoPrefixes
1581- }
1611+
15821612 c := y .NewCloser (1 )
15831613 recvCh , id := db .pub .newSubscriber (c , prefixes ... )
15841614 slurp := func (batch * pb.KVList ) error {
@@ -1612,7 +1642,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, prefixes
16121642 err := slurp (batch )
16131643 if err != nil {
16141644 c .Done ()
1615- // Delete the subsriber if there is an error by the callback.
1645+ // Delete the subscriber if there is an error by the callback.
16161646 db .pub .deleteSubscriber (id )
16171647 return err
16181648 }
0 commit comments