Skip to content

Commit 3d06cb8

Browse files
committed
resolve comments
Signed-off-by: richardhuo-nv <[email protected]>
1 parent 36e44c1 commit 3d06cb8

File tree

3 files changed

+25
-34
lines changed

3 files changed

+25
-34
lines changed

lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl KvConnectorLeader {
9999

100100
let block_manager = match BlockManagerBuilder::new()
101101
.worker_id(worker_id)
102-
.leader(leader_py) // your distributed::KvbmLeader
102+
.leader(leader_py)
103103
.page_size(page_size)
104104
.disable_device_pool(false)
105105
.build()
@@ -177,11 +177,6 @@ impl Leader for KvConnectorLeader {
177177
.lock()
178178
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
179179

180-
if slot.state() == SlotState::Prefilling {
181-
tracing::warn!("slot is in the Prefilled state; this seems like we need to reset the slot and start over");
182-
slot.reset();
183-
}
184-
185180
// early exit if we cannot match full block
186181
if (slot.sequence().total_tokens() - num_computed_tokens) < self.block_size {
187182
let total_tokens = slot.sequence().total_tokens();
@@ -412,7 +407,7 @@ impl Leader for KvConnectorLeader {
412407
.remove(&request_id);
413408

414409
// if the slot has finished, we can return false to trtllm, indicating all gpu blocks are free to be reused
415-
// otherwise, we return false, which means there are still outstanding operations on gpu blocks which
410+
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which
416411
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
417412
// of the connector api which will be used to inform trtllm that the request is finished.
418413
if let SlotState::Finished = slot.state() {

lib/llm/src/block_manager/distributed/leader.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,17 @@ impl KvbmLeader {
123123
};
124124

125125
let cancel_token = tokio_util::sync::CancellationToken::new();
126-
leader.spawn_barrier_task(
127-
drt,
126+
127+
// The leader_sockets struct cannot be cloned,
128+
// so we use a tuple to "struct" the two urls
129+
let leader_urls = (
128130
leader_sockets.pub_url.clone(),
129131
leader_sockets.ack_url.clone(),
130132
);
133+
leader.spawn_barrier_task(
134+
drt,
135+
leader_urls
136+
);
131137
leader.spawn_zmq_task(leader_sockets, cancel_token);
132138

133139
Ok(leader)
@@ -136,8 +142,7 @@ impl KvbmLeader {
136142
fn spawn_barrier_task(
137143
&self,
138144
drt: DistributedRuntime,
139-
leader_sockets_pub_url: String,
140-
leader_sockets_ack_url: String,
145+
leader_urls: (String, String),
141146
) {
142147
let state = self.state.clone();
143148
let leader_config = self.config.clone();
@@ -148,8 +153,7 @@ impl KvbmLeader {
148153
tokio::spawn(async move {
149154
match KvbmLeader::run_barrier_sync(
150155
drt,
151-
leader_sockets_pub_url,
152-
leader_sockets_ack_url,
156+
leader_urls,
153157
leader_config,
154158
)
155159
.await
@@ -180,8 +184,7 @@ impl KvbmLeader {
180184

181185
async fn run_barrier_sync(
182186
drt: DistributedRuntime,
183-
leader_sockets_pub_url: String,
184-
leader_sockets_ack_url: String,
187+
leader_urls: (String, String),
185188
leader_config: KvbmLeaderConfig,
186189
) -> anyhow::Result<(usize, usize, usize)> {
187190
let barrier_id_worker_to_leader =
@@ -191,24 +194,18 @@ impl KvbmLeader {
191194
leader_config.world_size,
192195
barrier_id_worker_to_leader
193196
);
194-
let zmq_data_worker_to_leader: Arc<KvbmLeaderData> = Arc::new(KvbmLeaderData {
195-
pub_url: leader_sockets_pub_url.clone(),
196-
ack_url: leader_sockets_ack_url.clone(),
197-
num_host_blocks: 0, // doesn't matter for worker to leader sync
198-
num_disk_blocks: 0, // doesn't matter for worker to leader sync
199-
});
200197

201198
// Build our leader barrier and publish the data.
202199
// TODO: Use a separate timeout parameter from the ZMQ connection timeout
203-
let worker_to_leader_barrier: LeaderBarrier<KvbmLeaderData, worker::KvbmWorkerData> =
200+
let worker_to_leader_barrier: LeaderBarrier<(), worker::KvbmWorkerData> =
204201
LeaderBarrier::new(
205202
barrier_id_worker_to_leader.clone(),
206203
leader_config.world_size,
207204
Some(Duration::from_secs(leader_config.leader_init_timeout_secs)),
208205
);
209206

210207
let worker_data = worker_to_leader_barrier
211-
.sync(&drt, zmq_data_worker_to_leader.as_ref())
208+
.sync(&drt, &())
212209
.await
213210
.map_err(|e| anyhow::anyhow!("Failed to sync worker to leader barrier: {:?}", e))?;
214211

@@ -245,14 +242,15 @@ impl KvbmLeader {
245242
barrier_id_leader_to_worker
246243
);
247244

245+
let (leader_pub_url, leader_ack_url) = leader_urls;
248246
let zmq_data_leader_to_worker = Arc::new(KvbmLeaderData {
249-
pub_url: leader_sockets_pub_url.clone(),
250-
ack_url: leader_sockets_ack_url.clone(),
247+
pub_url: leader_pub_url,
248+
ack_url: leader_ack_url,
251249
num_host_blocks,
252250
num_disk_blocks,
253251
});
254252

255-
let leader_to_worker_barrier: LeaderBarrier<KvbmLeaderData, worker::KvbmWorkerData> =
253+
let leader_to_worker_barrier: LeaderBarrier<KvbmLeaderData, ()> =
256254
LeaderBarrier::new(
257255
barrier_id_leader_to_worker.clone(),
258256
leader_config.world_size,

lib/llm/src/block_manager/distributed/worker.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ impl KvbmWorker {
334334
barrier_id_worker_to_leader
335335
);
336336

337-
let worker_to_leader_barrier = WorkerBarrier::<KvbmLeaderData, KvbmWorkerData>::new(
337+
let worker_to_leader_barrier = WorkerBarrier::<(), KvbmWorkerData>::new(
338338
barrier_id_worker_to_leader,
339339
worker_id.to_string(),
340340
);
@@ -344,8 +344,7 @@ impl KvbmWorker {
344344
bytes_per_block,
345345
};
346346

347-
// leader_data is not important in the worker to leader phase
348-
let _leader_data = tokio::select! {
347+
tokio::select! {
349348
_ = cancel_token.cancelled() => {
350349
return Err(anyhow::anyhow!("Cancelled"))
351350
}
@@ -356,9 +355,8 @@ impl KvbmWorker {
356355
.map_err(|e| anyhow::anyhow!("Failed to sync worker to leader barrier: {:?}", e))?;
357356

358357
tracing::debug!(
359-
"Worker {} received leader data: {:?} in worker to leader phase",
360-
worker_id,
361-
_leader_data
358+
"Worker {} sent the worker data in worker to leader phase",
359+
worker_id
362360
);
363361

364362
let barrier_id_leader_to_worker =
@@ -369,7 +367,7 @@ impl KvbmWorker {
369367
barrier_id_leader_to_worker
370368
);
371369

372-
let leader_to_worker_barrier = WorkerBarrier::<KvbmLeaderData, KvbmWorkerData>::new(
370+
let leader_to_worker_barrier = WorkerBarrier::<KvbmLeaderData, ()>::new(
373371
barrier_id_leader_to_worker,
374372
worker_id.to_string(),
375373
);
@@ -378,7 +376,7 @@ impl KvbmWorker {
378376
_ = cancel_token.cancelled() => {
379377
return Err(anyhow::anyhow!("Cancelled"))
380378
}
381-
leader_data = leader_to_worker_barrier.sync(&drt, &worker_data) => {
379+
leader_data = leader_to_worker_barrier.sync(&drt, &()) => {
382380
leader_data
383381
}
384382
}

0 commit comments

Comments
 (0)