Skip to content

Commit 08cdf97

Browse files
committed
bugfix: resolve multi-threading conflict in kv cache events reporting.
1 parent 579ba32 commit 08cdf97

File tree

11 files changed

+72
-46
lines changed

11 files changed

+72
-46
lines changed

xllm/core/framework/block/block_manager.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ class BlockManager {
6666

6767
// get merged all dp rank KVCacheEvent
6868
virtual void get_merged_kvcache_event(KvCacheEvent* event) const = 0;
69-
virtual float get_gpu_cache_usage_perc() const = 0;
7069

7170
virtual size_t num_blocks_in_prefix_cache() const = 0;
7271
virtual size_t num_free_blocks() const = 0;

xllm/core/framework/block/block_manager_impl.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,7 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
7070
for (const auto& block : blocks) {
7171
// the block is not shared by other sequence
7272
if (block.is_valid() && block.ref_count() <= 2) {
73-
auto origin_num_used_blocks =
74-
num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
75-
if (origin_num_used_blocks < 0) {
73+
if (num_used_blocks_ == 0) {
7674
LOG(ERROR) << "num_used_blocks_==0 cannot fetch_sub for id:"
7775
<< block.id()
7876
<< ", total block size: " << num_total_blocks();
@@ -86,6 +84,7 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
8684
}
8785
LOG(FATAL) << error_msg;
8886
}
87+
num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
8988
}
9089
}
9190
} else {

xllm/core/framework/block/block_manager_impl.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,6 @@ class BlockManagerImpl : public BlockManager {
7676
}
7777
}
7878

79-
float get_gpu_cache_usage_perc() const override {
80-
return 1 - static_cast<float>(num_free_blocks_) / num_total_blocks();
81-
}
82-
8379
// call BlockManager to free block used by Block.
8480
void free(int32_t block_id) override;
8581

xllm/core/framework/block/block_manager_pool.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
3030
.block_size(options_.block_size())
3131
.enable_prefix_cache(options_.enable_prefix_cache())
3232
.enable_disagg_pd(options_.enable_disagg_pd())
33-
.enable_cache_upload(options_.enable_cache_upload());
33+
.enable_cache_upload(options_.host_num_blocks() > 0
34+
? false
35+
: options_.enable_cache_upload());
3436

3537
for (int32_t i = 0; i < dp_size; ++i) {
3638
if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
@@ -221,7 +223,7 @@ void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const {
221223
float BlockManagerPool::get_gpu_cache_usage_perc() const {
222224
float perc = 0.0;
223225
for (int32_t i = 0; i < block_managers_.size(); ++i) {
224-
perc += block_managers_[i]->get_gpu_cache_usage_perc();
226+
perc += block_managers_[i]->kv_cache_utilization();
225227
}
226228
return perc / block_managers_.size();
227229
}

xllm/core/framework/block/hierarchy_block_manager_pool.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ HierarchyBlockManagerPool::HierarchyBlockManagerPool(
3434
.enable_prefix_cache(options_.enable_prefix_cache())
3535
.enable_disagg_pd(options_.enable_disagg_pd())
3636
.num_blocks(options_.host_num_blocks())
37-
.enable_cache_upload(false);
37+
.enable_cache_upload(options_.enable_cache_upload());
3838

3939
for (int32_t i = 0; i < dp_size; ++i) {
4040
if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
@@ -69,13 +69,11 @@ void HierarchyBlockManagerPool::deallocate(Sequence* sequence) {
6969
size_t needed_block_num =
7070
sequence->num_tokens() / options_.block_size() - host_blocks->size();
7171

72-
if (needed_block_num == 0) {
73-
return;
72+
if (needed_block_num != 0) {
73+
sequence->host_kv_state().add_kv_blocks(
74+
host_block_managers_[dp_rank]->allocate(needed_block_num));
7475
}
7576

76-
sequence->host_kv_state().add_kv_blocks(
77-
host_block_managers_[dp_rank]->allocate(needed_block_num));
78-
7977
for (size_t i = cached_block_num; i < host_blocks->size(); i++) {
8078
if (blocks->at(i).ref_count() != 2) {
8179
continue;
@@ -86,8 +84,7 @@ void HierarchyBlockManagerPool::deallocate(Sequence* sequence) {
8684
std::move(blocks->at(i)), std::move(host_blocks->at(i)));
8785
offload_block_pair_queues_[dp_rank].enqueue(std::move(block_pair));
8886
}
89-
host_block_managers_[dp_rank]->cache(
90-
*sequence->host_kv_state().mutable_kv_blocks());
87+
9188
host_block_managers_[dp_rank]->deallocate(
9289
sequence->host_kv_state().kv_blocks());
9390

xllm/core/framework/prefix_cache/prefix_cache.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
124124
return insert(token_ids, blocks, &insert_keys);
125125
}
126126

127+
size_t PrefixCache::insert(const std::vector<Block>& blocks) {
128+
std::vector<Murmur3Key> insert_keys;
129+
return insert(blocks, &insert_keys);
130+
}
131+
127132
size_t PrefixCache::evict(size_t n_blocks) {
128133
std::vector<Murmur3Key> evict_keys;
129134
return evict(n_blocks, &evict_keys);
@@ -192,11 +197,13 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
192197
return n_tokens;
193198
}
194199

195-
size_t PrefixCache::insert(const std::vector<Block>& blocks) {
200+
size_t PrefixCache::insert(const std::vector<Block>& blocks,
201+
std::vector<Murmur3Key>* insert_keys) {
196202
const int64_t now = absl::ToUnixMicros(absl::Now());
197203
DNodeList node_list;
198204
Murmur3Key token_hash_key;
199205

206+
insert_keys->reserve(blocks.size());
200207
for (size_t i = 0; i < blocks.size(); i++) {
201208
if (!blocks[i].is_valid()) {
202209
continue;
@@ -220,6 +227,8 @@ size_t PrefixCache::insert(const std::vector<Block>& blocks) {
220227
cached_blocks_.emplace(std::make_pair(token_hash_key, new_node));
221228

222229
num_blocks_++;
230+
231+
insert_keys->emplace_back(token_hash_key.data);
223232
}
224233
}
225234

xllm/core/framework/prefix_cache/prefix_cache.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,13 @@ class PrefixCache {
6666
const Slice<int32_t>& token_ids,
6767
const Slice<Block>& existed_shared_blocks = {});
6868

69+
// insert the token ids and blocks into the prefix tree
70+
// and set hash key to the corresponding block
71+
// return the length of new inserted tokens
6972
virtual size_t insert(const Slice<int32_t>& token_ids,
7073
std::vector<Block>& blocks);
74+
75+
// insert the blocks with hash key into the prefix tree
7176
virtual size_t insert(const std::vector<Block>& blocks);
7277

7378
// evict blocks hold by the prefix cache
@@ -98,6 +103,10 @@ class PrefixCache {
98103
size_t insert(const Slice<int32_t>& token_ids,
99104
std::vector<Block>& blocks,
100105
std::vector<Murmur3Key>* insert_keys);
106+
107+
size_t insert(const std::vector<Block>& blocks,
108+
std::vector<Murmur3Key>* insert_keys);
109+
101110
size_t evict(size_t n_blocks, std::vector<Murmur3Key>* evict_keys);
102111

103112
struct Node {

xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,47 +35,54 @@ size_t PrefixCacheWithUpload::insert(const Slice<int32_t>& token_ids,
3535
std::vector<Block>& blocks) {
3636
std::vector<Murmur3Key> insert_keys;
3737
auto n_tokens = PrefixCache::insert(token_ids, blocks, &insert_keys);
38+
save_event_async(true, insert_keys);
39+
return n_tokens;
40+
}
3841

39-
threadpool_.schedule([insert_keys = std::move(insert_keys), this]() {
40-
auto front_ptr = this->db_kvcache_events_.get_front_value();
41-
if (!front_ptr) {
42-
LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
43-
return;
44-
}
45-
if (!this->exited_.load()) {
46-
for (const auto& hash_id : insert_keys) {
47-
front_ptr->removed_cache.erase(hash_id);
48-
front_ptr->stored_cache.insert(hash_id);
49-
}
50-
}
51-
});
52-
42+
size_t PrefixCacheWithUpload::insert(const std::vector<Block>& blocks) {
43+
std::vector<Murmur3Key> insert_keys;
44+
auto n_tokens = PrefixCache::insert(blocks, &insert_keys);
45+
save_event_async(true, insert_keys);
5346
return n_tokens;
5447
}
5548

5649
size_t PrefixCacheWithUpload::evict(size_t n_blocks) {
5750
std::vector<Murmur3Key> evict_keys;
5851
auto evict_count = PrefixCache::evict(n_blocks, &evict_keys);
52+
save_event_async(false, evict_keys);
53+
return evict_count;
54+
}
5955

60-
threadpool_.schedule([evict_keys = std::move(evict_keys), this]() {
56+
void PrefixCacheWithUpload::save_event_async(const bool is_insert,
57+
std::vector<Murmur3Key>& keys) {
58+
threadpool_.schedule([this, is_insert = is_insert, keys = std::move(keys)]() {
59+
std::lock_guard<std::mutex> lock(this->mutex_);
6160
auto front_ptr = this->db_kvcache_events_.get_front_value();
6261
if (!front_ptr) {
6362
LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
6463
return;
6564
}
6665
if (!this->exited_.load()) {
67-
for (const auto& hash_id : evict_keys) {
68-
front_ptr->removed_cache.insert(hash_id);
69-
front_ptr->stored_cache.erase(hash_id);
66+
if (is_insert) {
67+
for (const auto& key : keys) {
68+
front_ptr->removed_cache.erase(key);
69+
front_ptr->stored_cache.insert(key);
70+
}
71+
} else {
72+
for (const auto& key : keys) {
73+
front_ptr->removed_cache.insert(key);
74+
front_ptr->stored_cache.erase(key);
75+
}
7076
}
7177
}
7278
});
73-
74-
return evict_count;
7579
}
7680

7781
KvCacheEvent* PrefixCacheWithUpload::get_upload_kvcache_events() {
78-
db_kvcache_events_.swap();
82+
{
83+
std::lock_guard<std::mutex> lock(this->mutex_);
84+
db_kvcache_events_.swap();
85+
}
7986
if (!exited_.load()) {
8087
return db_kvcache_events_.get_back_value();
8188
} else {

xllm/core/framework/prefix_cache/prefix_cache_with_upload.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@ class PrefixCacheWithUpload final : public PrefixCache {
1818
size_t insert(const Slice<int32_t>& token_ids,
1919
std::vector<Block>& blocks) override;
2020

21+
// insert the blocks with hash key into the prefix tree
22+
size_t insert(const std::vector<Block>& blocks) override;
23+
2124
// evict blocks hold by the prefix cache
2225
// return the actual number of evicted blocks
2326
size_t evict(size_t n_blocks) override;
2427

2528
virtual KvCacheEvent* get_upload_kvcache_events() override;
2629

30+
private:
31+
void save_event_async(const bool is_insert, std::vector<Murmur3Key>& keys);
32+
2733
private:
2834
ThreadPool threadpool_;
2935

36+
std::mutex mutex_;
3037
DoubleBuffer<KvCacheEvent> db_kvcache_events_;
3138
};
3239

xllm/core/util/hash_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ struct Murmur3Key {
4343
std::memcpy(data, input_data, MURMUR_HASH3_VALUE_LEN);
4444
}
4545

46-
std::string debug_string() {
46+
std::string debug_string() const {
4747
std::string rt;
4848
for (int i = 0; i < MURMUR_HASH3_VALUE_LEN; i++) {
4949
rt += std::to_string(int64_t(data[i])) + " ";

0 commit comments

Comments
 (0)