diff --git a/confv2/client.conf b/confv2/client.conf index fd61bd9b3..ab9382ec1 100644 --- a/confv2/client.conf +++ b/confv2/client.conf @@ -42,6 +42,9 @@ disk_state.check_duration_ms=3000 remote_cache.cache_group= remote_cache.mds_version=v2 remote_cache.mds_addrs=127.0.0.1:6900 +remote_cache.mds_rpc_timeout_ms=3000 +remote_cache.mds_rpc_retry_times=1 +remote_cache.mds_request_retry_times=3 remote_cache.load_members_interval_ms=1000 remote_cache.fill_group_cache=true diff --git a/src/cache/CMakeLists.txt b/src/cache/CMakeLists.txt index 5ed035c54..d68011039 100644 --- a/src/cache/CMakeLists.txt +++ b/src/cache/CMakeLists.txt @@ -20,7 +20,7 @@ add_subdirectory(blockcache) add_subdirectory(cachegroup) add_subdirectory(remotecache) add_subdirectory(tiercache) -#add_subdirectory(benchmark) +add_subdirectory(benchmark) file(GLOB CACHE_LIB_SRCS "dingo_cache.cpp" diff --git a/src/cache/benchmark/CMakeLists.txt b/src/cache/benchmark/CMakeLists.txt index bfa20cc9e..71d1184c5 100644 --- a/src/cache/benchmark/CMakeLists.txt +++ b/src/cache/benchmark/CMakeLists.txt @@ -13,11 +13,21 @@ # limitations under the License. # Define the BASE_FLAGS and DINGO_DEFAULT_COPTS variables -file(GLOB BENCHMARK_LIB_SRCS - "*.cpp" +SET(BENCHMARK_LIB_SRCS + "benchmarker.cpp" + "collector.cpp" + "factory.cpp" + "option.cpp" + "reporter.cpp" + "worker.cpp" ) -add_executable(cache-bench ${BENCHMARK_LIB_SRCS}) -target_link_libraries(cache-bench - cache_lib +add_library(cache_benchmark ${BENCHMARK_LIB_SRCS}) +target_link_libraries(cache_benchmark + cache_utils + cache_common + cache_storage ) + +add_executable(cache-bench main.cpp) +target_link_libraries(cache-bench cache_benchmark) diff --git a/src/cache/blockcache/block_cache_impl.cpp b/src/cache/blockcache/block_cache_impl.cpp index 42970408a..8901bbb0b 100644 --- a/src/cache/blockcache/block_cache_impl.cpp +++ b/src/cache/blockcache/block_cache_impl.cpp @@ -40,6 +40,7 @@ #include "cache/utils/bthread.h" #include "cache/utils/context.h" #include "cache/utils/helper.h" +#include "cache/utils/inflight_tracker.h" #include "cache/utils/step_timer.h" #include "common/io_buffer.h" #include "common/status.h" @@ -82,7 +83,9 @@ BlockCacheImpl::BlockCacheImpl(StorageSPtr storage) BlockCacheImpl::BlockCacheImpl(StoragePoolSPtr storage_pool) : running_(false), storage_pool_(storage_pool), - joiner_(std::make_unique()) { + joiner_(std::make_unique()), + inflight_cache_(std::make_shared(1024)), + inflight_prefetch_(std::make_shared(1024)) { if (HasCacheStore()) { store_ = std::make_shared(ParseDiskCacheOption()); } else { @@ -98,6 +101,8 @@ Status BlockCacheImpl::Start() { CHECK_NOTNULL(store_); CHECK_NOTNULL(uploader_); CHECK_NOTNULL(joiner_); + CHECK_NOTNULL(inflight_cache_); + CHECK_NOTNULL(inflight_prefetch_); if (running_) { return Status::OK(); @@ -248,6 +253,8 @@ Status BlockCacheImpl::Prefetch(ContextSPtr ctx, const BlockKey& key, if (IsCached(key)) { return Status::OK(); + } else if (store_->IsFull(key)) { + return Status::CacheFull("disk cache is full"); } NEXT_STEP("s3_range"); @@ -309,13 +316,25 @@ void BlockCacheImpl::AsyncCache(ContextSPtr ctx, const BlockKey& key, CacheOption option) { CHECK_RUNNING("Block cache"); - auto* self = GetSelfPtr(); - auto tid = RunInBthread([self, ctx, key, block, cb, option]() { - Status status = self->Cache(ctx, key, block, option); + auto inflight_tracker = inflight_cache_; + auto status = inflight_tracker->Add(key.Filename()); + if (status.IsExist()) { if (cb) { cb(status); } - }); + return; + } + + auto* self = GetSelfPtr(); + auto tid = + RunInBthread([inflight_tracker, self, ctx, key, block, cb, option]() { + Status status = self->Cache(ctx, key, block, option); + if (cb) { + cb(status); + } + + inflight_tracker->Remove(key.Filename()); + }); if (tid != 0) { joiner_->BackgroundJoin(tid); @@ -327,13 +346,25 @@ void BlockCacheImpl::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key, PrefetchOption option) { CHECK_RUNNING("Block cache"); - auto* self = GetSelfPtr(); - auto tid = RunInBthread([self, ctx, key, length, cb, option]() { - Status status = self->Prefetch(ctx, key, length, option); + auto inflight_tracker = inflight_prefetch_; + auto status = inflight_tracker->Add(key.Filename()); + if (status.IsExist()) { if (cb) { cb(status); } - }); + return; + } + + auto* self = GetSelfPtr(); + auto tid = + RunInBthread([inflight_tracker, self, ctx, key, length, cb, option]() { + Status status = self->Prefetch(ctx, key, length, option); + if (cb) { + cb(status); + } + + inflight_tracker->Remove(key.Filename()); + }); if (tid != 0) { joiner_->BackgroundJoin(tid); @@ -351,7 +382,7 @@ Status BlockCacheImpl::StoragePut(ContextSPtr ctx, const BlockKey& key, return status; } - status = storage->Upload(ctx, key, block); + status = storage->Put(ctx, key, block); if (!status.ok()) { GENERIC_LOG_UPLOAD_ERROR(); } @@ -370,7 +401,7 @@ Status BlockCacheImpl::StorageRange(ContextSPtr ctx, const BlockKey& key, return status; } - status = storage->Download(ctx, key, offset, length, buffer); + status = storage->Range(ctx, key, offset, length, buffer); if (!status.ok()) { GENERIC_LOG_DOWNLOAD_ERROR(); } diff --git a/src/cache/blockcache/block_cache_impl.h b/src/cache/blockcache/block_cache_impl.h index f8fb59bfa..45bf45bec 100644 --- a/src/cache/blockcache/block_cache_impl.h +++ b/src/cache/blockcache/block_cache_impl.h @@ -28,6 +28,7 @@ #include "cache/storage/storage.h" #include "cache/storage/storage_pool.h" #include "cache/utils/context.h" +#include "cache/utils/inflight_tracker.h" namespace dingofs { namespace cache { @@ -85,6 +86,8 @@ class BlockCacheImpl final : public BlockCache { CacheStoreSPtr store_; BlockCacheUploaderSPtr uploader_; BthreadJoinerUPtr joiner_; + InflightTrackerSPtr inflight_cache_; + InflightTrackerSPtr inflight_prefetch_; }; } // namespace cache diff --git a/src/cache/blockcache/block_cache_uploader.cpp b/src/cache/blockcache/block_cache_uploader.cpp index 9b0a2e260..ab3bac4fd 100644 --- a/src/cache/blockcache/block_cache_uploader.cpp +++ b/src/cache/blockcache/block_cache_uploader.cpp @@ -229,7 +229,7 @@ Status BlockCacheUploader::Upload(const StagingBlock& staging_block, return status; } - status = storage->Upload(staging_block.ctx, key, Block(buffer)); + status = storage->Put(staging_block.ctx, key, Block(buffer)); if (!status.ok()) { LOG_CTX(ERROR) << "Upload staging block failed: key = " << key.Filename() << ", status = " << status.ToString(); diff --git a/src/cache/blockcache/cache_store.h b/src/cache/blockcache/cache_store.h index 9a2d2505f..d312472cc 100644 --- a/src/cache/blockcache/cache_store.h +++ b/src/cache/blockcache/cache_store.h @@ -163,6 +163,7 @@ class CacheStore { virtual std::string Id() const = 0; virtual bool IsRunning() const = 0; virtual bool IsCached(const BlockKey& key) const = 0; + virtual bool IsFull(const BlockKey& key) const = 0; }; using CacheStoreSPtr = std::shared_ptr; diff --git a/src/cache/blockcache/disk_cache.cpp b/src/cache/blockcache/disk_cache.cpp index 6fbe6d62a..5a697a2fc 100644 --- a/src/cache/blockcache/disk_cache.cpp +++ b/src/cache/blockcache/disk_cache.cpp @@ -191,7 +191,7 @@ Status DiskCache::CreateDirs() { GetProbeDir(), }; for (const auto& dir : dirs) { - auto status = FSHelper::MkDirs(dir); + auto status = FSUtil::MkDirs(dir); if (!status.ok()) { LOG(ERROR) << "Create directory failed: dir = " << dir << ", status = " << status.ToString(); @@ -204,12 +204,12 @@ Status DiskCache::CreateDirs() { Status DiskCache::LoadOrCreateLockFile() { std::string content; auto lock_path = GetLockPath(); - auto status = FSHelper::ReadFile(lock_path, &content); + auto status = FSUtil::ReadFile(lock_path, &content); if (status.ok()) { uuid_ = utils::TrimSpace(content); } else if (status.IsNotFound()) { uuid_ = utils::GenUuid(); - status = FSHelper::WriteFile(lock_path, uuid_); + status = FSUtil::WriteFile(lock_path, uuid_); } if (!status.ok()) { @@ -421,6 +421,8 @@ bool DiskCache::IsCached(const BlockKey& key) const { return false; } +bool DiskCache::IsFull(const BlockKey& /*key*/) const { return CacheFull(); } + // CheckStatus cache status: // 1. check running status (UP/DOWN) // 2. check disk healthy (HEALTHY/UNHEALTHY) diff --git a/src/cache/blockcache/disk_cache.h b/src/cache/blockcache/disk_cache.h index 172b4ac07..0def9d817 100644 --- a/src/cache/blockcache/disk_cache.h +++ b/src/cache/blockcache/disk_cache.h @@ -65,6 +65,7 @@ class DiskCache final : public CacheStore { std::string Id() const override; bool IsRunning() const override; bool IsCached(const BlockKey& key) const override; + bool IsFull(const BlockKey& key) const override; private: friend class Target; diff --git a/src/cache/blockcache/disk_cache_group.cpp b/src/cache/blockcache/disk_cache_group.cpp index 60bf29e3d..eb27035ff 100644 --- a/src/cache/blockcache/disk_cache_group.cpp +++ b/src/cache/blockcache/disk_cache_group.cpp @@ -153,6 +153,8 @@ Status DiskCacheGroup::Load(ContextSPtr ctx, const BlockKey& key, off_t offset, return status; } +std::string DiskCacheGroup::Id() const { return "disk_cache_group"; } + bool DiskCacheGroup::IsRunning() const { return running_.load(std::memory_order_relaxed); } @@ -163,7 +165,11 @@ bool DiskCacheGroup::IsCached(const BlockKey& key) const { return GetStore(key)->IsCached(key); } -std::string DiskCacheGroup::Id() const { return "disk_cache_group"; } +bool DiskCacheGroup::IsFull(const BlockKey& key) const { + DCHECK_RUNNING("Disk cache group"); + + return GetStore(key)->IsFull(key); +} std::vector DiskCacheGroup::CalcWeights( std::vector options) { diff --git a/src/cache/blockcache/disk_cache_group.h b/src/cache/blockcache/disk_cache_group.h index e8d6c17e5..a0c6a206a 100644 --- a/src/cache/blockcache/disk_cache_group.h +++ b/src/cache/blockcache/disk_cache_group.h @@ -53,6 +53,7 @@ class DiskCacheGroup final : public CacheStore { std::string Id() const override; bool IsRunning() const override; bool IsCached(const BlockKey& key) const override; + bool IsFull(const BlockKey& key) const override; private: static std::vector CalcWeights( diff --git a/src/cache/blockcache/disk_cache_loader.cpp b/src/cache/blockcache/disk_cache_loader.cpp index 8f50cbcd7..47ccf53a3 100644 --- a/src/cache/blockcache/disk_cache_loader.cpp +++ b/src/cache/blockcache/disk_cache_loader.cpp @@ -103,7 +103,7 @@ void DiskCacheLoader::LoadAllBlocks(const std::string& dir, BlockType type) { timer.start(); status = - FSHelper::Walk(dir, [&](const std::string& prefix, const FileInfo& file) { + FSUtil::Walk(dir, [&](const std::string& prefix, const FileInfo& file) { if (!running_.load(std::memory_order_relaxed)) { return Status::Abort("disk cache loader stopped"); } @@ -149,7 +149,7 @@ bool DiskCacheLoader::LoadOneBlock(const std::string& prefix, std::string path = Helper::PathJoin({prefix, name}); if (Helper::IsTempFilepath(name) || !key.ParseFromFilename(name)) { - auto status = FSHelper::RemoveFile(path); + auto status = FSUtil::RemoveFile(path); if (status.ok()) { LOG(INFO) << "Remove invalid block (path=" << path << ") success."; } else { diff --git a/src/cache/blockcache/disk_cache_manager.cpp b/src/cache/blockcache/disk_cache_manager.cpp index 24be67641..36690c728 100644 --- a/src/cache/blockcache/disk_cache_manager.cpp +++ b/src/cache/blockcache/disk_cache_manager.cpp @@ -189,7 +189,7 @@ void DiskCacheManager::CheckFreeSpace() { std::string root_dir = GetRootDir(); while (running_.load(std::memory_order_relaxed)) { - auto status = FSHelper::StatFS(root_dir, &stat); + auto status = FSUtil::StatFS(root_dir, &stat); if (!status.ok()) { LOG(ERROR) << "Check disk free space failed: dir = " << root_dir << ", status = " << status.ToString(); @@ -312,7 +312,7 @@ void DiskCacheManager::DeleteBlocks(const ToDel& to_del) { CacheKey key = item.key; CacheValue value = item.value; std::string cache_path = GetCachePath(key); - auto status = FSHelper::RemoveFile(cache_path); + auto status = FSUtil::RemoveFile(cache_path); if (status.IsNotFound()) { LOG(WARNING) << "Block file already deleted: path = " << cache_path; continue; diff --git a/src/cache/blockcache/disk_cache_watcher.cpp b/src/cache/blockcache/disk_cache_watcher.cpp index a377adaca..8723fb09f 100644 --- a/src/cache/blockcache/disk_cache_watcher.cpp +++ b/src/cache/blockcache/disk_cache_watcher.cpp @@ -89,7 +89,7 @@ void DiskCacheWatcher::WatchingWorker() { DiskCacheWatcher::Should DiskCacheWatcher::CheckTarget(Target* target) { std::string lock_path = target->GetLockPath(); - if (!FSHelper::FileExists(lock_path)) { // cache is down + if (!FSUtil::FileExists(lock_path)) { // cache is down return Should::kShutdown; } else if (target->IsRunning()) { // cache already up return Should::kDoNothing; @@ -102,7 +102,7 @@ DiskCacheWatcher::Should DiskCacheWatcher::CheckTarget(Target* target) { bool DiskCacheWatcher::CheckUuid(const std::string& lock_path, const std::string& uuid) { std::string content; - auto status = FSHelper::ReadFile(lock_path, &content); + auto status = FSUtil::ReadFile(lock_path, &content); if (!status.ok()) { LOG(ERROR) << "Read lock file failed: path = " << lock_path << ", status = " << status.ToString(); diff --git a/src/cache/blockcache/disk_state_health_checker.cpp b/src/cache/blockcache/disk_state_health_checker.cpp index ae80ad081..0c68fff5b 100644 --- a/src/cache/blockcache/disk_state_health_checker.cpp +++ b/src/cache/blockcache/disk_state_health_checker.cpp @@ -93,9 +93,9 @@ void DiskStateHealthChecker::ProbeDisk() { std::string content(100, '0'); std::string filepath = GetProbeFilepath(); - auto status = FSHelper::WriteFile(filepath, content); + auto status = FSUtil::WriteFile(filepath, content); if (status.ok()) { - status = FSHelper::ReadFile(filepath, &out); + status = FSUtil::ReadFile(filepath, &out); } if (!status.ok()) { @@ -107,7 +107,7 @@ void DiskStateHealthChecker::ProbeDisk() { } SetStatusPage(state_machine_->GetState()); - FSHelper::RemoveFile(filepath); + FSUtil::RemoveFile(filepath); } std::string DiskStateHealthChecker::GetProbeFilepath() const { diff --git a/src/cache/blockcache/mem_cache.h b/src/cache/blockcache/mem_cache.h index cec4d1c5f..e02d01955 100644 --- a/src/cache/blockcache/mem_cache.h +++ b/src/cache/blockcache/mem_cache.h @@ -61,6 +61,7 @@ class MemStore final : public CacheStore { std::string Id() const override { return "memory_cache"; } bool IsRunning() const override { return true; } bool IsCached(const BlockKey&) const override { return false; } + bool IsFull(const BlockKey&) const override { return true; } }; } // namespace cache diff --git a/src/cache/cachegroup/async_cacher.cpp b/src/cache/cachegroup/async_cacher.cpp deleted file mode 100644 index 9ed8f340c..000000000 --- a/src/cache/cachegroup/async_cacher.cpp +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: DingoFS - * Created Date: 2025-02-10 - * Author: Jingli Chen (Wine93) - */ - -#include "cache/cachegroup/async_cacher.h" - -#include "cache/common/macro.h" -#include "cache/utils/context.h" - -namespace dingofs { -namespace cache { - -AsyncCacherImpl::AsyncCacherImpl(BlockCacheSPtr block_cache) - : running_(false), block_cache_(block_cache), queue_id_({0}) {} - -Status AsyncCacherImpl::Start() { - CHECK_NOTNULL(block_cache_); - - if (running_) { - return Status::OK(); - } - - LOG(INFO) << "Async cacher is starting..."; - - bthread::ExecutionQueueOptions queue_options; - queue_options.use_pthread = true; - int rc = bthread::execution_queue_start(&queue_id_, &queue_options, - HandleTask, this); - if (rc != 0) { - LOG(ERROR) << "Start execution queue failed: rc = " << rc; - return Status::Internal("start execution queue failed"); - } - - running_ = true; - - LOG(INFO) << "Async cacher is up."; - - CHECK_RUNNING("Async cacher"); - return Status::OK(); -} - -Status AsyncCacherImpl::Shutdown() { - if (running_.exchange(false)) { - return Status::OK(); - } - - LOG(INFO) << "Async cacher is shutting down..."; - - if (bthread::execution_queue_stop(queue_id_) != 0) { - LOG(ERROR) << "Stop execution queue failed."; - return Status::Internal("stop execution queue failed"); - } else if (bthread::execution_queue_join(queue_id_) != 0) { - LOG(ERROR) << "Join execution queue failed."; - return Status::Internal("join execution queue failed"); - } - - LOG(INFO) << "Async cacher is down."; - - CHECK_DOWN("Async cacher"); - return Status::OK(); -} - -void AsyncCacherImpl::AsyncCache(ContextSPtr ctx, const BlockKey& block_key, - const Block& block) { - CHECK_EQ(0, - bthread::execution_queue_execute( - queue_id_, Task(NewContext(ctx->TraceId()), block_key, block))); -} - -// TODO: -// 1) MUST retrive the block which in async cache queue but not in disk -// instead of request storage -// 2) add option to lock the blocks which request storage at the same time -int AsyncCacherImpl::HandleTask(void* meta, bthread::TaskIterator& iter) { - if (iter.is_queue_stopped()) { - return 0; - } - - AsyncCacherImpl* self = static_cast(meta); - for (; iter; iter++) { - auto& task = *iter; - self->block_cache_->AsyncCache( - task.ctx, task.key, task.block, [task](Status status) { - if (!status.ok()) { - const auto& ctx = task.ctx; - LOG_CTX(ERROR) << "Async cache block failed: key = " - << task.key.Filename() - << ", status = " << status.ToString(); - } - }); - } - return 0; -} - -} // namespace cache -} // namespace dingofs diff --git a/src/cache/cachegroup/async_cacher.h b/src/cache/cachegroup/async_cacher.h deleted file mode 100644 index 861b973d8..000000000 --- a/src/cache/cachegroup/async_cacher.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: DingoFS - * Created Date: 2025-03-18 - * Author: Jingli Chen (Wine93) - */ - -#ifndef DINGOFS_SRC_CACHE_CACHEGROUP_ASYNC_CACHE_H_ -#define DINGOFS_SRC_CACHE_CACHEGROUP_ASYNC_CACHE_H_ - -#include - -#include "cache/blockcache/block_cache.h" -#include "cache/utils/context.h" - -namespace dingofs { -namespace cache { - -class AsyncCacher { - public: - virtual ~AsyncCacher() = default; - - virtual Status Start() = 0; - virtual Status Shutdown() = 0; - - virtual void AsyncCache(ContextSPtr ctx, const BlockKey& block_key, - const Block& block) = 0; -}; - -using AsyncCacherUPtr = std::unique_ptr; - -class AsyncCacherImpl final : public AsyncCacher { - public: - explicit AsyncCacherImpl(BlockCacheSPtr block_cache); - - Status Start() override; - Status Shutdown() override; - - void AsyncCache(ContextSPtr ctx, const BlockKey& block_key, - const Block& block) override; - - private: - struct Task { - Task(ContextSPtr ctx, BlockKey key, Block block) - : ctx(ctx), key(key), block(block) {} - - ContextSPtr ctx; - BlockKey key; - Block block; - }; - - static int HandleTask(void* meta, bthread::TaskIterator& iter); - - std::atomic running_; - BlockCacheSPtr block_cache_; - bthread::ExecutionQueueId queue_id_; -}; - -} // namespace cache -} // namespace dingofs - -#endif // DINGOFS_SRC_CACHE_CACHEGROUP_ASYNC_CACHE_H_ diff --git a/src/cache/cachegroup/cache_group_node.cpp b/src/cache/cachegroup/cache_group_node.cpp index b26d30abe..d6fe81edd 100644 --- a/src/cache/cachegroup/cache_group_node.cpp +++ b/src/cache/cachegroup/cache_group_node.cpp @@ -33,10 +33,11 @@ #include "cache/blockcache/block_cache.h" #include "cache/blockcache/block_cache_impl.h" +#include "cache/cachegroup/task_tracker.h" #include "cache/common/const.h" #include "cache/common/macro.h" -#include "cache/metric/cache_group_node_metric.h" #include "cache/utils/context.h" +#include "cache/utils/helper.h" #include "cache/utils/step_timer.h" #include "common/io_buffer.h" #include "common/status.h" @@ -54,22 +55,32 @@ DEFINE_string(listen_ip, "", DEFINE_validator(listen_ip, Helper::NonEmptyString); DEFINE_uint32(listen_port, 9300, "Port to listen on for this cache group node"); + DEFINE_uint32(group_weight, 100, "Weight of this cache group node, used for consistent hashing"); DEFINE_uint32(max_range_size_kb, 128, - "Retrive the whole block if length of range request is larger " + "Retrieve the whole block if length of range request is larger " "than this value"); +DEFINE_bool(retrieve_storage_lock, true, "Lock when retrieve from storage"); + +DEFINE_uint32(retrieve_storage_lock_timeout_ms, 10000, + "Timeout of retrieve from storage lock"); +DEFINE_validator(retrieve_storage_lock_timeout_ms, brpc::PassValidate); + static const std::string kModule = "cachenode"; CacheGroupNodeImpl::CacheGroupNodeImpl() : running_(false), mds_client_(BuildSharedMDSClient()), - member_(std::make_shared(mds_client_)), + member_(std::make_shared(mds_client_)), heartbeat_( - std::make_unique(member_, mds_client_)), - storage_pool_(std::make_shared(mds_client_)) {} + std::make_unique(member_, mds_client_)), + storage_pool_(std::make_shared(mds_client_)), + task_tracker_(std::make_unique()), + metric_cache_hit_count_("dingofs_cache_hit_count"), + metric_cache_miss_count_("dingofs_cache_miss_count") {} Status CacheGroupNodeImpl::Start() { CHECK_NOTNULL(mds_client_); @@ -108,13 +119,6 @@ Status CacheGroupNodeImpl::Start() { return status; } - async_cacher_ = std::make_unique(block_cache_); - status = async_cacher_->Start(); - if (!status.ok()) { - LOG(ERROR) << "Start async cacher failed: " << status.ToString(); - return status; - } - heartbeat_->Start(); running_ = true; @@ -140,12 +144,6 @@ Status CacheGroupNodeImpl::Shutdown() { return status; } - status = async_cacher_->Shutdown(); - if (!status.ok()) { - LOG(ERROR) << "Shutdown async cacher failed: " << status.ToString(); - return status; - } - status = block_cache_->Shutdown(); if (!status.ok()) { LOG(ERROR) << "Shutdown block cache failed: " << status.ToString(); @@ -167,10 +165,6 @@ Status CacheGroupNodeImpl::StartBlockCache() { return block_cache_->Start(); } -bool CacheGroupNodeImpl::IsRunning() { - return running_.load(std::memory_order_relaxed); -} - Status CacheGroupNodeImpl::Put(ContextSPtr ctx, const BlockKey& key, const Block& block, PutOption option) { if (!IsRunning()) { @@ -202,51 +196,13 @@ Status CacheGroupNodeImpl::Range(ContextSPtr ctx, const BlockKey& key, key.Filename(), offset, length); StepTimerGuard guard(timer); - status = RangeCachedBlock(ctx, timer, key, offset, length, buffer, option); - if (status.ok()) { - // do nothing - } else if (status.IsNotFound()) { - status = RangeStorage(ctx, timer, key, offset, length, buffer, option); + status = RetrieveCache(ctx, timer, key, offset, length, buffer, option); + if (status.IsNotFound()) { + status = RetrieveStorage(ctx, timer, key, offset, length, buffer, option); } return status; } -Status CacheGroupNodeImpl::Cache(ContextSPtr ctx, const BlockKey& key, - const Block& block, CacheOption option) { - if (!IsRunning()) { - return Status::Internal("cache group node is not running"); - } - - Status status; - StepTimer timer; - TraceLogGuard log(ctx, status, timer, kModule, "cache(%s,%zu)", - key.Filename(), block.size); - StepTimerGuard guard(timer); - - NEXT_STEP("cache"); - status = block_cache_->Cache(ctx, key, block, option); - - return status; -} - -Status CacheGroupNodeImpl::Prefetch(ContextSPtr ctx, const BlockKey& key, - size_t length, PrefetchOption option) { - if (!IsRunning()) { - return Status::Internal("cache group node is not running"); - } - - Status status; - StepTimer timer; - TraceLogGuard log(ctx, status, timer, kModule, "prefetch(%s,%zu)", - key.Filename(), length); - StepTimerGuard guard(timer); - - NEXT_STEP("local_prefetch"); - status = block_cache_->Prefetch(ctx, key, length, option); - - return status; -} - void CacheGroupNodeImpl::AsyncCache(ContextSPtr ctx, const BlockKey& key, const Block& block, AsyncCallback callback, CacheOption option) { @@ -289,26 +245,26 @@ void CacheGroupNodeImpl::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key, block_cache_->AsyncPrefetch(ctx, key, length, cb, option); } -Status CacheGroupNodeImpl::RangeCachedBlock(ContextSPtr ctx, StepTimer& timer, - const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, - RangeOption option) { +Status CacheGroupNodeImpl::RetrieveCache(ContextSPtr ctx, StepTimer& timer, + const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, + RangeOption option) { NEXT_STEP("local_range"); option.retrive = false; auto status = block_cache_->Range(ctx, key, offset, length, buffer, option); if (status.ok()) { - AddCacheHitCount(1); + metric_cache_hit_count_ << 1; ctx->SetCacheHit(true); } else { - AddCacheMissCount(1); + metric_cache_miss_count_ << 1; } return status; } -Status CacheGroupNodeImpl::RangeStorage(ContextSPtr ctx, StepTimer& timer, - const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, - RangeOption option) { +Status CacheGroupNodeImpl::RetrieveStorage(ContextSPtr ctx, StepTimer& timer, + const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, + RangeOption option) { NEXT_STEP("get_storage") StorageSPtr storage; auto status = storage_pool_->GetStorage(key.fs_id, storage); @@ -316,40 +272,120 @@ Status CacheGroupNodeImpl::RangeStorage(ContextSPtr ctx, StepTimer& timer, return status; } - // Retrive range of block: unknown block size or unreach max_range_size - auto block_size = option.block_size; - if (block_size == 0 || length <= FLAGS_max_range_size_kb * kKiB) { - NEXT_STEP("s3_range") - status = storage->Download(ctx, key, offset, length, buffer); - if (status.ok() && block_size > 0) { - block_cache_->AsyncPrefetch(ctx, key, block_size, [](Status status) {}); + // Retrieve range of block: unknown block size or unreach max_range_size + auto block_whole_length = option.block_size; + if (block_whole_length == 0 || length <= FLAGS_max_range_size_kb * kKiB) { + return RetrievePartBlock(ctx, timer, storage, key, offset, length, + block_whole_length, buffer); + } + + // Retrieve the whole block + IOBuffer block; + status = + RetrieveWholeBlock(ctx, timer, storage, key, block_whole_length, &block); + if (status.ok()) { + block.AppendTo(buffer, length, offset); + } + return status; +} + +Status CacheGroupNodeImpl::RetrievePartBlock( + ContextSPtr ctx, StepTimer& timer, StorageSPtr storage, const BlockKey& key, + off_t offset, size_t length, size_t block_whole_length, IOBuffer* buffer) { + NEXT_STEP("s3_range") + auto status = storage->Range(ctx, key, offset, length, buffer); + if (status.ok() && block_whole_length > 0) { + NEXT_STEP("async_prefetch") + block_cache_->AsyncPrefetch(ctx, key, block_whole_length, + [](Status status) {}); + } + return status; +} + +Status CacheGroupNodeImpl::RetrieveWholeBlock(ContextSPtr ctx, StepTimer& timer, + StorageSPtr storage, + const BlockKey& key, + size_t length, IOBuffer* buffer) { + if (!FLAGS_retrieve_storage_lock) { + NEXT_STEP("s3_get") + return storage->Range(ctx, key, 0, length, buffer); + } + + DownloadTaskSPtr task; + bool created = + task_tracker_->GetOrCreateTask(ctx, storage, key, length, buffer, task); + if (created) { + NEXT_STEP("s3_get") + auto status = task->Run(true); + if (!status.ok()) { + task_tracker_->RemoveTask(key); + } else { + block_cache_->AsyncCache( + task->ctx, task->key, Block(*task->buffer), [&](Status status) { + auto ctx = task->ctx; + if (!status.ok()) { + LOG_CTX(ERROR) + << "Async cache failed: key = " << task->key.Filename() + << ", status = " << status.ToString(); + } + task_tracker_->RemoveTask(task->key); + }); } + return status; } - // Retrive the whole block + return WaitTask(timer, task); +} + +Status CacheGroupNodeImpl::RunTask(StepTimer& timer, DownloadTaskSPtr task) { NEXT_STEP("s3_get") - IOBuffer block; - status = storage->Download(ctx, key, 0, block_size, &block); + auto status = task->Run(true); if (!status.ok()) { + task_tracker_->RemoveTask(task->key); return status; } NEXT_STEP("async_cache") - async_cacher_->AsyncCache(ctx, key, block); - - butil::IOBuf piecs; - block.IOBuf().append_to(&piecs, length, offset); - *buffer = IOBuffer(piecs); - return status; + block_cache_->AsyncCache( + task->ctx, task->key, Block(*task->buffer), [task](Status status) { + auto ctx = task->ctx; + if (!status.ok()) { + LOG_CTX(ERROR) << "Async cache failed: key = " << task->key.Filename() + << ", status = " << status.ToString(); + } + + // task_tracker->RemoveTask(task->key); + }); + return Status::OK(); } -void CacheGroupNodeImpl::AddCacheHitCount(int64_t count) { - CacheGroupNodeMetric::GetInstance().cache_hit_count << count; +Status CacheGroupNodeImpl::WaitTask(DownloadTaskSPtr task) { + bool finished = task->Wait(FLAGS_retrieve_storage_lock_timeout_ms); + if (finished) { + auto status = task->status; + if (status.ok() || status.IsNotFound()) { + return status; + } + } + + // not finished or finished with other status, so will re-run the task + auto status = task->Run(false); + if (status.ok()) { + AsyncCache(task); + } + return status; } -void CacheGroupNodeImpl::AddCacheMissCount(int64_t count) { - CacheGroupNodeMetric::GetInstance().cache_miss_count << count; +void CacheGroupNodeImpl::AsyncCache(DownloadTaskSPtr task) { + block_cache_->AsyncCache( + task->ctx, task->key, Block(*task->buffer), [task](Status status) { + auto ctx = task->ctx; + if (!status.ok()) { + LOG_CTX(ERROR) << "Async cache failed: key = " << task->key.Filename() + << ", status = " << status.ToString(); + } + }); } } // namespace cache diff --git a/src/cache/cachegroup/cache_group_node.h b/src/cache/cachegroup/cache_group_node.h index 5bc493b9c..e53ac3bb9 100644 --- a/src/cache/cachegroup/cache_group_node.h +++ b/src/cache/cachegroup/cache_group_node.h @@ -24,14 +24,15 @@ #define DINGOFS_SRC_CACHE_CACHEGROUP_CACHE_GROUP_NODE_H_ #include "cache/blockcache/block_cache.h" -#include "cache/blockcache/disk_cache.h" -#include "cache/cachegroup/async_cacher.h" +#include "cache/blockcache/cache_store.h" #include "cache/cachegroup/cache_group_node_heartbeat.h" #include "cache/cachegroup/cache_group_node_member.h" +#include "cache/cachegroup/task_tracker.h" #include "cache/common/mds_client.h" #include "cache/storage/storage_pool.h" #include "cache/utils/context.h" #include "cache/utils/step_timer.h" +#include "common/io_buffer.h" namespace dingofs { namespace cache { @@ -48,11 +49,6 @@ class CacheGroupNode { virtual Status Range(ContextSPtr ctx, const BlockKey& key, off_t offset, size_t length, IOBuffer* buffer, RangeOption option = RangeOption()) = 0; - virtual Status Cache(ContextSPtr ctx, const BlockKey& key, const Block& block, - CacheOption option = CacheOption()) = 0; - virtual Status Prefetch(ContextSPtr ctx, const BlockKey& key, size_t length, - PrefetchOption option = PrefetchOption()) = 0; - virtual void AsyncCache(ContextSPtr ctx, const BlockKey& key, const Block& block, AsyncCallback callback, CacheOption option = CacheOption()) = 0; @@ -75,11 +71,6 @@ class CacheGroupNodeImpl final : public CacheGroupNode { Status Range(ContextSPtr ctx, const BlockKey& key, off_t offset, size_t length, IOBuffer* buffer, RangeOption option = RangeOption()) override; - Status Cache(ContextSPtr ctx, const BlockKey& key, const Block& block, - CacheOption option = CacheOption()) override; - Status Prefetch(ContextSPtr ctx, const BlockKey& key, size_t length, - PrefetchOption option = PrefetchOption()) override; - void AsyncCache(ContextSPtr ctx, const BlockKey& key, const Block& block, AsyncCallback callback, CacheOption option = CacheOption()) override; @@ -88,28 +79,39 @@ class CacheGroupNodeImpl final : public CacheGroupNode { PrefetchOption option = PrefetchOption()) override; private: - Status StartBlockCache(); + bool IsRunning() const { return running_.load(std::memory_order_relaxed); } - bool IsRunning(); - - Status RangeCachedBlock(ContextSPtr ctx, StepTimer& timer, - const BlockKey& key, off_t offset, size_t length, - IOBuffer* buffer, RangeOption option); - Status RangeStorage(ContextSPtr ctx, StepTimer& timer, const BlockKey& key, - off_t offset, size_t length, IOBuffer* buffer, - RangeOption option); + Status StartBlockCache(); - void AddCacheHitCount(int64_t count); - void AddCacheMissCount(int64_t count); + Status RetrieveCache(ContextSPtr ctx, StepTimer& timer, const BlockKey& key, + off_t offset, size_t length, IOBuffer* buffer, + RangeOption option); + Status RetrieveStorage(ContextSPtr ctx, StepTimer& timer, const BlockKey& key, + off_t offset, size_t length, IOBuffer* buffer, + RangeOption option); + Status RetrievePartBlock(ContextSPtr ctx, StepTimer& timer, + StorageSPtr storage, const BlockKey& key, + off_t offset, size_t length, + size_t block_whole_length, IOBuffer* buffer); + Status RetrieveWholeBlock(ContextSPtr ctx, StepTimer& timer, + StorageSPtr storage, const BlockKey& key, + size_t length, IOBuffer* buffer); + + Status RunTask(StepTimer& timer, DownloadTaskSPtr task); + Status WaitTask(StepTimer& timer, DownloadTaskSPtr task); + void AsyncCache(DownloadTaskSPtr task); private: std::atomic running_; MDSClientSPtr mds_client_; CacheGroupNodeMemberSPtr member_; BlockCacheSPtr block_cache_; - AsyncCacherUPtr async_cacher_; CacheGroupNodeHeartbeatUPtr heartbeat_; StoragePoolSPtr storage_pool_; + TaskTrackerUPtr task_tracker_; + + bvar::Adder metric_cache_hit_count_; + bvar::Adder metric_cache_miss_count_; }; } // namespace cache diff --git a/src/cache/cachegroup/cache_group_node_heartbeat.cpp b/src/cache/cachegroup/cache_group_node_heartbeat.cpp index 1723d0959..c55a32508 100644 --- a/src/cache/cachegroup/cache_group_node_heartbeat.cpp +++ b/src/cache/cachegroup/cache_group_node_heartbeat.cpp @@ -31,14 +31,14 @@ namespace cache { DEFINE_uint32(send_heartbeat_interval_s, 3, "Interval to send heartbeat to MDS in seconds"); -CacheGroupNodeHeartbeatImpl::CacheGroupNodeHeartbeatImpl( +CacheGroupNodeHeartbeat::CacheGroupNodeHeartbeat( CacheGroupNodeMemberSPtr member, MDSClientSPtr mds_client) : running_(false), member_(member), mds_client_(mds_client), executor_(std::make_unique()) {} -void CacheGroupNodeHeartbeatImpl::Start() { +void CacheGroupNodeHeartbeat::Start() { CHECK_NOTNULL(member_); CHECK_NOTNULL(mds_client_); CHECK_NOTNULL(executor_); @@ -60,7 +60,7 @@ void CacheGroupNodeHeartbeatImpl::Start() { CHECK_RUNNING("Cache group node heartbeat"); } -void CacheGroupNodeHeartbeatImpl::Shutdown() { +void CacheGroupNodeHeartbeat::Shutdown() { if (!running_.exchange(false)) { return; } @@ -74,7 +74,7 @@ void CacheGroupNodeHeartbeatImpl::Shutdown() { CHECK_DOWN("Cache group node heartbeat"); } -void CacheGroupNodeHeartbeatImpl::SendHeartbeat() { +void CacheGroupNodeHeartbeat::SendHeartbeat() { std::string group_name = member_->GetGroupName(); auto status = mds_client_->Heartbeat( member_->GetMemberId(), member_->GetListenIP(), member_->GetListenPort()); diff --git a/src/cache/cachegroup/cache_group_node_heartbeat.h b/src/cache/cachegroup/cache_group_node_heartbeat.h index 7f9d9b441..c74f074dc 100644 --- a/src/cache/cachegroup/cache_group_node_heartbeat.h +++ b/src/cache/cachegroup/cache_group_node_heartbeat.h @@ -34,21 +34,11 @@ namespace cache { class CacheGroupNodeHeartbeat { public: - virtual ~CacheGroupNodeHeartbeat() = default; + CacheGroupNodeHeartbeat(CacheGroupNodeMemberSPtr member, + MDSClientSPtr mds_client); - virtual void Start() = 0; - virtual void Shutdown() = 0; -}; - -using CacheGroupNodeHeartbeatUPtr = std::unique_ptr; - -class CacheGroupNodeHeartbeatImpl final : public CacheGroupNodeHeartbeat { - public: - CacheGroupNodeHeartbeatImpl(CacheGroupNodeMemberSPtr member, - MDSClientSPtr mds_client); - - void Start() override; - void Shutdown() override; + void Start(); + void Shutdown(); private: void SendHeartbeat(); @@ -60,6 +50,8 @@ class CacheGroupNodeHeartbeatImpl final : public CacheGroupNodeHeartbeat { std::unique_ptr executor_; }; +using CacheGroupNodeHeartbeatUPtr = std::unique_ptr; + } // namespace cache } // namespace dingofs diff --git a/src/cache/cachegroup/cache_group_node_member.cpp b/src/cache/cachegroup/cache_group_node_member.cpp index 9366790d2..245371f3d 100644 --- a/src/cache/cachegroup/cache_group_node_member.cpp +++ b/src/cache/cachegroup/cache_group_node_member.cpp @@ -27,10 +27,10 @@ namespace dingofs { namespace cache { -CacheGroupNodeMemberImpl::CacheGroupNodeMemberImpl(MDSClientSPtr mds_client) +CacheGroupNodeMember::CacheGroupNodeMember(MDSClientSPtr mds_client) : mds_client_(mds_client) {} -Status CacheGroupNodeMemberImpl::JoinGroup() { +Status CacheGroupNodeMember::JoinGroup() { CHECK_NOTNULL(mds_client_); auto status = mds_client_->JoinCacheGroup(FLAGS_id, FLAGS_listen_ip, @@ -53,7 +53,7 @@ Status CacheGroupNodeMemberImpl::JoinGroup() { return Status::OK(); } -Status CacheGroupNodeMemberImpl::LeaveGroup() { +Status CacheGroupNodeMember::LeaveGroup() { CHECK_NOTNULL(mds_client_); auto status = mds_client_->LeaveCacheGroup( @@ -71,19 +71,5 @@ Status CacheGroupNodeMemberImpl::LeaveGroup() { return Status::OK(); } -std::string CacheGroupNodeMemberImpl::GetGroupName() const { - return FLAGS_group_name; -} - -std::string CacheGroupNodeMemberImpl::GetListenIP() const { - return FLAGS_listen_ip; -} - -uint32_t CacheGroupNodeMemberImpl::GetListenPort() const { - return FLAGS_listen_port; -} - -std::string CacheGroupNodeMemberImpl::GetMemberId() const { return member_id_; } - } // namespace cache } // namespace dingofs diff --git a/src/cache/cachegroup/cache_group_node_member.h b/src/cache/cachegroup/cache_group_node_member.h index 6e3303148..9a84795c8 100644 --- a/src/cache/cachegroup/cache_group_node_member.h +++ b/src/cache/cachegroup/cache_group_node_member.h @@ -25,44 +25,30 @@ #include "cache/common/mds_client.h" #include "common/status.h" +#include "options/cache/option.h" namespace dingofs { namespace cache { class CacheGroupNodeMember { public: - virtual ~CacheGroupNodeMember() = default; + explicit CacheGroupNodeMember(MDSClientSPtr mds_client); - virtual Status JoinGroup() = 0; - virtual Status LeaveGroup() = 0; + Status JoinGroup(); + Status LeaveGroup(); - virtual std::string GetGroupName() const = 0; - virtual std::string GetListenIP() const = 0; - virtual uint32_t GetListenPort() const = 0; - virtual std::string GetMemberId() const = 0; -}; - -using CacheGroupNodeMemberSPtr = std::shared_ptr; - -class CacheGroupNodeMemberImpl final : public CacheGroupNodeMember { - public: - explicit CacheGroupNodeMemberImpl(MDSClientSPtr mds_client); - - ~CacheGroupNodeMemberImpl() override = default; - - Status JoinGroup() override; - Status LeaveGroup() override; - - std::string GetGroupName() const override; - std::string GetListenIP() const override; - uint32_t GetListenPort() const override; - std::string GetMemberId() const override; + std::string GetGroupName() const { return FLAGS_group_name; } + std::string GetListenIP() const { return FLAGS_listen_ip; } + uint32_t GetListenPort() const { return FLAGS_listen_port; } + std::string GetMemberId() const { return member_id_; } private: std::string member_id_; MDSClientSPtr mds_client_; }; +using CacheGroupNodeMemberSPtr = std::shared_ptr; + } // namespace cache } // namespace dingofs diff --git a/src/cache/cachegroup/task_tracker.cpp b/src/cache/cachegroup/task_tracker.cpp new file mode 100644 index 000000000..5fcb6a0d9 --- /dev/null +++ b/src/cache/cachegroup/task_tracker.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: DingoFS + * Created Date: 2025-09-03 + * Author: Jingli Chen (Wine93) + */ + +#include "cache/cachegroup/task_tracker.h" + +#include +#include + +#include + +#include "cache/blockcache/cache_store.h" +#include "cache/storage/storage.h" + +namespace dingofs { +namespace cache { + +DownloadTask::DownloadTask(ContextSPtr ctx, StorageSPtr storage, + const BlockKey& key, size_t length, IOBuffer* buffer) + : ctx(ctx), storage(storage), key(key), length(length), buffer(buffer) {} + +Status DownloadTask::Run(bool notify) { + status = storage->Range(ctx, key, 0, length, buffer); + if (notify) { + std::lock_guard lock(mutex); + finished = true; + cond.notify_all(); + } + return status; +} + +bool DownloadTask::Wait(long timeout_ms) { + std::unique_lock lock(mutex); + if (!finished) { + cond.wait_for(lock, timeout_ms * 1000); + } + return finished; +} + +bool TaskTracker::GetOrCreateTask(ContextSPtr ctx, StorageSPtr storage, + const BlockKey& key, size_t length, + IOBuffer* buffer, DownloadTaskSPtr& task) { + std::lock_guard lock(mutex_); + auto iter = tasks_.find(key.Filename()); + if (iter != tasks_.end()) { + task = iter->second; + return false; + } + + tasks_[key.Filename()] = + std::make_shared(ctx, storage, key, length, buffer); + return true; +} + +void TaskTracker::RemoveTask(const BlockKey& key) { + std::lock_guard lock(mutex_); + tasks_.erase(key.Filename()); +} + +} // namespace cache +} // namespace dingofs diff --git a/src/cache/cachegroup/task_tracker.h b/src/cache/cachegroup/task_tracker.h new file mode 100644 index 000000000..cae163fa2 --- /dev/null +++ b/src/cache/cachegroup/task_tracker.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: DingoFS + * Created Date: 2025-09-03 + * Author: Jingli Chen (Wine93) + */ + +#ifndef DINGOFS_SRC_CACHE_CACHEGROUP_TASK_TRACKER_H_ +#define DINGOFS_SRC_CACHE_CACHEGROUP_TASK_TRACKER_H_ + +#include +#include + +#include +#include +#include + +#include "cache/blockcache/cache_store.h" +#include "cache/storage/storage.h" +#include "cache/utils/context.h" +#include "common/io_buffer.h" +#include "common/status.h" + +namespace dingofs { +namespace cache { + +struct DownloadTask { + DownloadTask(ContextSPtr ctx, StorageSPtr storage, const BlockKey& key, + size_t length, IOBuffer* buffer); + + Status Run(bool notify); + bool Wait(long timeout_ms); + + ContextSPtr ctx; + StorageSPtr storage; + BlockKey key; + size_t length; + IOBuffer* buffer; + Status status; + + bool finished{false}; + bthread::Mutex mutex; + bthread::ConditionVariable cond; +}; + +using DownloadTaskSPtr = std::shared_ptr; + +class TaskTracker { + public: + TaskTracker() = default; + + // return true if new task created + bool GetOrCreateTask(ContextSPtr ctx, StorageSPtr storage, + const BlockKey& key, size_t length, IOBuffer* buffer, + DownloadTaskSPtr& task); + void RemoveTask(const BlockKey& key); + + private: + bthread::Mutex mutex_; + std::unordered_map tasks_; +}; + +using TaskTrackerUPtr = std::unique_ptr; + +}; // namespace cache +}; // namespace dingofs + +#endif // DINGOFS_SRC_CACHE_CACHEGROUP_TASK_TRACKER_H_ diff --git a/src/cache/dingo_cache.cpp b/src/cache/dingo_cache.cpp index b435f6568..a2f4af77b 100644 --- a/src/cache/dingo_cache.cpp +++ b/src/cache/dingo_cache.cpp @@ -36,7 +36,7 @@ namespace cache { FlagsInfo DingoCache::flags; -int DingoCache::HandleFlags(int argc, char** argv) { +int DingoCache::ParseFlags(int argc, char** argv) { flags = FlagsHelper::Parse(argc, argv); if (flags.show_help) { std::cout << FlagsHelper::GenHelp(flags) << "\n"; @@ -97,7 +97,7 @@ int DingoCache::StartServer() { } int DingoCache::Run(int argc, char** argv) { - int rc = HandleFlags(argc, argv); + int rc = ParseFlags(argc, argv); if (rc != 0) { return rc; } diff --git a/src/cache/dingo_cache.h b/src/cache/dingo_cache.h index 124dfc7b8..44ec684e5 100644 --- a/src/cache/dingo_cache.h +++ b/src/cache/dingo_cache.h @@ -33,7 +33,7 @@ class DingoCache { static int Run(int argc, char** argv); private: - static int HandleFlags(int argc, char** argv); + static int ParseFlags(int argc, char** argv); static void InitGlog(); static void LogFlags(); diff --git a/src/cache/metric/cache_group_node_metric.h b/src/cache/metric/cache_group_node_metric.h deleted file mode 100644 index d2df9774a..000000000 --- a/src/cache/metric/cache_group_node_metric.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: DingoFS - * Created Date: 2025-02-17 - * Author: Jingli Chen (Wine93) - */ - -#ifndef DINGOFS_SRC_CACHE_METRICS_CACHE_GROUP_NODE_METRIC_H_ -#define DINGOFS_SRC_CACHE_METRICS_CACHE_GROUP_NODE_METRIC_H_ - -#include - -namespace dingofs { -namespace cache { - -struct CacheGroupNodeMetric { - static CacheGroupNodeMetric& GetInstance() { - static CacheGroupNodeMetric instance; - return instance; - } - - bvar::Adder cache_hit_count{"dingofs_cache_hit_count"}; - bvar::Adder cache_miss_count{"dingofs_cache_miss_count"}; -}; - -} // namespace cache -} // namespace dingofs - -#endif // DINGOFS_SRC_CACHE_METRICS_CACHE_GROUP_NODE_METRIC_H_ diff --git a/src/cache/remotecache/remote_block_cache.cpp b/src/cache/remotecache/remote_block_cache.cpp index c75f12e1c..58f43252f 100644 --- a/src/cache/remotecache/remote_block_cache.cpp +++ b/src/cache/remotecache/remote_block_cache.cpp @@ -162,13 +162,13 @@ Status RemoteBlockCacheImpl::Put(ContextSPtr ctx, const BlockKey& key, if (!option.writeback) { NEXT_STEP("s3_put"); - status = storage_->Upload(ctx, key, block); + status = storage_->Put(ctx, key, block); } else { NEXT_STEP("remote_put"); status = remote_node_->Put(ctx, key, block); if (!status.ok()) { NEXT_STEP("s3_put"); - status = storage_->Upload(ctx, key, block); + status = storage_->Put(ctx, key, block); } } @@ -230,7 +230,7 @@ Status RemoteBlockCacheImpl::Range(ContextSPtr ctx, const BlockKey& key, } NEXT_STEP("s3_range"); - status = storage_->Download(ctx, key, offset, length, buffer); + status = storage_->Range(ctx, key, offset, length, buffer); if (!status.ok()) { GENERIC_LOG_DOWNLOAD_ERROR(); } diff --git a/src/cache/storage/base_filesystem.cpp b/src/cache/storage/base_filesystem.cpp index ab2969db5..d6c4e65f4 100644 --- a/src/cache/storage/base_filesystem.cpp +++ b/src/cache/storage/base_filesystem.cpp @@ -180,5 +180,45 @@ Status BaseFileSystem::CheckStatus(Status status) { return status; } +Status FSUtil::MkDirs(const std::string& dir) { + return BaseFileSystem::GetInstance().MkDirs(dir); +} + +Status FSUtil::Walk(const std::string& dir, WalkFunc walk_func) { + return BaseFileSystem::GetInstance().Walk(dir, walk_func); +} + +Status FSUtil::WriteFile(const std::string& filepath, + const std::string& content) { + int rc = butil::WriteFile(butil::FilePath(filepath), content.data(), + content.size()); + if (rc == static_cast(content.size())) { + return Status::OK(); + } + return Status::IoError("write file failed"); +} + +Status FSUtil::ReadFile(const std::string& filepath, std::string* content) { + if (!FileExists(filepath)) { + return Status::NotFound("file not found"); + } else if (butil::ReadFileToString(butil::FilePath(filepath), content), + 4 * kMiB) { + return Status::OK(); + } + return Status::IoError("read file failed"); +} + +Status FSUtil::RemoveFile(const std::string& filepath) { + return BaseFileSystem::GetInstance().RemoveFile(filepath); +} + +bool FSUtil::FileExists(const std::string& filepath) { + return BaseFileSystem::GetInstance().FileExists(filepath); +} + +Status FSUtil::StatFS(const std::string& dir, FSStat* stat) { + return BaseFileSystem::GetInstance().StatFS(dir, stat); +} + } // namespace cache } // namespace dingofs diff --git a/src/cache/storage/base_filesystem.h b/src/cache/storage/base_filesystem.h index 563c60a8d..e2d3d107c 100644 --- a/src/cache/storage/base_filesystem.h +++ b/src/cache/storage/base_filesystem.h @@ -75,47 +75,22 @@ class BaseFileSystem : public FileSystem { CheckStatusFunc check_status_func_; }; -class FSHelper { +class FSUtil { public: - static Status MkDirs(const std::string& dir) { - return BaseFileSystem::GetInstance().MkDirs(dir); - } + static Status MkDirs(const std::string& dir); - static Status Walk(const std::string& dir, WalkFunc walk_func) { - return BaseFileSystem::GetInstance().Walk(dir, walk_func); - } + static Status Walk(const std::string& dir, WalkFunc walk_func); static Status WriteFile(const std::string& filepath, - const std::string& content) { - int rc = butil::WriteFile(butil::FilePath(filepath), content.data(), - content.size()); - if (rc == static_cast(content.size())) { - return Status::OK(); - } - return Status::IoError("write file failed"); - } - - static Status ReadFile(const std::string& filepath, std::string* content) { - if (!FileExists(filepath)) { - return Status::NotFound("file not found"); - } else if (butil::ReadFileToString(butil::FilePath(filepath), content), - 4 * kMiB) { - return Status::OK(); - } - return Status::IoError("read file failed"); - } - - static Status RemoveFile(const std::string& filepath) { - return BaseFileSystem::GetInstance().RemoveFile(filepath); - } - - static bool FileExists(const std::string& filepath) { - return BaseFileSystem::GetInstance().FileExists(filepath); - } - - static Status StatFS(const std::string& dir, FSStat* stat) { - return BaseFileSystem::GetInstance().StatFS(dir, stat); - } + const std::string& content); + + static Status ReadFile(const std::string& filepath, std::string* content); + + static Status RemoveFile(const std::string& filepath); + + static bool FileExists(const std::string& filepath); + + static Status StatFS(const std::string& dir, FSStat* stat); }; } // namespace cache diff --git a/src/cache/storage/storage.h b/src/cache/storage/storage.h index f66b9f4fa..13f4c90e4 100644 --- a/src/cache/storage/storage.h +++ b/src/cache/storage/storage.h @@ -30,29 +30,28 @@ namespace dingofs { namespace cache { -struct UploadOption { - using AsyncCacheFunc = +class Storage { + public: + using AsyncCacheFn = std::function; - UploadOption() : async_cache_func(nullptr) {} - AsyncCacheFunc async_cache_func; -}; + struct PutOption { + PutOption() : async_cache_fn(nullptr) {} + AsyncCacheFn async_cache_fn; + }; -struct DownloadOption {}; + struct RangeOption {}; -class Storage { - public: virtual ~Storage() = default; virtual Status Start() = 0; virtual Status Shutdown() = 0; - virtual Status Upload(ContextSPtr ctx, const BlockKey& key, - const Block& block, - UploadOption option = UploadOption()) = 0; - virtual Status Download(ContextSPtr ctx, const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, - DownloadOption option = DownloadOption()) = 0; + virtual Status Put(ContextSPtr ctx, const BlockKey& key, const Block& block, + PutOption option = PutOption()) = 0; + virtual Status Range(ContextSPtr ctx, const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, + RangeOption option = RangeOption()) = 0; }; using StorageUPtr = std::unique_ptr; diff --git a/src/cache/storage/storage_closure.cpp b/src/cache/storage/storage_closure.cpp index acf78cf07..0a71782b1 100644 --- a/src/cache/storage/storage_closure.cpp +++ b/src/cache/storage/storage_closure.cpp @@ -31,19 +31,19 @@ namespace dingofs { namespace cache { -UploadClosure::UploadClosure(ContextSPtr ctx, const BlockKey& key, - const Block& block, UploadOption option, - blockaccess::BlockAccesser* block_accesser) +PutClosure::PutClosure(ContextSPtr ctx, const BlockKey& key, const Block& block, + Storage::PutOption option, + blockaccess::BlockAccesser* block_accesser) : ctx_(ctx), key_(key), block_(block), option_(option), block_accesser_(block_accesser) {} -void UploadClosure::Run() { +void PutClosure::Run() { auto block = CopyBlock(); // Copy data to continuous memory - if (option_.async_cache_func) { - option_.async_cache_func(key_, block); + if (option_.async_cache_fn) { + option_.async_cache_fn(key_, block); } auto retry_cb = [this, block](Status s) { @@ -58,24 +58,24 @@ void UploadClosure::Run() { retry_cb); } -void UploadClosure::OnComplete(Status s) { +void PutClosure::OnComplete(Status s) { StorageClosure::status() = s; StorageClosure::Run(); } -Block UploadClosure::CopyBlock() { +Block PutClosure::CopyBlock() { char* data = new char[block_.size]; block_.buffer.CopyTo(data); - butil::IOBuf iobuf; - iobuf.append_user_data(data, block_.size, Helper::DeleteBuffer); - return Block(IOBuffer(iobuf)); + IOBuffer buffer; + buffer.AppendUserData(data, block_.size, Helper::DeleteBuffer); + return Block(buffer); } -DownloadClosure::DownloadClosure(ContextSPtr ctx, const BlockKey& key, - off_t offset, size_t length, IOBuffer* buffer, - DownloadOption option, - blockaccess::BlockAccesser* block_accesser) +RangeClosure::RangeClosure(ContextSPtr ctx, const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, + Storage::RangeOption option, + blockaccess::BlockAccesser* block_accesser) : ctx_(ctx), key_(key), offset_(offset), @@ -84,11 +84,9 @@ DownloadClosure::DownloadClosure(ContextSPtr ctx, const BlockKey& key, option_(option), block_accesser_(block_accesser) {} -void DownloadClosure::Run() { +void RangeClosure::Run() { char* data = new char[length_]; - butil::IOBuf iobuf; - iobuf.append_user_data(data, length_, Helper::DeleteBuffer); - *buffer_ = IOBuffer(iobuf); + buffer_->AppendUserData(data, length_, Helper::DeleteBuffer); auto retry_cb = [this](Status s) { this->OnComplete(s); @@ -99,7 +97,7 @@ void DownloadClosure::Run() { buffer_->Fetch1(), retry_cb); } -void DownloadClosure::OnComplete(Status s) { +void RangeClosure::OnComplete(Status s) { StorageClosure::status() = s; StorageClosure::Run(); } diff --git a/src/cache/storage/storage_closure.h b/src/cache/storage/storage_closure.h index d28323717..1458c6158 100644 --- a/src/cache/storage/storage_closure.h +++ b/src/cache/storage/storage_closure.h @@ -59,11 +59,11 @@ class StorageClosure : public Closure { BthreadConditionVariable cond_; }; -class UploadClosure final : public StorageClosure { +class PutClosure final : public StorageClosure { public: - UploadClosure(ContextSPtr ctx, const BlockKey& key, const Block& block, - UploadOption option, - blockaccess::BlockAccesser* block_accesser); + PutClosure(ContextSPtr ctx, const BlockKey& key, const Block& block, + Storage::PutOption option, + blockaccess::BlockAccesser* block_accesser); void Run() override; @@ -75,15 +75,15 @@ class UploadClosure final : public StorageClosure { ContextSPtr ctx_; BlockKey key_; Block block_; - UploadOption option_; + Storage::PutOption option_; blockaccess::BlockAccesser* block_accesser_; }; -class DownloadClosure final : public StorageClosure { +class RangeClosure final : public StorageClosure { public: - DownloadClosure(ContextSPtr ctx, const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, DownloadOption option, - blockaccess::BlockAccesser* block_accesser); + RangeClosure(ContextSPtr ctx, const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, Storage::RangeOption option, + blockaccess::BlockAccesser* block_accesser); void Run() override; @@ -95,7 +95,7 @@ class DownloadClosure final : public StorageClosure { off_t offset_; size_t length_; IOBuffer* buffer_; - DownloadOption option_; + Storage::RangeOption option_; blockaccess::BlockAccesser* block_accesser_; }; diff --git a/src/cache/storage/storage_impl.cpp b/src/cache/storage/storage_impl.cpp index 5d5eb233a..ac5c95d8b 100644 --- a/src/cache/storage/storage_impl.cpp +++ b/src/cache/storage/storage_impl.cpp @@ -93,8 +93,8 @@ Status StorageImpl::Shutdown() { return Status::OK(); } -Status StorageImpl::Upload(ContextSPtr ctx, const BlockKey& key, - const Block& block, UploadOption option) { +Status StorageImpl::Put(ContextSPtr ctx, const BlockKey& key, + const Block& block, PutOption option) { DCHECK_RUNNING("Storage"); Status status; @@ -104,7 +104,7 @@ Status StorageImpl::Upload(ContextSPtr ctx, const BlockKey& key, StepTimerGuard guard(timer); NEXT_STEP("enqueue"); - auto closure = UploadClosure(ctx, key, block, option, block_accesser_); + auto closure = PutClosure(ctx, key, block, option, block_accesser_); CHECK_EQ(0, bthread::execution_queue_execute(queue_id_, &closure)); NEXT_STEP("s3_put"); @@ -118,9 +118,8 @@ Status StorageImpl::Upload(ContextSPtr ctx, const BlockKey& key, return status; } -Status StorageImpl::Download(ContextSPtr ctx, const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, - DownloadOption option) { +Status StorageImpl::Range(ContextSPtr ctx, const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, RangeOption option) { DCHECK_RUNNING("Storage"); Status status; @@ -130,8 +129,8 @@ Status StorageImpl::Download(ContextSPtr ctx, const BlockKey& key, off_t offset, StepTimerGuard guard(timer); NEXT_STEP("enqueue"); - auto closure = DownloadClosure(ctx, key, offset, length, buffer, option, - block_accesser_); + auto closure = + RangeClosure(ctx, key, offset, length, buffer, option, block_accesser_); CHECK_EQ(0, bthread::execution_queue_execute(queue_id_, &closure)); NEXT_STEP("s3_range"); diff --git a/src/cache/storage/storage_impl.h b/src/cache/storage/storage_impl.h index 268962de1..1b6177cc2 100644 --- a/src/cache/storage/storage_impl.h +++ b/src/cache/storage/storage_impl.h @@ -44,11 +44,11 @@ class StorageImpl final : public Storage { Status Start() override; Status Shutdown() override; - Status Upload(ContextSPtr ctx, const BlockKey& key, const Block& block, - UploadOption option = UploadOption()) override; - Status Download(ContextSPtr ctx, const BlockKey& key, off_t offset, - size_t length, IOBuffer* buffer, - DownloadOption option = DownloadOption()) override; + Status Put(ContextSPtr ctx, const BlockKey& key, const Block& block, + PutOption option = PutOption()) override; + Status Range(ContextSPtr ctx, const BlockKey& key, off_t offset, + size_t length, IOBuffer* buffer, + RangeOption option = RangeOption()) override; private: static int HandleClosure(void* meta, diff --git a/src/cache/tiercache/tier_block_cache.cpp b/src/cache/tiercache/tier_block_cache.cpp index 9b979ee0b..71e9245fd 100644 --- a/src/cache/tiercache/tier_block_cache.cpp +++ b/src/cache/tiercache/tier_block_cache.cpp @@ -58,8 +58,9 @@ TierBlockCache::TierBlockCache(StorageSPtr storage) local_block_cache_(std::make_unique(storage_)), remote_block_cache_(std::make_unique(storage_)), joiner_(std::make_unique()), - inflight_tracker_( - std::make_unique(FLAGS_prefetch_max_inflights)) {} + inflight_prefetch_( + std::make_shared(FLAGS_prefetch_max_inflights)), + inflight_cache_(std::make_shared(1024)) {} TierBlockCache::TierBlockCache(blockaccess::BlockAccesser* block_accesser) : TierBlockCache(std::make_shared(block_accesser)) {} @@ -69,7 +70,8 @@ Status TierBlockCache::Start() { CHECK_NOTNULL(local_block_cache_); CHECK_NOTNULL(remote_block_cache_); CHECK_NOTNULL(joiner_); - CHECK_NOTNULL(inflight_tracker_); + CHECK_NOTNULL(inflight_prefetch_); + CHECK_NOTNULL(inflight_cache_); if (running_) { return Status::OK(); @@ -176,11 +178,11 @@ Status TierBlockCache::Put(ContextSPtr ctx, const BlockKey& key, } NEXT_STEP("s3_put"); - UploadOption opt; + Storage::PutOption opt; if (EnableRemoteCache() && FLAGS_fill_group_cache) { - opt.async_cache_func = NewFillGroupCacheCb(ctx); + opt.async_cache_fn = NewFillGroupCacheCb(ctx); } - status = storage_->Upload(ctx, key, block, opt); + status = storage_->Put(ctx, key, block, opt); if (!status.ok()) { GENERIC_LOG_UPLOAD_ERROR(); @@ -217,7 +219,7 @@ Status TierBlockCache::Range(ContextSPtr ctx, const BlockKey& key, off_t offset, remote_block_cache_->Range(ctx, key, offset, length, buffer, option); } else if (option.retrive) { // No remote cache, retrive storage NEXT_STEP("s3_range"); - status = storage_->Download(ctx, key, offset, length, buffer); + status = storage_->Range(ctx, key, offset, length, buffer); } else { status = Status::NotFound("no available cache can be tried"); } @@ -332,13 +334,26 @@ void TierBlockCache::AsyncCache(ContextSPtr ctx, const BlockKey& key, CacheOption option) { CHECK_RUNNING("Tier block cache"); - auto* self = GetSelfPtr(); - auto tid = RunInBthread([self, ctx, key, block, cb, option]() { - Status status = self->Cache(ctx, key, block, option); + // TODO: maybe filter out duplicate task by up-level is better + auto inflight_tracker = inflight_cache_; + auto status = inflight_tracker->Add(key.Filename()); + if (status.IsExist()) { if (cb) { cb(status); } - }); + return; + } + + auto* self = GetSelfPtr(); + auto tid = + RunInBthread([inflight_tracker, self, ctx, key, block, cb, option]() { + Status status = self->Cache(ctx, key, block, option); + if (cb) { + cb(status); + } + + inflight_tracker->Remove(key.Filename()); + }); if (tid != 0) { joiner_->BackgroundJoin(tid); @@ -351,7 +366,8 @@ void TierBlockCache::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key, CHECK_RUNNING("Tier block cache"); // TODO: maybe filter out duplicate task by up-level is better - auto status = inflight_tracker_->Add(key.Filename()); + auto inflight_tracker = inflight_prefetch_; + auto status = inflight_tracker->Add(key.Filename()); if (status.IsExist()) { if (cb) { cb(status); @@ -360,14 +376,15 @@ void TierBlockCache::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key, } auto* self = GetSelfPtr(); - auto tid = RunInBthread([&, self, ctx, key, length, cb, option]() { - Status status = self->Prefetch(ctx, key, length, option); - if (cb) { - cb(status); - } + auto tid = + RunInBthread([inflight_tracker, self, ctx, key, length, cb, option]() { + Status status = self->Prefetch(ctx, key, length, option); + if (cb) { + cb(status); + } - inflight_tracker_->Remove(key.Filename()); - }); + inflight_tracker->Remove(key.Filename()); + }); if (tid != 0) { joiner_->BackgroundJoin(tid); diff --git a/src/cache/tiercache/tier_block_cache.h b/src/cache/tiercache/tier_block_cache.h index 36a2b573f..36865c2b2 100644 --- a/src/cache/tiercache/tier_block_cache.h +++ b/src/cache/tiercache/tier_block_cache.h @@ -77,7 +77,7 @@ class TierBlockCache final : public BlockCache { bool EnableRemoteStage() const; bool EnableRemoteCache() const; - using FillGroupCacheCb = UploadOption::AsyncCacheFunc; + using FillGroupCacheCb = Storage::AsyncCacheFn; FillGroupCacheCb NewFillGroupCacheCb(ContextSPtr ctx); // The behavior of local block cache is same as remote block cache, @@ -89,7 +89,8 @@ class TierBlockCache final : public BlockCache { BlockCacheUPtr local_block_cache_; BlockCacheUPtr remote_block_cache_; BthreadJoinerUPtr joiner_; - InflightTrackerUPtr inflight_tracker_; + InflightTrackerSPtr inflight_cache_; + InflightTrackerSPtr inflight_prefetch_; }; } // namespace cache diff --git a/src/cache/utils/inflight_tracker.h b/src/cache/utils/inflight_tracker.h index fc228c293..14ae088dc 100644 --- a/src/cache/utils/inflight_tracker.h +++ b/src/cache/utils/inflight_tracker.h @@ -48,6 +48,7 @@ class InflightTracker { }; using InflightTrackerUPtr = std::unique_ptr; +using InflightTrackerSPtr = std::shared_ptr; } // namespace cache } // namespace dingofs diff --git a/src/options/client/option.h b/src/options/client/option.h index 3c67d16c0..6935bf4fc 100644 --- a/src/options/client/option.h +++ b/src/options/client/option.h @@ -18,6 +18,7 @@ #define DINGOFS_SRC_OPTIONS_CLIENT_OPTION_H_ #include + #include "blockaccess/accesser_common.h" #include "common/const.h" #include "options/cache/option.h" @@ -179,6 +180,12 @@ static void InitRemoteBlockCacheOption(utils::Configuration* c) { c->GetValue("remote_cache.cache_group", &cache::FLAGS_cache_group); c->GetValue("remote_cache.mds_version", &cache::FLAGS_mds_version); c->GetValue("remote_cache.mds_addrs", &cache::FLAGS_mds_addrs); + c->GetValue("remote_cache.mds_rpc_timeout_ms", + &cache::FLAGS_mdsv2_rpc_timeout_ms); + c->GetValue("remote_cache.mds_rpc_retry_times", + &cache::FLAGS_mdsv2_rpc_retry_times); + c->GetValue("remote_cache.mds_request_retry_times", + &cache::FLAGS_mdsv2_request_retry_times); c->GetValue("remote_cache.load_members_interval_ms", &cache::FLAGS_load_members_interval_ms);