40
40
#include " cache/utils/bthread.h"
41
41
#include " cache/utils/context.h"
42
42
#include " cache/utils/helper.h"
43
+ #include " cache/utils/inflight_tracker.h"
43
44
#include " cache/utils/step_timer.h"
44
45
#include " common/io_buffer.h"
45
46
#include " common/status.h"
@@ -83,7 +84,8 @@ BlockCacheImpl::BlockCacheImpl(StoragePoolSPtr storage_pool)
83
84
: running_(false ),
84
85
storage_pool_ (storage_pool),
85
86
joiner_(std::make_unique<BthreadJoiner>()),
86
- inflight_tracker_(std::make_unique<InflightTracker>(1024 )) {
87
+ inflight_cache_(std::make_shared<InflightTracker>(1024 )),
88
+ inflight_prefetch_(std::make_shared<InflightTracker>(1024 )) {
87
89
if (HasCacheStore ()) {
88
90
store_ = std::make_shared<DiskCacheGroup>(ParseDiskCacheOption ());
89
91
} else {
@@ -99,7 +101,8 @@ Status BlockCacheImpl::Start() {
99
101
CHECK_NOTNULL (store_);
100
102
CHECK_NOTNULL (uploader_);
101
103
CHECK_NOTNULL (joiner_);
102
- CHECK_NOTNULL (inflight_tracker_);
104
+ CHECK_NOTNULL (inflight_cache_);
105
+ CHECK_NOTNULL (inflight_prefetch_);
103
106
104
107
if (running_) {
105
108
return Status::OK ();
@@ -313,13 +316,25 @@ void BlockCacheImpl::AsyncCache(ContextSPtr ctx, const BlockKey& key,
313
316
CacheOption option) {
314
317
CHECK_RUNNING (" Block cache" );
315
318
316
- auto * self = GetSelfPtr () ;
317
- auto tid = RunInBthread ([self, ctx, key, block, cb, option]() {
318
- Status status = self-> Cache (ctx, key, block, option);
319
+ auto inflight_tracker = inflight_cache_ ;
320
+ auto status = inflight_tracker-> Add ( key. Filename ());
321
+ if ( status. IsExist ()) {
319
322
if (cb) {
320
323
cb (status);
321
324
}
322
- });
325
+ return ;
326
+ }
327
+
328
+ auto * self = GetSelfPtr ();
329
+ auto tid =
330
+ RunInBthread ([inflight_tracker, self, ctx, key, block, cb, option]() {
331
+ Status status = self->Cache (ctx, key, block, option);
332
+ if (cb) {
333
+ cb (status);
334
+ }
335
+
336
+ inflight_tracker->Remove (key.Filename ());
337
+ });
323
338
324
339
if (tid != 0 ) {
325
340
joiner_->BackgroundJoin (tid);
@@ -331,7 +346,8 @@ void BlockCacheImpl::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key,
331
346
PrefetchOption option) {
332
347
CHECK_RUNNING (" Block cache" );
333
348
334
- auto status = inflight_tracker_->Add (key.Filename ());
349
+ auto inflight_tracker = inflight_prefetch_;
350
+ auto status = inflight_tracker->Add (key.Filename ());
335
351
if (status.IsExist ()) {
336
352
if (cb) {
337
353
cb (status);
@@ -340,14 +356,15 @@ void BlockCacheImpl::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key,
340
356
}
341
357
342
358
auto * self = GetSelfPtr ();
343
- auto tid = RunInBthread ([&, self, ctx, key, length, cb, option]() {
344
- Status status = self->Prefetch (ctx, key, length, option);
345
- if (cb) {
346
- cb (status);
347
- }
359
+ auto tid =
360
+ RunInBthread ([inflight_tracker, self, ctx, key, length, cb, option]() {
361
+ Status status = self->Prefetch (ctx, key, length, option);
362
+ if (cb) {
363
+ cb (status);
364
+ }
348
365
349
- inflight_tracker_ ->Remove (key.Filename ());
350
- });
366
+ inflight_tracker ->Remove (key.Filename ());
367
+ });
351
368
352
369
if (tid != 0 ) {
353
370
joiner_->BackgroundJoin (tid);
0 commit comments