Skip to content

Commit 5d08f1c

Browse files
committed
bugfix: resolve multi-threading conflict in kv cache events reporting.
1 parent 3e06862 commit 5d08f1c

File tree

5 files changed

+60
-28
lines changed

5 files changed

+60
-28
lines changed

xllm/core/framework/prefix_cache/prefix_cache.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
124124
return insert(token_ids, blocks, &insert_keys);
125125
}
126126

127+
size_t PrefixCache::insert(const std::vector<Block>& blocks) {
128+
std::vector<Murmur3Key> insert_keys;
129+
return insert(blocks, &insert_keys);
130+
}
131+
127132
size_t PrefixCache::evict(size_t n_blocks) {
128133
std::vector<Murmur3Key> evict_keys;
129134
return evict(n_blocks, &evict_keys);
@@ -192,11 +197,13 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
192197
return n_tokens;
193198
}
194199

195-
size_t PrefixCache::insert(const std::vector<Block>& blocks) {
200+
size_t PrefixCache::insert(const std::vector<Block>& blocks,
201+
std::vector<Murmur3Key>* insert_keys) {
196202
const int64_t now = absl::ToUnixMicros(absl::Now());
197203
DNodeList node_list;
198204
Murmur3Key token_hash_key;
199205

206+
insert_keys->reserve(blocks.size());
200207
for (size_t i = 0; i < blocks.size(); i++) {
201208
if (!blocks[i].is_valid()) {
202209
continue;
@@ -220,6 +227,8 @@ size_t PrefixCache::insert(const std::vector<Block>& blocks) {
220227
cached_blocks_.emplace(std::make_pair(token_hash_key, new_node));
221228

222229
num_blocks_++;
230+
231+
insert_keys->emplace_back(token_hash_key.data);
223232
}
224233
}
225234

xllm/core/framework/prefix_cache/prefix_cache.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,13 @@ class PrefixCache {
6666
const Slice<int32_t>& token_ids,
6767
const Slice<Block>& existed_shared_blocks = {});
6868

69+
// insert the token ids and blocks into the prefix tree
70+
// and set hash key to the corresponding block
71+
// return the length of newly inserted tokens
6972
virtual size_t insert(const Slice<int32_t>& token_ids,
7073
std::vector<Block>& blocks);
74+
75+
// insert the blocks with hash key into the prefix tree
7176
virtual size_t insert(const std::vector<Block>& blocks);
7277

7378
// evict blocks held by the prefix cache
@@ -98,6 +103,10 @@ class PrefixCache {
98103
size_t insert(const Slice<int32_t>& token_ids,
99104
std::vector<Block>& blocks,
100105
std::vector<Murmur3Key>* insert_keys);
106+
107+
size_t insert(const std::vector<Block>& blocks,
108+
std::vector<Murmur3Key>* insert_keys);
109+
101110
size_t evict(size_t n_blocks, std::vector<Murmur3Key>* evict_keys);
102111

103112
struct Node {

xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,47 +35,54 @@ size_t PrefixCacheWithUpload::insert(const Slice<int32_t>& token_ids,
3535
std::vector<Block>& blocks) {
3636
std::vector<Murmur3Key> insert_keys;
3737
auto n_tokens = PrefixCache::insert(token_ids, blocks, &insert_keys);
38+
save_event_async(true, insert_keys);
39+
return n_tokens;
40+
}
3841

39-
threadpool_.schedule([insert_keys = std::move(insert_keys), this]() {
40-
auto front_ptr = this->db_kvcache_events_.get_front_value();
41-
if (!front_ptr) {
42-
LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
43-
return;
44-
}
45-
if (!this->exited_.load()) {
46-
for (const auto& hash_id : insert_keys) {
47-
front_ptr->removed_cache.erase(hash_id);
48-
front_ptr->stored_cache.insert(hash_id);
49-
}
50-
}
51-
});
52-
42+
size_t PrefixCacheWithUpload::insert(const std::vector<Block>& blocks) {
43+
std::vector<Murmur3Key> insert_keys;
44+
auto n_tokens = PrefixCache::insert(blocks, &insert_keys);
45+
save_event_async(true, insert_keys);
5346
return n_tokens;
5447
}
5548

5649
size_t PrefixCacheWithUpload::evict(size_t n_blocks) {
5750
std::vector<Murmur3Key> evict_keys;
5851
auto evict_count = PrefixCache::evict(n_blocks, &evict_keys);
52+
save_event_async(false, evict_keys);
53+
return evict_count;
54+
}
5955

60-
threadpool_.schedule([evict_keys = std::move(evict_keys), this]() {
56+
void PrefixCacheWithUpload::save_event_async(const bool is_insert,
57+
std::vector<Murmur3Key>& keys) {
58+
threadpool_.schedule([this, is_insert = is_insert, keys = std::move(keys)]() {
59+
std::lock_guard<std::mutex> lock(this->mutex_);
6160
auto front_ptr = this->db_kvcache_events_.get_front_value();
6261
if (!front_ptr) {
6362
LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
6463
return;
6564
}
6665
if (!this->exited_.load()) {
67-
for (const auto& hash_id : evict_keys) {
68-
front_ptr->removed_cache.insert(hash_id);
69-
front_ptr->stored_cache.erase(hash_id);
66+
if (is_insert) {
67+
for (const auto& hash_id : keys) {
68+
front_ptr->removed_cache.erase(hash_id);
69+
front_ptr->stored_cache.insert(hash_id);
70+
}
71+
} else {
72+
for (const auto& hash_id : keys) {
73+
front_ptr->removed_cache.insert(hash_id);
74+
front_ptr->stored_cache.erase(hash_id);
75+
}
7076
}
7177
}
7278
});
73-
74-
return evict_count;
7579
}
7680

7781
KvCacheEvent* PrefixCacheWithUpload::get_upload_kvcache_events() {
78-
db_kvcache_events_.swap();
82+
{
83+
std::lock_guard<std::mutex> lock(this->mutex_);
84+
db_kvcache_events_.swap();
85+
}
7986
if (!exited_.load()) {
8087
return db_kvcache_events_.get_back_value();
8188
} else {

xllm/core/framework/prefix_cache/prefix_cache_with_upload.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@ class PrefixCacheWithUpload final : public PrefixCache {
1818
size_t insert(const Slice<int32_t>& token_ids,
1919
std::vector<Block>& blocks) override;
2020

21+
// insert the blocks with hash key into the prefix tree
22+
size_t insert(const std::vector<Block>& blocks) override;
23+
2124
// evict blocks held by the prefix cache
2225
// return the actual number of evicted blocks
2326
size_t evict(size_t n_blocks) override;
2427

2528
virtual KvCacheEvent* get_upload_kvcache_events() override;
2629

30+
private:
31+
void save_event_async(const bool is_insert, std::vector<Murmur3Key>& keys);
32+
2733
private:
2834
ThreadPool threadpool_;
2935

36+
std::mutex mutex_;
3037
DoubleBuffer<KvCacheEvent> db_kvcache_events_;
3138
};
3239

xllm/xllm.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ int run() {
174174
.reasoning_parser(FLAGS_reasoning_parser)
175175
.priority_strategy(FLAGS_priority_strategy)
176176
.enable_online_preempt_offline(FLAGS_enable_online_preempt_offline)
177-
.enable_cache_upload(FLAGS_enable_prefix_cache &&
178-
FLAGS_enable_service_routing &&
179-
FLAGS_enable_cache_upload)
177+
.enable_cache_upload(
178+
(FLAGS_enable_service_routing || FLAGS_enable_disagg_pd) &&
179+
FLAGS_enable_prefix_cache && FLAGS_enable_cache_upload)
180180
.host_blocks_factor(FLAGS_host_blocks_factor)
181-
.enable_kvcache_store(FLAGS_enable_kvcache_store &&
182-
FLAGS_enable_prefix_cache &&
183-
(FLAGS_host_blocks_factor > 0.0))
181+
.enable_kvcache_store(FLAGS_enable_prefix_cache &&
182+
FLAGS_enable_kvcache_store &&
183+
(FLAGS_host_blocks_factor > 1.0))
184184
.prefetch_timeout(FLAGS_prefetch_timeout)
185185
.prefetch_bacth_size(FLAGS_prefetch_bacth_size)
186186
.layers_wise_copy_batchs(FLAGS_layers_wise_copy_batchs)

0 commit comments

Comments
 (0)