Skip to content

Commit 8eeee78

Browse files
committed
trtllm integration connector api
fix fix fix fix interface fix interface fix add logs async leader fix fix fix fix fix scheduled tokens fix fix fix fix add logs add logs fix fix and log fix and log fix fix fix layout fix fix fix fix fmt fix fix comments fmt fix comment
1 parent 199b9a3 commit 8eeee78

File tree

21 files changed

+2038
-154
lines changed

21 files changed

+2038
-154
lines changed

lib/bindings/python/Cargo.lock

Lines changed: 27 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/rust/llm/block_manager.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
use super::*;
17+
use anyhow::Result;
1718
use dynamo_llm::block_manager::block::{
1819
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
1920
};
@@ -220,3 +221,113 @@ impl BlockManager {
220221
&self.inner
221222
}
222223
}
224+
225+
#[derive(Default)]
226+
pub struct BlockManagerBuilder {
227+
worker_id: u64,
228+
leader: Option<distributed::KvbmLeader>,
229+
page_size: usize,
230+
disable_device_pool: bool,
231+
}
232+
233+
impl BlockManagerBuilder {
234+
pub fn new() -> Self {
235+
Self {
236+
page_size: 0,
237+
..Default::default()
238+
}
239+
}
240+
241+
pub fn worker_id(mut self, id: u64) -> Self {
242+
self.worker_id = id;
243+
self
244+
}
245+
pub fn page_size(mut self, ps: usize) -> Self {
246+
self.page_size = ps;
247+
self
248+
}
249+
pub fn leader(mut self, l: distributed::KvbmLeader) -> Self {
250+
self.leader = Some(l);
251+
self
252+
}
253+
pub fn disable_device_pool(mut self, yes: bool) -> Self {
254+
self.disable_device_pool = yes;
255+
self
256+
}
257+
258+
/// Async build (call from an async context).
259+
pub async fn build(self) -> Result<BlockManager> {
260+
let worker_id = self.worker_id;
261+
let leader = self.leader.ok_or_else(|| {
262+
anyhow::anyhow!("leader is required (runtime is always taken from leader)")
263+
})?;
264+
265+
// Get (inner leader handle, runtime) from the provided leader.
266+
let (leader_inner, drt) = leader.dissolve();
267+
268+
let cancel_token = CancellationToken::new();
269+
270+
// Runtime & model config
271+
let runtime_config = dynamo_llm::block_manager::KvManagerRuntimeConfig::builder()
272+
.worker_id(worker_id)
273+
.cancellation_token(cancel_token.clone())
274+
.build()?;
275+
276+
let mut config =
277+
dynamo_llm::block_manager::KvBlockManagerConfig::builder().runtime(runtime_config);
278+
279+
let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
280+
.num_layers(1)
281+
.outer_dim(1)
282+
.page_size(self.page_size)
283+
.inner_dim(1)
284+
.build()?;
285+
286+
config = config.model(model_config);
287+
288+
// Layouts derived from leader’s counts
289+
if !self.disable_device_pool {
290+
config = config.device_layout(
291+
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
292+
.num_blocks(leader_inner.num_device_blocks())
293+
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
294+
.build()?,
295+
);
296+
}
297+
298+
if leader_inner.num_host_blocks() > 0 {
299+
config = config.host_layout(
300+
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
301+
.num_blocks(leader_inner.num_host_blocks())
302+
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
303+
.build()?,
304+
);
305+
}
306+
307+
if leader_inner.num_disk_blocks() > 0 {
308+
config = config.disk_layout(
309+
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
310+
.num_blocks(leader_inner.num_disk_blocks())
311+
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
312+
.build()?,
313+
);
314+
}
315+
316+
let config = config.build()?;
317+
318+
let resources =
319+
DistributedLeaderWorkerResources::new(Some(leader_inner), cancel_token.child_token())?;
320+
321+
let inner = dynamo_llm::block_manager::KvBlockManager::<
322+
Logical<DistributedLeaderWorkerResources>,
323+
BasicMetadata,
324+
>::new(config, resources)
325+
.await?;
326+
327+
Ok(BlockManager {
328+
inner,
329+
drt,
330+
_controller: None,
331+
})
332+
}
333+
}

lib/bindings/python/rust/llm/block_manager/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ mod utils;
88
mod worker;
99

1010
pub use leader::KvbmLeader;
11-
pub use utils::get_barrier_id;
11+
pub use utils::get_barrier_id_prefix;
1212
pub use worker::{KvbmWorker, VllmTensor};

lib/bindings/python/rust/llm/block_manager/distributed/leader.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::*;
5-
use utils::get_barrier_id;
5+
use utils::get_barrier_id_prefix;
66

77
use derive_getters::Dissolve;
8-
use llm_rs::block_manager::distributed::{KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig};
8+
use llm_rs::block_manager::distributed::{
9+
KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig,
10+
};
911

1012
const CPU_CACHE: &str = "DYN_KVBM_CPU_CACHE_GB";
1113
const CPU_CACHE_OVERRIDE: &str = "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS";
@@ -16,15 +18,32 @@ const DISK_CACHE_OVERRIDE: &str = "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS";
1618
const LEADER_WORKER_INIT_TIMEOUT_SECS: &str = "DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS";
1719
const DEFAULT_INIT_TIMEOUT_SECS: u64 = 120;
1820

19-
fn compute_num_blocks(cache_size_key: &str, override_key: &str, bytes_per_block: usize) -> usize {
20-
if let Ok(override_num_blocks) = std::env::var(override_key) {
21-
override_num_blocks.parse::<usize>().unwrap_or(0)
22-
} else {
23-
let cache_size_gb = std::env::var(cache_size_key)
24-
.unwrap_or_default()
25-
.parse::<f64>()
26-
.unwrap_or(0.0);
27-
((cache_size_gb * 1_000_000_000.0) / bytes_per_block as f64) as usize
21+
/// Reads the environment variable `key` and parses it as a `usize`.
///
/// Returns `None` when the variable cannot be read or when its value, after
/// trimming surrounding whitespace, does not parse as a `usize`.
fn read_env_usize(key: &str) -> Option<usize> {
    match std::env::var(key) {
        Ok(raw) => raw.trim().parse::<usize>().ok(),
        Err(_) => None,
    }
}
24+
25+
/// Reads the environment variable `key` as a cache size expressed as an `f64`.
///
/// Surrounding whitespace is trimmed before parsing. Returns `0.0` when the
/// variable cannot be read, does not parse as a float, or parses to a value
/// that is not a usable size (NaN, infinite, or negative).
fn read_cache_size_float(key: &str) -> f64 {
    std::env::var(key)
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN", "inf" and negative numbers parse successfully but are not sizes.
        .filter(|size| size.is_finite() && *size >= 0.0)
        .unwrap_or(0.0)
}
31+
32+
fn get_blocks_config(cache_size_key: &str, override_key: &str) -> KvbmLeaderNumBlocksConfig {
33+
if let Some(nblocks) = read_env_usize(override_key) {
34+
// Optional: still read cache size for observability, but override takes precedence.
35+
let cache_gb: f64 = read_cache_size_float(cache_size_key);
36+
return KvbmLeaderNumBlocksConfig {
37+
cache_size_in_gb: cache_gb,
38+
num_blocks_overriden: nblocks,
39+
};
40+
}
41+
42+
// No override -> compute from cache size (in GB)
43+
let cache_gb: f64 = read_cache_size_float(cache_size_key);
44+
KvbmLeaderNumBlocksConfig {
45+
cache_size_in_gb: cache_gb,
46+
num_blocks_overriden: 0,
2847
}
2948
}
3049

@@ -51,22 +70,19 @@ impl KvbmLeader {
5170
#[pymethods]
5271
impl KvbmLeader {
5372
#[new]
54-
#[pyo3(signature = (bytes_per_block, world_size, drt))]
55-
fn new(bytes_per_block: usize, world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
56-
let num_host_blocks = compute_num_blocks(CPU_CACHE, CPU_CACHE_OVERRIDE, bytes_per_block);
57-
let num_disk_blocks = compute_num_blocks(DISK_CACHE, DISK_CACHE_OVERRIDE, bytes_per_block);
58-
59-
let barrier_id = get_barrier_id();
73+
#[pyo3(signature = (world_size, drt))]
74+
fn new(world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
75+
let barrier_id_prefix = get_barrier_id_prefix();
6076
let leader_init_timeout_sec: u64 =
6177
get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);
6278

6379
let config = KvbmLeaderConfig::builder()
64-
.barrier_id(barrier_id)
65-
.num_host_blocks(num_host_blocks)
66-
.num_disk_blocks(num_disk_blocks)
80+
.barrier_id_prefix(barrier_id_prefix)
6781
.world_size(world_size)
6882
.leader_init_timeout_secs(leader_init_timeout_sec)
6983
.drt(drt.inner().clone())
84+
.host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
85+
.disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
7086
.build()
7187
.map_err(to_pyerr)?;
7288

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
pub fn get_barrier_id() -> String {
5-
std::env::var("DYN_KVBM_BARRIER_ID").unwrap_or("kvbm".to_string())
4+
/// Returns the barrier id prefix from the `DYN_KVBM_BARRIER_ID_PREFIX`
/// environment variable, falling back to `"kvbm"` when the variable cannot be
/// read.
pub fn get_barrier_id_prefix() -> String {
    // Build the fallback lazily so no string is allocated when the variable is set.
    std::env::var("DYN_KVBM_BARRIER_ID_PREFIX").unwrap_or_else(|_| "kvbm".to_string())
}

lib/bindings/python/rust/llm/block_manager/distributed/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use super::*;
55

66
use std::sync::Arc;
7-
use utils::get_barrier_id;
7+
use utils::get_barrier_id_prefix;
88

99
use llm_rs::block_manager::distributed::{
1010
BlockTransferHandler as RustBlockTransferHandler, KvbmWorker as KvbmWorkerImpl,
@@ -131,7 +131,7 @@ impl KvbmWorker {
131131
vllm_tensors.push(Arc::new(vllm_tensor));
132132
}
133133

134-
let barrier_id = get_barrier_id();
134+
let barrier_id_prefix = get_barrier_id_prefix();
135135

136136
let config = KvbmWorkerConfig::builder()
137137
.drt(drt)
@@ -140,7 +140,7 @@ impl KvbmWorker {
140140
.tensors(vllm_tensors)
141141
.device_id(device_id)
142142
.dtype_width_bytes(dtype_width_bytes)
143-
.barrier_id(barrier_id)
143+
.barrier_id_prefix(barrier_id_prefix)
144144
.build()
145145
.map_err(to_pyerr)?;
146146

lib/bindings/python/rust/llm/block_manager/vllm.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ fn _vllm_integration(m: &Bound<'_, PyModule>) -> PyResult<()> {
5050
m.add_class::<connector::worker::PyKvConnectorWorker>()?;
5151
m.add_class::<connector::leader::PyKvConnectorLeader>()?;
5252
m.add_class::<connector::SchedulerOutput>()?;
53+
// TODO: use TRTLLM own integration module
54+
m.add_class::<connector::trtllm_worker::PyTrtllmKvConnectorWorker>()?;
55+
m.add_class::<connector::trtllm_leader::PyTrtllmKvConnectorLeader>()?;
5356
Ok(())
5457
}
5558

lib/bindings/python/rust/llm/block_manager/vllm/connector.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use dynamo_llm::block_manager::{
77
};
88

99
pub mod leader;
10+
pub mod trtllm_leader;
11+
pub mod trtllm_worker;
1012
pub mod worker;
1113

1214
use pyo3::prelude::*;

0 commit comments

Comments
 (0)