Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions confv2/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ disk_state.check_duration_ms=3000
remote_cache.cache_group=
remote_cache.mds_version=v2
remote_cache.mds_addrs=127.0.0.1:6900
remote_cache.mds_rpc_timeout_ms=3000
remote_cache.mds_rpc_retry_times=1
remote_cache.mds_request_retry_times=3
remote_cache.load_members_interval_ms=1000

remote_cache.fill_group_cache=true
Expand Down
2 changes: 1 addition & 1 deletion src/cache/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ add_subdirectory(blockcache)
add_subdirectory(cachegroup)
add_subdirectory(remotecache)
add_subdirectory(tiercache)
#add_subdirectory(benchmark)
add_subdirectory(benchmark)

file(GLOB CACHE_LIB_SRCS
"dingo_cache.cpp"
Expand Down
20 changes: 15 additions & 5 deletions src/cache/benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@
# limitations under the License.
# Define the BASE_FLAGS and DINGO_DEFAULT_COPTS variables

file(GLOB BENCHMARK_LIB_SRCS
"*.cpp"
SET(BENCHMARK_LIB_SRCS
"benchmarker.cpp"
"collector.cpp"
"factory.cpp"
"option.cpp"
"reporter.cpp"
"worker.cpp"
)

add_executable(cache-bench ${BENCHMARK_LIB_SRCS})
target_link_libraries(cache-bench
cache_lib
add_library(cache_benchmark ${BENCHMARK_LIB_SRCS})
target_link_libraries(cache_benchmark
cache_utils
cache_common
cache_storage
)

add_executable(cache-bench main.cpp)
target_link_libraries(cache-bench cache_benchmark)
53 changes: 42 additions & 11 deletions src/cache/blockcache/block_cache_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "cache/utils/bthread.h"
#include "cache/utils/context.h"
#include "cache/utils/helper.h"
#include "cache/utils/inflight_tracker.h"
#include "cache/utils/step_timer.h"
#include "common/io_buffer.h"
#include "common/status.h"
Expand Down Expand Up @@ -82,7 +83,9 @@ BlockCacheImpl::BlockCacheImpl(StorageSPtr storage)
BlockCacheImpl::BlockCacheImpl(StoragePoolSPtr storage_pool)
: running_(false),
storage_pool_(storage_pool),
joiner_(std::make_unique<BthreadJoiner>()) {
joiner_(std::make_unique<BthreadJoiner>()),
inflight_cache_(std::make_shared<InflightTracker>(1024)),
inflight_prefetch_(std::make_shared<InflightTracker>(1024)) {
if (HasCacheStore()) {
store_ = std::make_shared<DiskCacheGroup>(ParseDiskCacheOption());
} else {
Expand All @@ -98,6 +101,8 @@ Status BlockCacheImpl::Start() {
CHECK_NOTNULL(store_);
CHECK_NOTNULL(uploader_);
CHECK_NOTNULL(joiner_);
CHECK_NOTNULL(inflight_cache_);
CHECK_NOTNULL(inflight_prefetch_);

if (running_) {
return Status::OK();
Expand Down Expand Up @@ -248,6 +253,8 @@ Status BlockCacheImpl::Prefetch(ContextSPtr ctx, const BlockKey& key,

if (IsCached(key)) {
return Status::OK();
} else if (store_->IsFull(key)) {
return Status::CacheFull("disk cache is full");
}

NEXT_STEP("s3_range");
Expand Down Expand Up @@ -309,13 +316,25 @@ void BlockCacheImpl::AsyncCache(ContextSPtr ctx, const BlockKey& key,
CacheOption option) {
CHECK_RUNNING("Block cache");

auto* self = GetSelfPtr();
auto tid = RunInBthread([self, ctx, key, block, cb, option]() {
Status status = self->Cache(ctx, key, block, option);
auto inflight_tracker = inflight_cache_;
auto status = inflight_tracker->Add(key.Filename());
if (status.IsExist()) {
if (cb) {
cb(status);
}
});
return;
}

auto* self = GetSelfPtr();
auto tid =
RunInBthread([inflight_tracker, self, ctx, key, block, cb, option]() {
Status status = self->Cache(ctx, key, block, option);
if (cb) {
cb(status);
}

inflight_tracker->Remove(key.Filename());
});

if (tid != 0) {
joiner_->BackgroundJoin(tid);
Expand All @@ -327,13 +346,25 @@ void BlockCacheImpl::AsyncPrefetch(ContextSPtr ctx, const BlockKey& key,
PrefetchOption option) {
CHECK_RUNNING("Block cache");

auto* self = GetSelfPtr();
auto tid = RunInBthread([self, ctx, key, length, cb, option]() {
Status status = self->Prefetch(ctx, key, length, option);
auto inflight_tracker = inflight_prefetch_;
auto status = inflight_tracker->Add(key.Filename());
if (status.IsExist()) {
if (cb) {
cb(status);
}
});
return;
}

auto* self = GetSelfPtr();
auto tid =
RunInBthread([inflight_tracker, self, ctx, key, length, cb, option]() {
Status status = self->Prefetch(ctx, key, length, option);
if (cb) {
cb(status);
}

inflight_tracker->Remove(key.Filename());
});

if (tid != 0) {
joiner_->BackgroundJoin(tid);
Expand All @@ -351,7 +382,7 @@ Status BlockCacheImpl::StoragePut(ContextSPtr ctx, const BlockKey& key,
return status;
}

status = storage->Upload(ctx, key, block);
status = storage->Put(ctx, key, block);
if (!status.ok()) {
GENERIC_LOG_UPLOAD_ERROR();
}
Expand All @@ -370,7 +401,7 @@ Status BlockCacheImpl::StorageRange(ContextSPtr ctx, const BlockKey& key,
return status;
}

status = storage->Download(ctx, key, offset, length, buffer);
status = storage->Range(ctx, key, offset, length, buffer);
if (!status.ok()) {
GENERIC_LOG_DOWNLOAD_ERROR();
}
Expand Down
3 changes: 3 additions & 0 deletions src/cache/blockcache/block_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "cache/storage/storage.h"
#include "cache/storage/storage_pool.h"
#include "cache/utils/context.h"
#include "cache/utils/inflight_tracker.h"

namespace dingofs {
namespace cache {
Expand Down Expand Up @@ -85,6 +86,8 @@ class BlockCacheImpl final : public BlockCache {
CacheStoreSPtr store_;
BlockCacheUploaderSPtr uploader_;
BthreadJoinerUPtr joiner_;
InflightTrackerSPtr inflight_cache_;
InflightTrackerSPtr inflight_prefetch_;
};

} // namespace cache
Expand Down
2 changes: 1 addition & 1 deletion src/cache/blockcache/block_cache_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ Status BlockCacheUploader::Upload(const StagingBlock& staging_block,
return status;
}

status = storage->Upload(staging_block.ctx, key, Block(buffer));
status = storage->Put(staging_block.ctx, key, Block(buffer));
if (!status.ok()) {
LOG_CTX(ERROR) << "Upload staging block failed: key = " << key.Filename()
<< ", status = " << status.ToString();
Expand Down
1 change: 1 addition & 0 deletions src/cache/blockcache/cache_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class CacheStore {
virtual std::string Id() const = 0;
virtual bool IsRunning() const = 0;
virtual bool IsCached(const BlockKey& key) const = 0;
virtual bool IsFull(const BlockKey& key) const = 0;
};

using CacheStoreSPtr = std::shared_ptr<CacheStore>;
Expand Down
8 changes: 5 additions & 3 deletions src/cache/blockcache/disk_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ Status DiskCache::CreateDirs() {
GetProbeDir(),
};
for (const auto& dir : dirs) {
auto status = FSHelper::MkDirs(dir);
auto status = FSUtil::MkDirs(dir);
if (!status.ok()) {
LOG(ERROR) << "Create directory failed: dir = " << dir
<< ", status = " << status.ToString();
Expand All @@ -204,12 +204,12 @@ Status DiskCache::CreateDirs() {
Status DiskCache::LoadOrCreateLockFile() {
std::string content;
auto lock_path = GetLockPath();
auto status = FSHelper::ReadFile(lock_path, &content);
auto status = FSUtil::ReadFile(lock_path, &content);
if (status.ok()) {
uuid_ = utils::TrimSpace(content);
} else if (status.IsNotFound()) {
uuid_ = utils::GenUuid();
status = FSHelper::WriteFile(lock_path, uuid_);
status = FSUtil::WriteFile(lock_path, uuid_);
}

if (!status.ok()) {
Expand Down Expand Up @@ -421,6 +421,8 @@ bool DiskCache::IsCached(const BlockKey& key) const {
return false;
}

bool DiskCache::IsFull(const BlockKey& /*key*/) const { return CacheFull(); }

// CheckStatus cache status:
// 1. check running status (UP/DOWN)
// 2. check disk healthy (HEALTHY/UNHEALTHY)
Expand Down
1 change: 1 addition & 0 deletions src/cache/blockcache/disk_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class DiskCache final : public CacheStore {
std::string Id() const override;
bool IsRunning() const override;
bool IsCached(const BlockKey& key) const override;
bool IsFull(const BlockKey& key) const override;

private:
friend class Target;
Expand Down
8 changes: 7 additions & 1 deletion src/cache/blockcache/disk_cache_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ Status DiskCacheGroup::Load(ContextSPtr ctx, const BlockKey& key, off_t offset,
return status;
}

std::string DiskCacheGroup::Id() const { return "disk_cache_group"; }

bool DiskCacheGroup::IsRunning() const {
return running_.load(std::memory_order_relaxed);
}
Expand All @@ -163,7 +165,11 @@ bool DiskCacheGroup::IsCached(const BlockKey& key) const {
return GetStore(key)->IsCached(key);
}

std::string DiskCacheGroup::Id() const { return "disk_cache_group"; }
bool DiskCacheGroup::IsFull(const BlockKey& key) const {
DCHECK_RUNNING("Disk cache group");

return GetStore(key)->IsFull(key);
}

std::vector<uint64_t> DiskCacheGroup::CalcWeights(
std::vector<DiskCacheOption> options) {
Expand Down
1 change: 1 addition & 0 deletions src/cache/blockcache/disk_cache_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DiskCacheGroup final : public CacheStore {
std::string Id() const override;
bool IsRunning() const override;
bool IsCached(const BlockKey& key) const override;
bool IsFull(const BlockKey& key) const override;

private:
static std::vector<uint64_t> CalcWeights(
Expand Down
4 changes: 2 additions & 2 deletions src/cache/blockcache/disk_cache_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void DiskCacheLoader::LoadAllBlocks(const std::string& dir, BlockType type) {

timer.start();
status =
FSHelper::Walk(dir, [&](const std::string& prefix, const FileInfo& file) {
FSUtil::Walk(dir, [&](const std::string& prefix, const FileInfo& file) {
if (!running_.load(std::memory_order_relaxed)) {
return Status::Abort("disk cache loader stopped");
}
Expand Down Expand Up @@ -149,7 +149,7 @@ bool DiskCacheLoader::LoadOneBlock(const std::string& prefix,
std::string path = Helper::PathJoin({prefix, name});

if (Helper::IsTempFilepath(name) || !key.ParseFromFilename(name)) {
auto status = FSHelper::RemoveFile(path);
auto status = FSUtil::RemoveFile(path);
if (status.ok()) {
LOG(INFO) << "Remove invalid block (path=" << path << ") success.";
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/cache/blockcache/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void DiskCacheManager::CheckFreeSpace() {
std::string root_dir = GetRootDir();

while (running_.load(std::memory_order_relaxed)) {
auto status = FSHelper::StatFS(root_dir, &stat);
auto status = FSUtil::StatFS(root_dir, &stat);
if (!status.ok()) {
LOG(ERROR) << "Check disk free space failed: dir = " << root_dir
<< ", status = " << status.ToString();
Expand Down Expand Up @@ -312,7 +312,7 @@ void DiskCacheManager::DeleteBlocks(const ToDel& to_del) {
CacheKey key = item.key;
CacheValue value = item.value;
std::string cache_path = GetCachePath(key);
auto status = FSHelper::RemoveFile(cache_path);
auto status = FSUtil::RemoveFile(cache_path);
if (status.IsNotFound()) {
LOG(WARNING) << "Block file already deleted: path = " << cache_path;
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/cache/blockcache/disk_cache_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void DiskCacheWatcher::WatchingWorker() {
DiskCacheWatcher::Should DiskCacheWatcher::CheckTarget(Target* target) {
std::string lock_path = target->GetLockPath();

if (!FSHelper::FileExists(lock_path)) { // cache is down
if (!FSUtil::FileExists(lock_path)) { // cache is down
return Should::kShutdown;
} else if (target->IsRunning()) { // cache already up
return Should::kDoNothing;
Expand All @@ -102,7 +102,7 @@ DiskCacheWatcher::Should DiskCacheWatcher::CheckTarget(Target* target) {
bool DiskCacheWatcher::CheckUuid(const std::string& lock_path,
const std::string& uuid) {
std::string content;
auto status = FSHelper::ReadFile(lock_path, &content);
auto status = FSUtil::ReadFile(lock_path, &content);
if (!status.ok()) {
LOG(ERROR) << "Read lock file failed: path = " << lock_path
<< ", status = " << status.ToString();
Expand Down
6 changes: 3 additions & 3 deletions src/cache/blockcache/disk_state_health_checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ void DiskStateHealthChecker::ProbeDisk() {
std::string content(100, '0');
std::string filepath = GetProbeFilepath();

auto status = FSHelper::WriteFile(filepath, content);
auto status = FSUtil::WriteFile(filepath, content);
if (status.ok()) {
status = FSHelper::ReadFile(filepath, &out);
status = FSUtil::ReadFile(filepath, &out);
}

if (!status.ok()) {
Expand All @@ -107,7 +107,7 @@ void DiskStateHealthChecker::ProbeDisk() {
}

SetStatusPage(state_machine_->GetState());
FSHelper::RemoveFile(filepath);
FSUtil::RemoveFile(filepath);
}

std::string DiskStateHealthChecker::GetProbeFilepath() const {
Expand Down
1 change: 1 addition & 0 deletions src/cache/blockcache/mem_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MemStore final : public CacheStore {
std::string Id() const override { return "memory_cache"; }
bool IsRunning() const override { return true; }
bool IsCached(const BlockKey&) const override { return false; }
bool IsFull(const BlockKey&) const override { return true; }
};

} // namespace cache
Expand Down
Loading
Loading