Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions xllm/core/distributed_runtime/comm_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,13 @@ void CommChannel::transfer_kv_blocks(

class ClientStreamReceiver : public brpc::StreamInputHandler {
private:
std::shared_ptr<std::atomic<bool>> termination_flag_;
std::shared_ptr<std::atomic<int32_t>> termination_flag_;
std::shared_ptr<std::atomic<uint32_t>> success_cnt_;
std::promise<void> close_promise_;
std::atomic<bool> promise_set_{false};

public:
ClientStreamReceiver(std::shared_ptr<std::atomic<bool>> termination_flag,
ClientStreamReceiver(std::shared_ptr<std::atomic<int32_t>> termination_flag,
std::shared_ptr<std::atomic<uint32_t>> success_cnt)
: termination_flag_(termination_flag), success_cnt_(success_cnt) {}

Expand All @@ -398,10 +398,10 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {
int32_t success_cnt = std::stoi(msg_str);

if (success_cnt > 0 &&
!termination_flag_->load(std::memory_order_acquire)) {
termination_flag_->load(std::memory_order_acquire) > 0) {
success_cnt_->fetch_add(success_cnt, std::memory_order_relaxed);
} else {
termination_flag_->store(true, std::memory_order_release);
termination_flag_->fetch_sub(1, std::memory_order_release);
brpc::StreamClose(id);
if (!promise_set_.exchange(true)) {
close_promise_.set_value();
Expand All @@ -427,7 +427,7 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {

void CommChannel::prefetch_from_storage(
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::shared_ptr<std::atomic<uint32_t>> success_cnt) {
proto::BlockTransferInfos pb_block_transfer_info;
if (!block_transfer_info_to_proto(block_transfer_info,
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/comm_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class CommChannel {

virtual void prefetch_from_storage(
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::shared_ptr<std::atomic<uint32_t>> success_cnt);

virtual bool get_last_step_result_async(
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Engine {
virtual void prefetch_from_storage(
const uint32_t dp_rank,
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
LOG(FATAL) << " prefetch_from_storage is not implemented!";
};
Expand Down
3 changes: 2 additions & 1 deletion xllm/core/distributed_runtime/llm_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,10 @@ void LLMEngine::transfer_kv_blocks(
void LLMEngine::prefetch_from_storage(
const uint32_t dp_rank,
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
prefetch_results->reserve(dp_local_tp_size_);
flag->store(dp_local_tp_size_, std::memory_order_acquire);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using `std::memory_order_relaxed` or `std::memory_order_release` for this store would be better — `memory_order_acquire` is not a valid memory order for a store operation.

for (auto tp_rank = 0; tp_rank < dp_local_tp_size_; ++tp_rank) {
prefetch_results->emplace_back(std::make_shared<std::atomic<uint32_t>>(0));
worker_clients_[tp_rank + dp_local_tp_size_ * dp_rank]
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/llm_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class LLMEngine : public Engine {
void prefetch_from_storage(
const uint32_t dp_rank,
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results)
override;

Expand Down
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/remote_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ void RemoteWorker::transfer_kv_blocks(

void RemoteWorker::prefetch_from_storage(
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::shared_ptr<std::atomic<uint32_t>> success_cnt) {
copy_threadpool_.schedule(
[this,
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/remote_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class RemoteWorker : public WorkerClient {

virtual void prefetch_from_storage(
const std::vector<BlockTransferInfo>& block_transfer_info,
std::shared_ptr<std::atomic<bool>> flag,
std::shared_ptr<std::atomic<int32_t>> flag,
std::shared_ptr<std::atomic<uint32_t>> success_cnt) override;

// Run the model and return the output.
Expand Down
2 changes: 1 addition & 1 deletion xllm/core/framework/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Block final {
memcpy(hash_value_, hash_value, MURMUR_HASH3_VALUE_LEN);
}

uint32_t get_hash_value_len() { return MURMUR_HASH3_VALUE_LEN; }
// Returns the fixed byte length of the MurmurHash3 value stored in
// hash_value_ (always MURMUR_HASH3_VALUE_LEN; does not depend on state).
uint32_t get_hash_value_len() const { return MURMUR_HASH3_VALUE_LEN; }

private:
// increase reference count
Expand Down
3 changes: 0 additions & 3 deletions xllm/core/framework/block/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class BlockManager {

virtual void deallocate(const Slice<Block>& blocks) = 0;

virtual void deallocate(std::vector<Block>& blocks) = 0;

virtual std::vector<Block> allocate(size_t num_blocks) = 0;

virtual std::vector<Block> allocate_shared(
Expand All @@ -68,7 +66,6 @@ class BlockManager {

// get merged all dp rank KVCacheEvent
virtual void get_merged_kvcache_event(KvCacheEvent* event) const = 0;
virtual float get_gpu_cache_usage_perc() const = 0;

virtual size_t num_blocks_in_prefix_cache() const = 0;
virtual size_t num_free_blocks() const = 0;
Expand Down
11 changes: 2 additions & 9 deletions xllm/core/framework/block/block_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
for (const auto& block : blocks) {
// the block is not shared by other sequence
if (block.is_valid() && block.ref_count() <= 2) {
auto origin_num_used_blocks =
num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
if (origin_num_used_blocks < 0) {
if (num_used_blocks_ == 0) {
LOG(ERROR) << "num_used_blocks_==0 cannot fetch_sub for id:"
<< block.id()
<< ", total block size: " << num_total_blocks();
Expand All @@ -86,19 +84,14 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
}
LOG(FATAL) << error_msg;
}
num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
}
}
} else {
num_used_blocks_.fetch_sub(blocks.size(), std::memory_order_relaxed);
}
}

// Vector overload: releases every block via the Slice-based deallocate(),
// then empties the caller's vector so the freed blocks cannot be reused.
void BlockManagerImpl::deallocate(std::vector<Block>& blocks) {
  deallocate(Slice<Block>(blocks));
  blocks.clear();
}

bool BlockManagerImpl::has_enough_blocks(uint32_t num_blocks) {
if (num_blocks <= num_free_blocks_) {
return true;
Expand Down
6 changes: 0 additions & 6 deletions xllm/core/framework/block/block_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class BlockManagerImpl : public BlockManager {

void deallocate(const Slice<Block>& blocks) override;

void deallocate(std::vector<Block>& blocks) override;

// allocate shared blocks when enable prefix cache
std::vector<Block> allocate_shared(
const Slice<int32_t>& tokens_ids,
Expand Down Expand Up @@ -78,10 +76,6 @@ class BlockManagerImpl : public BlockManager {
}
}

float get_gpu_cache_usage_perc() const override {
return 1 - static_cast<float>(num_free_blocks_) / num_total_blocks();
}

// call BlockManager to free block used by Block.
void free(int32_t block_id) override;

Expand Down
6 changes: 4 additions & 2 deletions xllm/core/framework/block/block_manager_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
.block_size(options_.block_size())
.enable_prefix_cache(options_.enable_prefix_cache())
.enable_disagg_pd(options_.enable_disagg_pd())
.enable_cache_upload(options_.enable_cache_upload());
.enable_cache_upload(options_.host_num_blocks() > 0
? false
: options_.enable_cache_upload());

for (int32_t i = 0; i < dp_size; ++i) {
if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
Expand Down Expand Up @@ -221,7 +223,7 @@ void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const {
// Average GPU KV-cache utilization across all per-DP-rank block managers.
// Returns 0.0 when the pool is empty to avoid dividing by zero.
float BlockManagerPool::get_gpu_cache_usage_perc() const {
  if (block_managers_.empty()) {
    return 0.0f;
  }
  float perc = 0.0f;
  // Range-for avoids the signed/unsigned comparison of an int32_t index
  // against block_managers_.size().
  for (const auto& manager : block_managers_) {
    perc += manager->kv_cache_utilization();
  }
  return perc / block_managers_.size();
}
Expand Down
5 changes: 0 additions & 5 deletions xllm/core/framework/block/concurrent_block_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ void ConcurrentBlockManagerImpl::deallocate(const Slice<Block>& blocks) {
BlockManagerImpl::deallocate(blocks);
}

// Thread-safe vector overload: takes the manager's mutex for the duration
// of the call, then delegates to the base-class implementation.
void ConcurrentBlockManagerImpl::deallocate(std::vector<Block>& blocks) {
  const std::lock_guard<std::mutex> guard(mutex_);
  BlockManagerImpl::deallocate(blocks);
}

std::vector<Block> ConcurrentBlockManagerImpl::allocate_shared(
const Slice<int32_t>& tokens_ids,
const Slice<Block>& existed_shared_blocks) {
Expand Down
2 changes: 0 additions & 2 deletions xllm/core/framework/block/concurrent_block_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class ConcurrentBlockManagerImpl : public BlockManagerImpl {

void deallocate(const Slice<Block>& blocks) override;

void deallocate(std::vector<Block>& blocks) override;

// try to share blocks among sequences with the same prefix
std::vector<Block> allocate_shared(
const Slice<int32_t>& tokens_ids,
Expand Down
Loading