diff --git a/xllm/core/distributed_runtime/comm_channel.cpp b/xllm/core/distributed_runtime/comm_channel.cpp
index df48d80d5..bfb074afa 100644
--- a/xllm/core/distributed_runtime/comm_channel.cpp
+++ b/xllm/core/distributed_runtime/comm_channel.cpp
@@ -372,13 +372,13 @@ void CommChannel::transfer_kv_blocks(
 class ClientStreamReceiver : public brpc::StreamInputHandler {
  private:
-  std::shared_ptr<std::atomic<bool>> termination_flag_;
+  std::shared_ptr<std::atomic<int32_t>> termination_flag_;
   std::shared_ptr<std::atomic<uint32_t>> success_cnt_;
   std::promise<void> close_promise_;
   std::atomic<bool> promise_set_{false};

  public:
-  ClientStreamReceiver(std::shared_ptr<std::atomic<bool>> termination_flag,
+  ClientStreamReceiver(std::shared_ptr<std::atomic<int32_t>> termination_flag,
                        std::shared_ptr<std::atomic<uint32_t>> success_cnt)
       : termination_flag_(termination_flag), success_cnt_(success_cnt) {}

@@ -398,11 +398,10 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {
     int32_t success_cnt = std::stoi(msg_str);
     if (success_cnt > 0 &&
-        !termination_flag_->load(std::memory_order_acquire)) {
+        termination_flag_->load(std::memory_order_acquire) > 0) {
       success_cnt_->fetch_add(success_cnt, std::memory_order_relaxed);
     } else {
-      termination_flag_->store(true, std::memory_order_release);
-      brpc::StreamClose(id);
+      termination_flag_->fetch_sub(1, std::memory_order_release);
       if (!promise_set_.exchange(true)) {
         close_promise_.set_value();
       }
@@ -427,7 +426,7 @@ class ClientStreamReceiver : public brpc::StreamInputHandler {

 void CommChannel::prefetch_from_storage(
     const std::vector<BlockTransferInfo>& block_transfer_info,
-    std::shared_ptr<std::atomic<bool>> flag,
+    std::shared_ptr<std::atomic<int32_t>> flag,
     std::shared_ptr<std::atomic<uint32_t>> success_cnt) {
   proto::BlockTransferInfos pb_block_transfer_info;
   if (!block_transfer_info_to_proto(block_transfer_info,
@@ -441,6 +440,7 @@ void CommChannel::prefetch_from_storage(
   brpc::StreamId stream_id;
   proto::Status response;
   stream_options.handler = &receiver;
+  stream_options.idle_timeout_ms = 30;
   if (brpc::StreamCreate(&stream_id, cntl, &stream_options) != 0) {
     LOG(ERROR) << "Failed to create stream";
     return;
@@ -449,12 +449,13 @@ void CommChannel::prefetch_from_storage(
   stub_->PrefetchFromStorage(
       &cntl, &pb_block_transfer_info, &response, nullptr);

-  if (cntl.Failed()) {
+  if (cntl.Failed() || !response.ok()) {
     LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText();
     return;
   }

   receiver.get_close_future().wait();
+  brpc::StreamClose(stream_id);
 }

 bool CommChannel::get_last_step_result_async(
diff --git a/xllm/core/distributed_runtime/comm_channel.h b/xllm/core/distributed_runtime/comm_channel.h
index a3867c2a2..67034d441 100644
--- a/xllm/core/distributed_runtime/comm_channel.h
+++ b/xllm/core/distributed_runtime/comm_channel.h
@@ -99,7 +99,7 @@ class CommChannel {

   virtual void prefetch_from_storage(
       const std::vector<BlockTransferInfo>& block_transfer_info,
-      std::shared_ptr<std::atomic<bool>> flag,
+      std::shared_ptr<std::atomic<int32_t>> flag,
       std::shared_ptr<std::atomic<uint32_t>> success_cnt);

   virtual bool get_last_step_result_async(
diff --git a/xllm/core/distributed_runtime/engine.h b/xllm/core/distributed_runtime/engine.h
index 81c38cb0b..2a7366dce 100644
--- a/xllm/core/distributed_runtime/engine.h
+++ b/xllm/core/distributed_runtime/engine.h
@@ -97,7 +97,7 @@ class Engine {
   virtual void prefetch_from_storage(
       const uint32_t dp_rank,
       const std::vector<BlockTransferInfo>& block_transfer_info,
-      std::shared_ptr<std::atomic<bool>> flag,
+      std::shared_ptr<std::atomic<int32_t>> flag,
       std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
     LOG(FATAL) << " prefetch_from_storage is not implemented!";
   };
diff --git a/xllm/core/distributed_runtime/llm_engine.cpp b/xllm/core/distributed_runtime/llm_engine.cpp
index 528f799ac..25c83bcc3 100644
--- a/xllm/core/distributed_runtime/llm_engine.cpp
+++ b/xllm/core/distributed_runtime/llm_engine.cpp
@@ -518,9 +518,10 @@ void LLMEngine::transfer_kv_blocks(
 void LLMEngine::prefetch_from_storage(
     const uint32_t dp_rank,
     const std::vector<BlockTransferInfo>& block_transfer_info,
-    std::shared_ptr<std::atomic<bool>> flag,
+    std::shared_ptr<std::atomic<int32_t>> flag,
     std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results) {
   prefetch_results->reserve(dp_local_tp_size_);
+  flag->store(dp_local_tp_size_, std::memory_order_relaxed);
   for (auto tp_rank = 0; tp_rank < dp_local_tp_size_; ++tp_rank) {
     prefetch_results->emplace_back(std::make_shared<std::atomic<uint32_t>>(0));
     worker_clients_[tp_rank + dp_local_tp_size_ * dp_rank]
diff --git a/xllm/core/distributed_runtime/llm_engine.h b/xllm/core/distributed_runtime/llm_engine.h
index 6340f32c5..9918c12f7 100644
--- a/xllm/core/distributed_runtime/llm_engine.h
+++ b/xllm/core/distributed_runtime/llm_engine.h
@@ -83,7 +83,7 @@ class LLMEngine : public Engine {
   void prefetch_from_storage(
       const uint32_t dp_rank,
       const std::vector<BlockTransferInfo>& block_transfer_info,
-      std::shared_ptr<std::atomic<bool>> flag,
+      std::shared_ptr<std::atomic<int32_t>> flag,
       std::vector<std::shared_ptr<std::atomic<uint32_t>>>* prefetch_results)
       override;

diff --git a/xllm/core/distributed_runtime/remote_worker.cpp b/xllm/core/distributed_runtime/remote_worker.cpp
index 7d646167e..e558f6ab7 100644
--- a/xllm/core/distributed_runtime/remote_worker.cpp
+++ b/xllm/core/distributed_runtime/remote_worker.cpp
@@ -314,7 +314,7 @@ void RemoteWorker::transfer_kv_blocks(

 void RemoteWorker::prefetch_from_storage(
     const std::vector<BlockTransferInfo>& block_transfer_info,
-    std::shared_ptr<std::atomic<bool>> flag,
+    std::shared_ptr<std::atomic<int32_t>> flag,
     std::shared_ptr<std::atomic<uint32_t>> success_cnt) {
   copy_threadpool_.schedule(
       [this,
diff --git a/xllm/core/distributed_runtime/remote_worker.h b/xllm/core/distributed_runtime/remote_worker.h
index db3039344..31d837db6 100644
--- a/xllm/core/distributed_runtime/remote_worker.h
+++ b/xllm/core/distributed_runtime/remote_worker.h
@@ -121,7 +121,7 @@ class RemoteWorker : public WorkerClient {

   virtual void prefetch_from_storage(
       const std::vector<BlockTransferInfo>& block_transfer_info,
-      std::shared_ptr<std::atomic<bool>> flag,
+      std::shared_ptr<std::atomic<int32_t>> flag,
       std::shared_ptr<std::atomic<uint32_t>> success_cnt) override;

   // Run the model and return the output.
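
Note: across the hunks above, the prefetch termination flag changes from a bool into a countdown. A minimal standalone sketch of the intended arithmetic, assuming the engine arms the flag with the local TP size and each stream receiver decrements it once when its stream ends (plain C++ stand-ins, not the xllm types):

#include <atomic>
#include <cstdint>
#include <memory>

int main() {
  const int32_t num_tp_workers = 4;
  auto flag = std::make_shared<std::atomic<int32_t>>(INT32_MAX);

  // Engine side: arm the countdown with the number of TP workers
  // (mirrors the flag->store(dp_local_tp_size_, ...) added above).
  flag->store(num_tp_workers, std::memory_order_relaxed);

  // Stream-receiver side: a positive value means "keep accumulating
  // success counts"; each finished stream decrements the flag once.
  for (int32_t i = 0; i < num_tp_workers; ++i) {
    if (flag->load(std::memory_order_acquire) > 0) {
      // would fetch_add the received success_cnt here
    }
    flag->fetch_sub(1, std::memory_order_release);
  }

  // Sequence side: storing 0 tells any late receiver to stop early.
  flag->store(0, std::memory_order_release);
  return 0;
}
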
diff --git a/xllm/core/distributed_runtime/worker_service.cpp b/xllm/core/distributed_runtime/worker_service.cpp
index 213e9a837..335b6e1b1 100644
--- a/xllm/core/distributed_runtime/worker_service.cpp
+++ b/xllm/core/distributed_runtime/worker_service.cpp
@@ -407,41 +407,6 @@ void WorkerService::TransferBlocks(
   return;
 }

-class ServerStreamHandler : public brpc::StreamInputHandler {
- private:
-  std::promise<void> close_promise_;
-  std::atomic<bool> promise_set_{false};
-
- public:
-  ~ServerStreamHandler() {
-    if (!promise_set_.exchange(true)) {
-      close_promise_.set_value();
-    }
-  }
-
-  std::future<void> get_close_future() { return close_promise_.get_future(); }
-
-  int on_received_messages(brpc::StreamId id,
-                           butil::IOBuf* const messages[],
-                           size_t size) override {
-    LOG(WARNING) << "ServerStreamHandler::on_received_messages not implement.";
-    return 0;
-  }
-
-  void on_closed(brpc::StreamId id) override {
-    if (!promise_set_.exchange(true)) {
-      close_promise_.set_value();
-    }
-  }
-
-  void on_idle_timeout(brpc::StreamId id) override {
-    if (!promise_set_.exchange(true)) {
-      LOG(WARNING) << "Stream idle timeout: " << id;
-      close_promise_.set_value();
-    }
-  }
-};
-
 void WorkerService::PrefetchFromStorage(
     google::protobuf::RpcController* controller,
     const proto::BlockTransferInfos* req,
@@ -450,11 +415,10 @@ void WorkerService::PrefetchFromStorage(
   brpc::ClosureGuard done_guard(done);
   brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

-  auto stream_handler = std::make_unique<ServerStreamHandler>();
-  auto stream_id = std::make_unique<brpc::StreamId>();
+  brpc::StreamId stream_id;
   brpc::StreamOptions stream_options;
-  stream_options.handler = stream_handler.get();
-  if (brpc::StreamAccept(stream_id.get(), *cntl, &stream_options) != 0) {
+  stream_options.idle_timeout_ms = 5 * options_.prefetch_bacth_size();
+  if (brpc::StreamAccept(&stream_id, *cntl, &stream_options) != 0) {
     resp->set_ok(false);
     LOG(ERROR) << "Failed to accept stream!";
     return;
@@ -463,49 +427,47 @@
   std::vector<BlockTransferInfo> block_transfer_info;
   proto_to_block_transfer_info(*req, block_transfer_info);

-  copy_threadpool_.schedule(
-      [this,
-       block_transfer_info = std::move(block_transfer_info),
-       stream_id = std::move(stream_id),
-       stream_handler = std::move(stream_handler)]() mutable {
-        Slice<BlockTransferInfo> transfer_slice{block_transfer_info};
-        auto close_future = stream_handler->get_close_future();
-        bool is_completed = false;
-
-        for (size_t i = 0; i < transfer_slice.size();
-             i += options_.prefetch_bacth_size()) {
-          auto current_slice =
-              transfer_slice.slice(i,
-                                   std::min(i + options_.prefetch_bacth_size(),
-                                            transfer_slice.size()));
-
-          auto success_cnt = worker_->transfer_kv_blocks(UNINITIALIZED_BATCH_ID,
-                                                         current_slice);
-
-          if (success_cnt != current_slice.size() ||
-              i + options_.prefetch_bacth_size() >= transfer_slice.size()) {
-            is_completed = true;
-          }
+  copy_threadpool_.schedule([this,
+                             block_transfer_info =
+                                 std::move(block_transfer_info),
+                             stream_id = std::move(stream_id)]() mutable {
+    Slice<BlockTransferInfo> transfer_slice{block_transfer_info};
+    bool is_completed = false;
+
+    for (size_t i = 0; i < transfer_slice.size();
+         i += options_.prefetch_bacth_size()) {
+      auto current_slice = transfer_slice.slice(
+          i,
+          std::min(i + options_.prefetch_bacth_size(), transfer_slice.size()));
+
+      auto success_cnt =
+          worker_->transfer_kv_blocks(UNINITIALIZED_BATCH_ID, current_slice);
+
+      if (success_cnt != current_slice.size() ||
+          (i + options_.prefetch_bacth_size()) >= transfer_slice.size()) {
+        is_completed = true;
+      }
-          butil::IOBuf buf;
-          buf.append(std::to_string(success_cnt));
-          if (brpc::StreamWrite(*stream_id.get(), buf) != 0) {
-            break;
-          }
+      butil::IOBuf buf;
+      buf.append(std::to_string(success_cnt));
+      if (brpc::StreamWrite(stream_id, buf) != 0) {
+        brpc::StreamClose(stream_id);
+        return;
+      }
-          if (is_completed) {
-            if (success_cnt != 0) {
-              butil::IOBuf buf_end;
-              buf_end.append("0");
-              brpc::StreamWrite(*stream_id.get(), buf_end);
-            }
-            break;
+      if (is_completed) {
+        if (success_cnt != 0) {
+          butil::IOBuf buf_end;
+          buf_end.append("0");
+          if (brpc::StreamWrite(stream_id, buf_end) != 0) {
+            brpc::StreamClose(stream_id);
+            return;
           }
         }
-
-        close_future.wait();
-        brpc::StreamClose(*stream_id.get());
-      });
+        break;
+      }
+    }
+  });

   resp->set_ok(true);
   return;
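
Note: with ServerStreamHandler removed, the server owns the whole close path: it streams one success count per prefetch batch, sends a literal "0" as an end marker, and closes the stream itself if a write fails. A hedged sketch of that framing, with a stand-in write function instead of brpc::StreamWrite (names here are illustrative, not the xllm API):

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Stand-in for brpc::StreamWrite; returns 0 on success like the real call.
static int fake_stream_write(const std::string& payload) {
  std::printf("stream <- %s\n", payload.c_str());
  return 0;
}

// Sketch of the batching/termination protocol assumed above:
// per-batch success counts, then a final "0" marker.
int main() {
  const size_t total_blocks = 10, batch_size = 4;
  std::vector<size_t> copied_per_batch = {4, 4, 1};  // last batch partially fails

  size_t batch_idx = 0;
  for (size_t i = 0; i < total_blocks; i += batch_size, ++batch_idx) {
    const size_t batch = std::min(batch_size, total_blocks - i);
    const size_t success_cnt = copied_per_batch[batch_idx];
    const bool is_completed =
        success_cnt != batch || i + batch_size >= total_blocks;

    if (fake_stream_write(std::to_string(success_cnt)) != 0) return 1;
    if (is_completed) {
      if (success_cnt != 0) fake_stream_write("0");  // explicit end marker
      break;
    }
  }
  return 0;
}
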
diff --git a/xllm/core/framework/block/block.h b/xllm/core/framework/block/block.h
index 5ef7c367a..e6ed411c1 100644
--- a/xllm/core/framework/block/block.h
+++ b/xllm/core/framework/block/block.h
@@ -67,7 +67,7 @@ class Block final {
     memcpy(hash_value_, hash_value, MURMUR_HASH3_VALUE_LEN);
   }

-  uint32_t get_hash_value_len() { return MURMUR_HASH3_VALUE_LEN; }
+  uint32_t get_hash_value_len() const { return MURMUR_HASH3_VALUE_LEN; }

  private:
  // increase reference count
diff --git a/xllm/core/framework/block/block_manager.h b/xllm/core/framework/block/block_manager.h
index eac76c6e2..9c36d7b32 100644
--- a/xllm/core/framework/block/block_manager.h
+++ b/xllm/core/framework/block/block_manager.h
@@ -54,8 +54,6 @@ class BlockManager {

   virtual void deallocate(const Slice<Block>& blocks) = 0;

-  virtual void deallocate(std::vector<Block>& blocks) = 0;
-
   virtual std::vector<Block> allocate(size_t num_blocks) = 0;

   virtual std::vector<Block> allocate_shared(
@@ -68,7 +66,6 @@

   // get merged all dp rank KVCacheEvent
   virtual void get_merged_kvcache_event(KvCacheEvent* event) const = 0;
-  virtual float get_gpu_cache_usage_perc() const = 0;

   virtual size_t num_blocks_in_prefix_cache() const = 0;
   virtual size_t num_free_blocks() const = 0;
diff --git a/xllm/core/framework/block/block_manager_impl.cpp b/xllm/core/framework/block/block_manager_impl.cpp
index 0aa81474e..58d969a46 100644
--- a/xllm/core/framework/block/block_manager_impl.cpp
+++ b/xllm/core/framework/block/block_manager_impl.cpp
@@ -70,9 +70,7 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
   for (const auto& block : blocks) {
     // the block is not shared by other sequence
     if (block.is_valid() && block.ref_count() <= 2) {
-      auto origin_num_used_blocks =
-          num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
-      if (origin_num_used_blocks < 0) {
+      if (num_used_blocks_ == 0) {
        LOG(ERROR) << "num_used_blocks_==0 cannot fetch_sub for id:"
                   << block.id()
                   << ", total block size: " << num_total_blocks();
@@ -86,6 +84,7 @@
        }
        LOG(FATAL) << error_msg;
      }
+      num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
     }
   }
   } else {
@@ -93,12 +92,6 @@
   }
 }

-void BlockManagerImpl::deallocate(std::vector<Block>& blocks) {
-  Slice<Block> slice(blocks);
-  deallocate(slice);
-  blocks.clear();
-}
-
 bool BlockManagerImpl::has_enough_blocks(uint32_t num_blocks) {
   if (num_blocks <= num_free_blocks_) {
     return true;
diff --git a/xllm/core/framework/block/block_manager_impl.h b/xllm/core/framework/block/block_manager_impl.h
index 12e881d98..4549f51b5 100644
--- a/xllm/core/framework/block/block_manager_impl.h
+++ b/xllm/core/framework/block/block_manager_impl.h
@@ -35,8 +35,6 @@ class BlockManagerImpl : public BlockManager {

   void deallocate(const Slice<Block>& blocks) override;

-  void deallocate(std::vector<Block>& blocks) override;
-
   // allocate shared blocks when enable prefix cache
   std::vector<Block> allocate_shared(
       const Slice<int32_t>& tokens_ids,
@@ -78,10 +76,6 @@ class BlockManagerImpl : public BlockManager {
     }
   }

-  float get_gpu_cache_usage_perc() const override {
-    return 1 - static_cast<float>(num_free_blocks_) / num_total_blocks();
-  }
-
   // call BlockManager to free block used by Block.
   void free(int32_t block_id) override;
diff --git a/xllm/core/framework/block/block_manager_pool.cpp b/xllm/core/framework/block/block_manager_pool.cpp
index a175a3d1b..e93d3cbc8 100644
--- a/xllm/core/framework/block/block_manager_pool.cpp
+++ b/xllm/core/framework/block/block_manager_pool.cpp
@@ -30,7 +30,9 @@ BlockManagerPool::BlockManagerPool(const Options& options, int32_t dp_size)
           .block_size(options_.block_size())
           .enable_prefix_cache(options_.enable_prefix_cache())
           .enable_disagg_pd(options_.enable_disagg_pd())
-          .enable_cache_upload(options_.enable_cache_upload());
+          .enable_cache_upload(options_.host_num_blocks() > 0
+                                   ? false
+                                   : options_.enable_cache_upload());
   for (int32_t i = 0; i < dp_size; ++i) {
     if (options.enable_disagg_pd() || options_.enable_kvcache_store()) {
@@ -221,7 +223,7 @@ void BlockManagerPool::get_merged_kvcache_event(KvCacheEvent* event) const {
 float BlockManagerPool::get_gpu_cache_usage_perc() const {
   float perc = 0.0;
   for (int32_t i = 0; i < block_managers_.size(); ++i) {
-    perc += block_managers_[i]->get_gpu_cache_usage_perc();
+    perc += block_managers_[i]->kv_cache_utilization();
   }
   return perc / block_managers_.size();
 }
diff --git a/xllm/core/framework/block/concurrent_block_manager_impl.cpp b/xllm/core/framework/block/concurrent_block_manager_impl.cpp
index d3834028b..b80267c43 100644
--- a/xllm/core/framework/block/concurrent_block_manager_impl.cpp
+++ b/xllm/core/framework/block/concurrent_block_manager_impl.cpp
@@ -30,11 +30,6 @@ void ConcurrentBlockManagerImpl::deallocate(const Slice<Block>& blocks) {
   BlockManagerImpl::deallocate(blocks);
 }

-void ConcurrentBlockManagerImpl::deallocate(std::vector<Block>& blocks) {
-  std::lock_guard<std::mutex> lock(mutex_);
-  BlockManagerImpl::deallocate(blocks);
-}
-
 std::vector<Block> ConcurrentBlockManagerImpl::allocate_shared(
     const Slice<int32_t>& tokens_ids,
     const Slice<Block>& existed_shared_blocks) {
diff --git a/xllm/core/framework/block/concurrent_block_manager_impl.h b/xllm/core/framework/block/concurrent_block_manager_impl.h
index 37c87ab22..68c2804c6 100644
--- a/xllm/core/framework/block/concurrent_block_manager_impl.h
+++ b/xllm/core/framework/block/concurrent_block_manager_impl.h
@@ -30,8 +30,6 @@ class ConcurrentBlockManagerImpl : public BlockManagerImpl {

   void deallocate(const Slice<Block>& blocks) override;

-  void deallocate(std::vector<Block>& blocks) override;
-
   // try to share blocks among sequences with the same prefix
   std::vector<Block> allocate_shared(
       const Slice<int32_t>& tokens_ids,
diff --git a/xllm/core/framework/block/hierarchy_block_manager_pool.cpp b/xllm/core/framework/block/hierarchy_block_manager_pool.cpp
index 89496d383..dfdbace4a 100644
--- a/xllm/core/framework/block/hierarchy_block_manager_pool.cpp
+++ b/xllm/core/framework/block/hierarchy_block_manager_pool.cpp
@@ -34,7 +34,7 @@ HierarchyBlockManagerPool::HierarchyBlockManagerPool(
           .enable_prefix_cache(options_.enable_prefix_cache())
           .enable_disagg_pd(options_.enable_disagg_pd())
           .num_blocks(options_.host_num_blocks())
-          .enable_cache_upload(false);
+          .enable_cache_upload(options_.enable_cache_upload());
   for (int32_t i = 0; i < dp_size; ++i) {
     if (options.enable_disagg_pd() ||
         options_.enable_kvcache_store()) {
@@ -47,9 +47,7 @@ HierarchyBlockManagerPool::HierarchyBlockManagerPool(
   }

   load_block_transfer_infos_.resize(host_block_managers_.size());
-  offload_block_transfer_infos_.resize(host_block_managers_.size());
-  saved_host_blocks_.resize(host_block_managers_.size());
-  saved_device_blocks_.resize(host_block_managers_.size());
+  offload_block_pair_queues_.resize(host_block_managers_.size());
 }

 void HierarchyBlockManagerPool::deallocate(Sequence* sequence) {
@@ -61,44 +59,32 @@ void HierarchyBlockManagerPool::deallocate(Sequence* sequence) {
   auto* blocks = sequence->kv_state().mutable_kv_blocks();
   auto* host_blocks = sequence->host_kv_state().mutable_kv_blocks();
-  if (blocks->size() == 0 || host_blocks->size() >= blocks->size()) {
+  if (blocks->size() == 0 || host_blocks->size() > blocks->size()) {
     return;
   }

   size_t cached_block_num =
       sequence->host_kv_state().kv_cache_tokens_num() / options_.block_size();
-  if (host_blocks->size() > 0) {
-    host_block_managers_[dp_rank]->cache(sequence->tokens(), *host_blocks);
-  }

   size_t needed_block_num =
       sequence->num_tokens() / options_.block_size() - host_blocks->size();
-  if (needed_block_num == 0) {
-    return;
+  if (needed_block_num != 0) {
+    sequence->host_kv_state().add_kv_blocks(
+        host_block_managers_[dp_rank]->allocate(needed_block_num));
   }
-  sequence->host_kv_state().add_kv_blocks(
-      host_block_managers_[dp_rank]->allocate(needed_block_num));
-
   for (size_t i = cached_block_num; i < host_blocks->size(); i++) {
     if (blocks->at(i).ref_count() != 2) {
       continue;
     }
     host_blocks->at(i).set_hash_value(blocks->at(i).get_immutable_hash_value());
-    saved_host_blocks_[dp_rank].emplace_back(std::move(host_blocks->at(i)));
-    saved_device_blocks_[dp_rank].emplace_back(std::move(blocks->at(i)));
-    offload_block_transfer_infos_[dp_rank].emplace_back(BlockTransferInfo(
-        saved_device_blocks_[dp_rank].back().id(),
-        saved_host_blocks_[dp_rank].back().id(),
-        saved_host_blocks_[dp_rank].back().get_immutable_hash_value(),
-        saved_host_blocks_[dp_rank].back().get_hash_value_len(),
-        TransferType::D2G));
+    auto block_pair = std::make_shared<OffloadBlockPair>(
+        std::move(blocks->at(i)), std::move(host_blocks->at(i)));
+    offload_block_pair_queues_[dp_rank].enqueue(std::move(block_pair));
   }
-  host_block_managers_[dp_rank]->cache(
-      *sequence->host_kv_state().mutable_kv_blocks());
+
   host_block_managers_[dp_rank]->deallocate(
       sequence->host_kv_state().kv_blocks());
@@ -162,12 +148,14 @@ void HierarchyBlockManagerPool::prefetch_from_storage(
                                             prefill_sequence->tokens());
   prefill_sequence->add_shared_host_kv_blocks(std::move(shared_blocks));

-  const size_t num_blocks = prefill_sequence->host_kv_state().num_kv_blocks();
   // round down to the nearest block number
-  const size_t block_size = options_.block_size();
+  size_t shared_blocks_num =
+      prefill_sequence->host_kv_state().shared_kv_blocks_num();
   const size_t num_additional_blocks =
-      prefill_sequence->num_tokens() / block_size - num_blocks;
-  if (num_additional_blocks <= 0) {
+      (prefill_sequence->num_tokens() + options_.block_size() - 1) /
+          options_.block_size() -
+      shared_blocks_num;
+  if (num_additional_blocks <= 1) {
     return;
   }
@@ -179,20 +167,19 @@ void HierarchyBlockManagerPool::prefetch_from_storage(
   prefill_sequence->host_kv_state().add_kv_blocks(host_blocks);
   PrefixCache::compute_hash_keys(
       prefill_sequence->tokens(),
-      *prefill_sequence->host_kv_state().mutable_kv_blocks());
+      *prefill_sequence->host_kv_state().mutable_kv_blocks(),
+      shared_blocks_num);

-  if (num_additional_blocks > 0) {
+  if (num_additional_blocks > 1) {
     const auto host_blocks = prefill_sequence->host_kv_state().kv_blocks();
     std::vector<BlockTransferInfo> block_transfer_infos;
     block_transfer_infos.reserve(num_additional_blocks);
-    for (int i = host_blocks.size() - num_additional_blocks;
-         i < host_blocks.size();
-         i++) {
-      block_transfer_infos.emplace_back(
-          BlockTransferInfo(-1,
-                            host_blocks[i].id(),
-                            host_blocks[i].get_immutable_hash_value(),
-                            TransferType::G2H));
+    for (int i = 0; i < num_additional_blocks - 1; i++) {
+      block_transfer_infos.emplace_back(BlockTransferInfo(
+          -1,
+          host_blocks[shared_blocks_num + i].id(),
+          host_blocks[shared_blocks_num + i].get_immutable_hash_value(),
+          TransferType::G2H));
     }

     engine_->prefetch_from_storage(prefill_sequence->dp_rank(),
@@ -212,8 +199,21 @@ bool HierarchyBlockManagerPool::update_prefetch_result(
   bool prefetch_result = true;
   for (auto& prefill_sequence : request->sequences()) {
-    prefetch_result &= prefill_sequence->update_prefetch_result(timeout);
+    uint32_t success_cnt = 0;
+    prefetch_result &=
+        prefill_sequence->update_prefetch_result(timeout, success_cnt);
+
+    if (prefetch_result && success_cnt > 0) {
+      int32_t dp_rank = BlockManagerPool::get_dp_rank(prefill_sequence.get());
+      auto host_blocks = prefill_sequence->host_kv_state().kv_blocks();
+      auto cached_blocks =
+          prefill_sequence->host_kv_state().shared_kv_blocks_num();
+
+      host_block_managers_[dp_rank]->cache(
+          host_blocks.slice(cached_blocks - success_cnt, cached_blocks));
+    }
   }
+
   return prefetch_result;
 }
@@ -235,36 +235,50 @@ void HierarchyBlockManagerPool::transfer_blocks(
   }

   // offload blocks from device to host and kvcache store
-  for (int i = 0; i < offload_block_transfer_infos_.size(); i++) {
-    if (!offload_block_transfer_infos_[i].empty()) {
-      folly::collectAll(std::move(engine_->transfer_kv_blocks(
-                            i, std::move(offload_block_transfer_infos_[i]))))
+  for (int i = 0; i < offload_block_pair_queues_.size(); i++) {
+    std::vector<BlockTransferInfo> transfer_infos;
+    std::vector<Block> src_blocks;
+    std::vector<Block> dst_blocks;
+
+    std::shared_ptr<OffloadBlockPair> block_pair;
+    while (offload_block_pair_queues_[i].try_dequeue(block_pair)) {
+      src_blocks.emplace_back(std::move(block_pair->src));
+      dst_blocks.emplace_back(std::move(block_pair->dst));
+      transfer_infos.emplace_back(
+          BlockTransferInfo(src_blocks.back().id(),
+                            dst_blocks.back().id(),
+                            dst_blocks.back().get_immutable_hash_value(),
+                            TransferType::D2G));
+      block_pair.reset();
+    }
+
+    if (!transfer_infos.empty()) {
+      folly::collectAll(
+          std::move(engine_->transfer_kv_blocks(i, std::move(transfer_infos))))
           .via(folly::getGlobalCPUExecutor())
-          .thenValue([host_blocks = std::move(saved_host_blocks_[i]),
-                      device_blocks = std::move(saved_device_blocks_[i]),
-                      host_block_mgr_ptr = host_block_managers_[i].get(),
-                      device_block_mgr_ptr = block_managers_[i].get()](
-                         std::vector<folly::Try<uint32_t>>&& results) {
+          .thenValue([device_blocks = std::move(src_blocks),
+                      host_blocks = std::move(dst_blocks),
+                      device_block_mgr_ptr = block_managers_[i].get(),
+                      host_block_mgr_ptr = host_block_managers_[i].get()](
+                         std::vector<folly::Try<uint32_t>>&& results) mutable {
            for (auto&& result : results) {
              if (result.value() != host_blocks.size()) {
                LOG(FATAL) << "Offload copy fail, expected "
                           << host_blocks.size() << ", got " << result.value();
              }
            }
+
+            device_block_mgr_ptr->deallocate({device_blocks});
+            device_blocks.clear();
+            host_block_mgr_ptr->cache(host_blocks);
             host_block_mgr_ptr->deallocate({host_blocks});
-            device_block_mgr_ptr->deallocate({device_blocks});
+            host_blocks.clear();
+            return 0;
           });
     }
   }
-
-  offload_block_transfer_infos_.clear();
-  saved_host_blocks_.clear();
-  saved_device_blocks_.clear();
-  offload_block_transfer_infos_.resize(host_block_managers_.size());
-  saved_host_blocks_.resize(host_block_managers_.size());
-  saved_device_blocks_.resize(host_block_managers_.size());
 }

 void HierarchyBlockManagerPool::get_merged_kvcache_event(
diff --git a/xllm/core/framework/block/hierarchy_block_manager_pool.h b/xllm/core/framework/block/hierarchy_block_manager_pool.h
index 5ecd4c301..071faffea 100644
--- a/xllm/core/framework/block/hierarchy_block_manager_pool.h
+++ b/xllm/core/framework/block/hierarchy_block_manager_pool.h
@@ -17,13 +17,31 @@ limitations under the License.

 #include "block_manager_pool.h"
 #include "distributed_runtime/engine.h"
+#include "util/blockingconcurrentqueue.h"

 namespace xllm {

 class Engine;

+struct OffloadBlockPair {
+  OffloadBlockPair(Block& s, Block& d) : src(s), dst(d) {}
+
+  OffloadBlockPair(Block&& s, Block&& d)
+      : src(std::move(s)), dst(std::move(d)) {}
+
+  OffloadBlockPair(Block& s) : src(s) {}
+
+  OffloadBlockPair(Block&& s) : src(std::move(s)) {}
+
+  Block src;
+  Block dst;
+};
+
 class HierarchyBlockManagerPool : public BlockManagerPool {
  public:
+  using OffloadBlockPairQueue =
+      moodycamel::BlockingConcurrentQueue<std::shared_ptr<OffloadBlockPair>>;
+
   explicit HierarchyBlockManagerPool(const BlockManagerPool::Options& options,
                                      Engine* engine,
                                      int32_t dp_size = 1);
@@ -51,9 +69,7 @@ class HierarchyBlockManagerPool : public BlockManagerPool {
   // BlockTransferInfo per step
   std::vector<std::vector<BlockTransferInfo>> load_block_transfer_infos_;
-  std::vector<std::vector<BlockTransferInfo>> offload_block_transfer_infos_;
-  std::vector<std::vector<Block>> saved_host_blocks_;
-  std::vector<std::vector<Block>> saved_device_blocks_;
+  std::vector<OffloadBlockPairQueue> offload_block_pair_queues_;
 };

 }  // namespace xllm
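
Note: the per-rank vectors of saved blocks are replaced by a queue of device/host block pairs that deallocate() produces and transfer_blocks() drains. A simplified sketch of that producer/consumer handoff, using a mutex-guarded deque and plain ints as stand-ins for moodycamel::BlockingConcurrentQueue and xllm::Block:

#include <deque>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

struct OffloadPair { int src; int dst; };  // stand-in for device/host Block pair

class OffloadQueue {
 public:
  void enqueue(std::shared_ptr<OffloadPair> p) {
    std::lock_guard<std::mutex> lk(mu_);
    q_.push_back(std::move(p));
  }
  bool try_dequeue(std::shared_ptr<OffloadPair>& out) {
    std::lock_guard<std::mutex> lk(mu_);
    if (q_.empty()) return false;
    out = std::move(q_.front());
    q_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::deque<std::shared_ptr<OffloadPair>> q_;
};

// Producer (deallocate) enqueues pairs; consumer (transfer_blocks) drains them
// and builds one transfer batch per step.
int main() {
  OffloadQueue queue;
  queue.enqueue(std::make_shared<OffloadPair>(OffloadPair{3, 7}));
  queue.enqueue(std::make_shared<OffloadPair>(OffloadPair{4, 8}));

  std::vector<OffloadPair> batch;
  std::shared_ptr<OffloadPair> pair;
  while (queue.try_dequeue(pair)) {
    batch.push_back(*pair);
    pair.reset();
  }
  return batch.size() == 2 ? 0 : 1;
}
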
#include "framework/batch/batch_forward_type.h" #include "framework/request/mm_data.h" #include "npu_dp_ep_padding.h" +#include "util/hash_util.h" #include "util/tensor_helper.h" namespace xllm { @@ -38,47 +39,62 @@ enum class TransferType : uint8_t { struct BlockTransferInfo { int32_t src_block_id = -1; int32_t dst_block_id = -1; - uint8_t* hash_key = nullptr; + uint8_t hash_key[MURMUR_HASH3_VALUE_LEN]; TransferType transfer_type; - uint32_t hash_key_len = -1; BlockTransferInfo(int32_t src_block_id, int32_t dst_block_id) { this->src_block_id = src_block_id; this->dst_block_id = dst_block_id; } - BlockTransferInfo(int32_t src_block_id, - int32_t dst_block_id, - const uint8_t* hash_key, - TransferType transfer_type) { - this->src_block_id = src_block_id; - this->dst_block_id = dst_block_id; - this->hash_key = const_cast(hash_key); - this->transfer_type = transfer_type; + BlockTransferInfo(int32_t src_id, + int32_t dst_id, + const uint8_t* key, + TransferType type) + : src_block_id(src_id), dst_block_id(dst_id), transfer_type(type) { + memcpy(hash_key, key, MURMUR_HASH3_VALUE_LEN); } - BlockTransferInfo(int32_t src_block_id, - int32_t dst_block_id, - const uint8_t* hash_key, - uint32_t hash_key_len, - TransferType transfer_type) { - this->src_block_id = src_block_id; - this->dst_block_id = dst_block_id; - this->hash_key = new uint8_t[hash_key_len]; - memcpy(this->hash_key, hash_key, hash_key_len); - this->transfer_type = transfer_type; + BlockTransferInfo(const BlockTransferInfo& other) + : src_block_id(other.src_block_id), + dst_block_id(other.dst_block_id), + transfer_type(other.transfer_type) { + memcpy(hash_key, other.hash_key, MURMUR_HASH3_VALUE_LEN); } - ~BlockTransferInfo() { - if (hash_key_len != -1 && hash_key != nullptr) { - delete[] hash_key; - } + BlockTransferInfo(BlockTransferInfo&& other) + : src_block_id(other.src_block_id), + dst_block_id(other.dst_block_id), + transfer_type(other.transfer_type) { + memcpy(hash_key, other.hash_key, MURMUR_HASH3_VALUE_LEN); + + other.src_block_id = -1; + other.dst_block_id = -1; + } + + BlockTransferInfo& operator=(const BlockTransferInfo& other) { + src_block_id = other.src_block_id; + dst_block_id = other.dst_block_id; + transfer_type = other.transfer_type; + memcpy(hash_key, other.hash_key, MURMUR_HASH3_VALUE_LEN); + return *this; + } + + BlockTransferInfo& operator=(BlockTransferInfo&& other) { + src_block_id = other.src_block_id; + dst_block_id = other.dst_block_id; + transfer_type = other.transfer_type; + memcpy(hash_key, other.hash_key, MURMUR_HASH3_VALUE_LEN); + + other.src_block_id = -1; + other.dst_block_id = -1; + return *this; } std::string to_string() const { std::string rt = ", has_key:"; for (int i = 0; i < 16; i++) { - rt += std::to_string(int64_t(*(hash_key + i))) + " "; + rt += std::to_string(int64_t(hash_key[i])) + " "; } return std::to_string(src_block_id) + "->" + std::to_string(dst_block_id) + ", " + std::to_string(uint32_t(transfer_type)) + rt; diff --git a/xllm/core/framework/prefix_cache/prefix_cache.cpp b/xllm/core/framework/prefix_cache/prefix_cache.cpp index f7c251d54..485dcf9b3 100644 --- a/xllm/core/framework/prefix_cache/prefix_cache.cpp +++ b/xllm/core/framework/prefix_cache/prefix_cache.cpp @@ -124,6 +124,16 @@ size_t PrefixCache::insert(const Slice& token_ids, return insert(token_ids, blocks, &insert_keys); } +size_t PrefixCache::insert(const std::vector& blocks) { + Slice slice(blocks); + return insert(slice); +} + +size_t PrefixCache::insert(Slice& blocks) { + std::vector insert_keys; + return 
diff --git a/xllm/core/framework/prefix_cache/prefix_cache.cpp b/xllm/core/framework/prefix_cache/prefix_cache.cpp
index f7c251d54..485dcf9b3 100644
--- a/xllm/core/framework/prefix_cache/prefix_cache.cpp
+++ b/xllm/core/framework/prefix_cache/prefix_cache.cpp
@@ -124,6 +124,16 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
   return insert(token_ids, blocks, &insert_keys);
 }

+size_t PrefixCache::insert(const std::vector<Block>& blocks) {
+  Slice<Block> slice(blocks);
+  return insert(slice);
+}
+
+size_t PrefixCache::insert(Slice<Block>& blocks) {
+  std::vector<Murmur3Key> insert_keys;
+  return insert(blocks, &insert_keys);
+}
+
 size_t PrefixCache::evict(size_t n_blocks) {
   std::vector<Murmur3Key> evict_keys;
   return evict(n_blocks, &evict_keys);
 }
@@ -192,11 +202,13 @@ size_t PrefixCache::insert(const Slice<int32_t>& token_ids,
   return n_tokens;
 }

-size_t PrefixCache::insert(const std::vector<Block>& blocks) {
+size_t PrefixCache::insert(Slice<Block>& blocks,
+                           std::vector<Murmur3Key>* insert_keys) {
   const int64_t now = absl::ToUnixMicros(absl::Now());
   DNodeList node_list;
   Murmur3Key token_hash_key;
+  insert_keys->reserve(blocks.size());
   for (size_t i = 0; i < blocks.size(); i++) {
     if (!blocks[i].is_valid()) {
       continue;
     }
@@ -220,6 +232,8 @@
       cached_blocks_.emplace(std::make_pair(token_hash_key, new_node));
       num_blocks_++;
+
+      insert_keys->emplace_back(token_hash_key.data);
     }
   }
@@ -270,7 +284,8 @@ size_t PrefixCache::evict(size_t n_blocks,
 }

 uint32_t PrefixCache::compute_hash_keys(const Slice<int32_t>& token_ids,
-                                        std::vector<Block>& blocks) {
+                                        std::vector<Block>& blocks,
+                                        const size_t cached_blocks) {
   if (blocks.size() == 0) {
     return 0;
   }
@@ -280,8 +295,10 @@ uint32_t PrefixCache::compute_hash_keys(const Slice<int32_t>& token_ids,
     LOG(ERROR) << "token ids do not cover the allocate block.";
     return 0;
   }
+  size_t full_block_size =
+      std::min(token_ids.size() / block_size, blocks.size());

-  for (size_t i = 0; i < token_ids.size() / block_size; i++) {
+  for (size_t i = cached_blocks; i < full_block_size; i++) {
     if (i == 0) {
       murmur_hash3(nullptr,
                    token_ids.slice(i * block_size, (i + 1) * block_size),
@@ -293,7 +310,7 @@
     }
   }

-  return token_ids.size() / block_size;
+  return full_block_size;
 }

 }  // namespace xllm
diff --git a/xllm/core/framework/prefix_cache/prefix_cache.h b/xllm/core/framework/prefix_cache/prefix_cache.h
index 875f6d65a..3de2fdc6a 100644
--- a/xllm/core/framework/prefix_cache/prefix_cache.h
+++ b/xllm/core/framework/prefix_cache/prefix_cache.h
@@ -66,8 +66,14 @@ class PrefixCache {
       const Slice<int32_t>& token_ids,
       const Slice<Block>& existed_shared_blocks = {});

+  // insert the token ids and blocks into the prefix tree
+  // and set hash key to the corresponding block
+  // return the length of new inserted tokens
   virtual size_t insert(const Slice<int32_t>& token_ids,
                         std::vector<Block>& blocks);
+
+  // insert the blocks with hash key into the prefix tree
+  virtual size_t insert(Slice<Block>& blocks);
   virtual size_t insert(const std::vector<Block>& blocks);

   // evict blocks hold by the prefix cache
@@ -92,12 +98,16 @@ class PrefixCache {
   virtual KvCacheEvent* get_upload_kvcache_events() { return nullptr; }

   static uint32_t compute_hash_keys(const Slice<int32_t>& token_ids,
-                                    std::vector<Block>& blocks);
+                                    std::vector<Block>& blocks,
+                                    const size_t cached_blocks = 0);

  protected:
   size_t insert(const Slice<int32_t>& token_ids,
                 std::vector<Block>& blocks,
                 std::vector<Murmur3Key>* insert_keys);
+
+  size_t insert(Slice<Block>& blocks, std::vector<Murmur3Key>* insert_keys);
+
   size_t evict(size_t n_blocks, std::vector<Murmur3Key>* evict_keys);

   struct Node {
diff --git a/xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp b/xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp
index 175149655..bcf83409c 100644
--- a/xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp
+++ b/xllm/core/framework/prefix_cache/prefix_cache_with_upload.cpp
@@ -35,47 +35,59 @@ size_t PrefixCacheWithUpload::insert(const Slice<int32_t>& token_ids,
                                      std::vector<Block>& blocks) {
   std::vector<Murmur3Key> insert_keys;
   auto n_tokens = PrefixCache::insert(token_ids, blocks, &insert_keys);
+  save_event_async(true, insert_keys);
+  return n_tokens;
+}

-  threadpool_.schedule([insert_keys = std::move(insert_keys), this]() {
-    auto front_ptr = this->db_kvcache_events_.get_front_value();
-    if (!front_ptr) {
-      LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
-      return;
-    }
-    if (!this->exited_.load()) {
-      for (const auto& hash_id : insert_keys) {
-        front_ptr->removed_cache.erase(hash_id);
-        front_ptr->stored_cache.insert(hash_id);
-      }
-    }
-  });
+size_t PrefixCacheWithUpload::insert(const std::vector<Block>& blocks) {
+  Slice<Block> slice(blocks);
+  return insert(slice);
+}
+
+size_t PrefixCacheWithUpload::insert(Slice<Block>& blocks) {
+  std::vector<Murmur3Key> insert_keys;
+  auto n_tokens = PrefixCache::insert(blocks, &insert_keys);
+  save_event_async(true, insert_keys);
   return n_tokens;
 }

 size_t PrefixCacheWithUpload::evict(size_t n_blocks) {
   std::vector<Murmur3Key> evict_keys;
   auto evict_count = PrefixCache::evict(n_blocks, &evict_keys);
+  save_event_async(false, evict_keys);
+  return evict_count;
+}

-  threadpool_.schedule([evict_keys = std::move(evict_keys), this]() {
+void PrefixCacheWithUpload::save_event_async(const bool is_insert,
+                                             std::vector<Murmur3Key>& keys) {
+  threadpool_.schedule([this, is_insert = is_insert, keys = std::move(keys)]() {
+    std::lock_guard<std::mutex> lock(this->mutex_);
     auto front_ptr = this->db_kvcache_events_.get_front_value();
     if (!front_ptr) {
       LOG(INFO) << "Front DoubleBufferKvCacheEvent is nullptr!";
       return;
     }
     if (!this->exited_.load()) {
-      for (const auto& hash_id : evict_keys) {
-        front_ptr->removed_cache.insert(hash_id);
-        front_ptr->stored_cache.erase(hash_id);
+      if (is_insert) {
+        for (const auto& key : keys) {
+          front_ptr->removed_cache.erase(key);
+          front_ptr->stored_cache.insert(key);
+        }
+      } else {
+        for (const auto& key : keys) {
+          front_ptr->removed_cache.insert(key);
+          front_ptr->stored_cache.erase(key);
+        }
       }
     }
   });
-
-  return evict_count;
 }

 KvCacheEvent* PrefixCacheWithUpload::get_upload_kvcache_events() {
-  db_kvcache_events_.swap();
+  {
+    std::lock_guard<std::mutex> lock(this->mutex_);
+    db_kvcache_events_.swap();
+  }
   if (!exited_.load()) {
     return db_kvcache_events_.get_back_value();
   } else {
diff --git a/xllm/core/framework/prefix_cache/prefix_cache_with_upload.h b/xllm/core/framework/prefix_cache/prefix_cache_with_upload.h
index 4610165e0..dfd7d447a 100644
--- a/xllm/core/framework/prefix_cache/prefix_cache_with_upload.h
+++ b/xllm/core/framework/prefix_cache/prefix_cache_with_upload.h
@@ -18,15 +18,23 @@ class PrefixCacheWithUpload final : public PrefixCache {
   size_t insert(const Slice<int32_t>& token_ids,
                 std::vector<Block>& blocks) override;

+  // insert the blocks with hash key into the prefix tree
+  size_t insert(const std::vector<Block>& blocks) override;
+  size_t insert(Slice<Block>& blocks) override;
+
   // evict blocks hold by the prefix cache
   // return the actual number of evicted blocks
   size_t evict(size_t n_blocks) override;

   virtual KvCacheEvent* get_upload_kvcache_events() override;

+ private:
+  void save_event_async(const bool is_insert, std::vector<Murmur3Key>& keys);
+
  private:
   ThreadPool threadpool_;
+  std::mutex mutex_;
   DoubleBuffer<KvCacheEvent> db_kvcache_events_;
 };
diff --git a/xllm/core/framework/request/sequence.cpp b/xllm/core/framework/request/sequence.cpp
index 26578aa4b..346705d5b 100644
--- a/xllm/core/framework/request/sequence.cpp
+++ b/xllm/core/framework/request/sequence.cpp
@@ -45,7 +45,7 @@ Sequence::Sequence(size_t index,
       latest_generate_time_(absl::Now()),
       sequence_params_(seq_params),
       decoder_(std::move(decoder)),
-      termination_flag_(std::make_shared<std::atomic<bool>>(false)) {
+      termination_flag_(std::make_shared<std::atomic<int32_t>>(INT32_MAX)) {
   CHECK(!prompt_token_ids.empty()) << "empty prompt token ids";
   auto capacity = sequence_params_.seq_capacity;
   CHECK_GT(capacity, prompt_token_ids.size()) << "capacity too small";
@@ -95,7 +95,7 @@ Sequence::Sequence(const Sequence& other)
       cur_generated_token_idx_(other.cur_generated_token_idx_),
       first_token_(other.first_token_),
       is_pre_scheduled_step_prefill_(other.is_pre_scheduled_step_prefill_),
-      termination_flag_(std::make_shared<std::atomic<bool>>(false)) {
+      termination_flag_(std::make_shared<std::atomic<int32_t>>(INT32_MAX)) {
   logprob_state_ = std::make_unique(*other.logprob_state_);
 }

@@ -459,12 +459,12 @@ Slice Sequence::get_generated_tokens() const {
   return {tokens_.data(), 0};
 }

-bool Sequence::update_prefetch_result(uint32_t timeout) {
+bool Sequence::update_prefetch_result(uint32_t timeout, uint32_t& success_cnt) {
   if (prefetch_results_.empty()) {
     return true;
   }

-  if (timeout != 0 && !termination_flag_->load(std::memory_order_acquire)) {
+  if (timeout != 0 && termination_flag_->load(std::memory_order_acquire) > 0) {
     if (!is_timeout_set_) {
       timer_.reset();
       is_timeout_set_ = true;
@@ -476,14 +476,15 @@
     }
   }

-  termination_flag_->store(true, std::memory_order_release);
-  uint32_t success_cnt = host_kv_state_.kv_blocks().size();
+  termination_flag_->store(0, std::memory_order_release);
+  success_cnt = host_kv_state_.kv_blocks().size();
   for (auto& cnt : prefetch_results_) {
     success_cnt = std::min(success_cnt, cnt->load());
   }
   if (success_cnt > 0) {
     host_kv_state_.incr_kv_cache_tokens_num(
         success_cnt * host_kv_state_.kv_blocks()[0].size());
+    host_kv_state_.incr_shared_kv_blocks_num(success_cnt);
   }
   prefetch_results_.clear();
   return true;
diff --git a/xllm/core/framework/request/sequence.h b/xllm/core/framework/request/sequence.h
index ee9b3f210..520929d04 100644
--- a/xllm/core/framework/request/sequence.h
+++ b/xllm/core/framework/request/sequence.h
@@ -245,14 +245,14 @@ class Sequence final {
                          const Tokenizer& tokenizer,
                          std::optional>& out_logprobs);

-  std::shared_ptr<std::atomic<bool>> get_termination_flag() {
+  std::shared_ptr<std::atomic<int32_t>> get_termination_flag() {
     return termination_flag_;
   }
   std::vector<std::shared_ptr<std::atomic<uint32_t>>>* get_prefetch_results() {
     return &prefetch_results_;
   }
-  bool update_prefetch_result(uint32_t timeout);
+  bool update_prefetch_result(uint32_t timeout, uint32_t& success_cnt);

   void reset();

@@ -364,7 +364,7 @@ class Sequence final {
   std::atomic<bool> cancelled_{false};

   // kvcache store copy async result
-  std::shared_ptr<std::atomic<bool>> termination_flag_;
+  std::shared_ptr<std::atomic<int32_t>> termination_flag_;
   std::vector<std::shared_ptr<std::atomic<uint32_t>>> prefetch_results_;
   Timer timer_;
diff --git a/xllm/core/framework/request/sequence_kv_state.cpp b/xllm/core/framework/request/sequence_kv_state.cpp
index 53b9e6376..75c86a7a0 100644
--- a/xllm/core/framework/request/sequence_kv_state.cpp
+++ b/xllm/core/framework/request/sequence_kv_state.cpp
@@ -53,6 +53,11 @@ void KVCacheState::add_kv_blocks(const std::vector<Block>& new_blocks) {
   blocks_.insert(blocks_.end(), new_blocks.begin(), new_blocks.end());
 }

+void KVCacheState::incr_shared_kv_blocks_num(size_t num) {
+  CHECK(num_owned_shared_blocks_ + num <= num_kv_blocks());
+  num_owned_shared_blocks_ += num;
+}
+
 void KVCacheState::add_shared_kv_blocks(std::vector<Block>&& blocks,
                                         size_t current_total_num_tokens) {
   if (blocks.empty()) {
diff --git a/xllm/core/framework/request/sequence_kv_state.h b/xllm/core/framework/request/sequence_kv_state.h
index 8b4401595..2a5579038 100644
--- a/xllm/core/framework/request/sequence_kv_state.h
+++ b/xllm/core/framework/request/sequence_kv_state.h
@@ -36,6 +36,7 @@ class KVCacheState {
   void add_kv_blocks(const std::vector<Block>& new_blocks);
   void add_shared_kv_blocks(std::vector<Block>&& blocks,
                             size_t current_total_num_tokens);
+  void incr_shared_kv_blocks_num(size_t num);

   size_t current_max_tokens_capacity() const;
diff --git a/xllm/core/runtime/params_utils.cpp b/xllm/core/runtime/params_utils.cpp
index 8e0a30b09..2339ebd87 100644
--- a/xllm/core/runtime/params_utils.cpp
+++ b/xllm/core/runtime/params_utils.cpp
@@ -723,7 +723,6 @@ uint64_t proto_to_block_transfer_info(
         pb_block_transfer_info.transfer_infos(i).dst_block_id(),
         reinterpret_cast<const uint8_t*>(
             pb_block_transfer_info.transfer_infos(i).hash_key().data()),
-        pb_block_transfer_info.transfer_infos(i).hash_key().size(),
         TransferType(pb_block_transfer_info.transfer_type()));
   }

@@ -737,11 +736,6 @@ bool block_transfer_info_to_proto(
       block_transfer_info.size());
   auto transfer_type = block_transfer_info[0].transfer_type;
   for (const BlockTransferInfo info : block_transfer_info) {
-    if (info.hash_key == nullptr) {
-      LOG(ERROR) << "Convert to BlockTransferInfos fail, hash key is nullptr!";
-      return false;
-    }
-
     if (transfer_type != info.transfer_type) {
       LOG(ERROR) << "Convert to BlockTransferInfos fail, TransferType must be "
                     "same, but got "
diff --git a/xllm/core/runtime/worker_client.cpp b/xllm/core/runtime/worker_client.cpp
index b047ed75b..b68b317e9 100644
--- a/xllm/core/runtime/worker_client.cpp
+++ b/xllm/core/runtime/worker_client.cpp
@@ -167,7 +167,7 @@ folly::SemiFuture WorkerClient::transfer_kv_blocks(

 void WorkerClient::prefetch_from_storage(
     const std::vector<BlockTransferInfo>& block_transfer_info,
-    std::shared_ptr<std::atomic<bool>> flag,
+    std::shared_ptr<std::atomic<int32_t>> flag,
     std::shared_ptr<std::atomic<uint32_t>> success_cnt) {
   LOG(FATAL) << "WorkerClient Method prefetch_from_storage is UnImplemented.";
 }
diff --git a/xllm/core/runtime/worker_client.h b/xllm/core/runtime/worker_client.h
index 842bb46ee..27986973d 100644
--- a/xllm/core/runtime/worker_client.h
+++ b/xllm/core/runtime/worker_client.h
@@ -118,7 +118,7 @@ class WorkerClient {

   virtual void prefetch_from_storage(
       const std::vector<BlockTransferInfo>& block_transfer_info,
-      std::shared_ptr<std::atomic<bool>> flag,
+      std::shared_ptr<std::atomic<int32_t>> flag,
       std::shared_ptr<std::atomic<uint32_t>> success_cnt);

   // Run the model on the given input. async call
diff --git a/xllm/core/scheduler/continuous_scheduler.cpp b/xllm/core/scheduler/continuous_scheduler.cpp
index 8a37df3ac..a7df12678 100644
--- a/xllm/core/scheduler/continuous_scheduler.cpp
+++ b/xllm/core/scheduler/continuous_scheduler.cpp
@@ -188,8 +188,9 @@ void ContinuousScheduler::handle_prefill_requests(
   bool blocks_exhausted = false;
   while (!waiting_priority_queue.empty() && remaining_seq_budget > 0 &&
          remaining_token_budget > 0 && latency_budget > estimate_latency) {
-    if (kv_cache_manager_->kv_cache_utilization() >=
-        FLAGS_prefill_scheduling_memory_usage_threshold) {
+    if (!options_.enable_disagg_pd() &&
+        kv_cache_manager_->kv_cache_utilization() >=
+            FLAGS_prefill_scheduling_memory_usage_threshold) {
       blocks_exhausted = true;
       break;
     }
diff --git a/xllm/core/scheduler/disagg_pd_scheduler.cpp b/xllm/core/scheduler/disagg_pd_scheduler.cpp
index 4068e483e..2ce179ac6 100644
--- a/xllm/core/scheduler/disagg_pd_scheduler.cpp
+++ b/xllm/core/scheduler/disagg_pd_scheduler.cpp
@@ -267,6 +267,8 @@ bool DisaggPDScheduler::add_request(std::shared_ptr<Request>& request) {
   CHECK(request != nullptr);
   CHECK(!request->sequences().empty());

+  kv_cache_manager_->prefetch_from_storage(request);
+
   if (request->offline()) {
     // offline request, push to offline queue
     prefill_request_queue_offline_.enqueue(request);
diff --git a/xllm/core/util/hash_util.h b/xllm/core/util/hash_util.h
index ecfdde5c8..0c3ce6fc0 100644
--- a/xllm/core/util/hash_util.h
+++ b/xllm/core/util/hash_util.h
@@ -43,7 +43,7 @@ struct Murmur3Key {
     std::memcpy(data, input_data, MURMUR_HASH3_VALUE_LEN);
   }
-  std::string debug_string() {
+  std::string debug_string() const {
     std::string rt;
     for (int i = 0; i < MURMUR_HASH3_VALUE_LEN; i++) {
       rt += std::to_string(int64_t(data[i])) + " ";
     }
diff --git a/xllm/xllm.cpp b/xllm/xllm.cpp
index 378cba58d..7fb310e51 100755
--- a/xllm/xllm.cpp
+++ b/xllm/xllm.cpp
@@ -200,18 +200,19 @@ int run() {
           .enable_schedule_overlap(FLAGS_enable_schedule_overlap)
           .kv_cache_transfer_mode(FLAGS_kv_cache_transfer_mode)
           .etcd_addr(FLAGS_etcd_addr)
-          .enable_service_routing(FLAGS_enable_service_routing)
+          .enable_service_routing(FLAGS_enable_service_routing ||
+                                  FLAGS_enable_disagg_pd)
           .tool_call_parser(FLAGS_tool_call_parser)
           .reasoning_parser(FLAGS_reasoning_parser)
           .priority_strategy(FLAGS_priority_strategy)
           .enable_online_preempt_offline(FLAGS_enable_online_preempt_offline)
-          .enable_cache_upload(FLAGS_enable_prefix_cache &&
-                               FLAGS_enable_service_routing &&
-                               FLAGS_enable_cache_upload)
+          .enable_cache_upload(
+              (FLAGS_enable_service_routing || FLAGS_enable_disagg_pd) &&
+              FLAGS_enable_prefix_cache && FLAGS_enable_cache_upload)
           .host_blocks_factor(FLAGS_host_blocks_factor)
           .enable_kvcache_store(FLAGS_enable_kvcache_store &&
                                 FLAGS_enable_prefix_cache &&
-                                (FLAGS_host_blocks_factor > 0.0))
+                                (FLAGS_host_blocks_factor > 1.0))
           .prefetch_timeout(FLAGS_prefetch_timeout)
           .prefetch_bacth_size(FLAGS_prefetch_bacth_size)
           .layers_wise_copy_batchs(FLAGS_layers_wise_copy_batchs)
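
Note: the option gating at the end of run() reads as plain boolean algebra: disagg PD now implies service routing, cache upload additionally requires prefix cache, and the KV-cache store requires a host blocks factor strictly greater than 1.0. A small sketch with plain bools standing in for the gflags values (illustrative only, not the real flag plumbing):

#include <cstdio>

int main() {
  bool enable_prefix_cache = true;
  bool enable_service_routing = false;
  bool enable_disagg_pd = true;
  bool enable_cache_upload = true;
  bool enable_kvcache_store = true;
  double host_blocks_factor = 1.5;

  // Disagg-PD now implies service routing.
  bool service_routing = enable_service_routing || enable_disagg_pd;
  // Cache upload needs routing (or disagg PD), prefix cache, and the flag itself.
  bool cache_upload =
      service_routing && enable_prefix_cache && enable_cache_upload;
  // KV-cache store additionally requires host blocks beyond the device pool
  // (factor strictly greater than 1.0, not merely non-zero).
  bool kvcache_store =
      enable_kvcache_store && enable_prefix_cache && host_blocks_factor > 1.0;

  std::printf("routing=%d upload=%d store=%d\n", service_routing, cache_upload,
              kvcache_store);
  return 0;
}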