-
Notifications
You must be signed in to change notification settings - Fork 602
feat: Create worker selection pipeline #3080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Anna Tchernych <[email protected]>
WalkthroughIntroduces a new worker selection pipeline for LLM requests, exposing it via a new module declaration and implementing pipeline construction, routing integration, and a helper to extract worker selection data from output streams. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Client
participant Frontend
participant Preprocessor
participant Backend
participant Migration
participant Router
Client->>Frontend: Request
Frontend->>Preprocessor: Forward edge
Preprocessor->>Backend: Prepared request
Backend->>Migration: Routed candidate(s)
Migration->>Router: Query routing decision
Router-->>Migration: Annotations (worker_instance_id, token_data)
Migration-->>Frontend: Streamed LLMEngineOutput (annotated)
Frontend-->>Client: Stream (comments carry selection data)
note over Router,Backend: RouterMode\n- Random/RoundRobin/Direct → ServiceBackend from Router\n- KV → KvPushRouter backend
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (8)
lib/llm/src/entrypoint/input/worker_selection_pipeline.rs (8)
37-37
: Remove unused import.The
serde_json
import is already available in the local scope where it's used (line 178). Consider removing this redundant top-level import.-use serde_json; -
62-79
: Consider simplifying the complex trait bounds.The trait bounds for
OpenAIPreprocessor
as anOperator
are repeated across multiple functions. Consider creating a trait alias or helper type to reduce duplication and improve maintainability.// Add at module level type PreprocessorOperator<Req> = dyn Operator< Context<Req>, Pin<Box<dyn AsyncEngineStream<Annotated<LLMEngineOutput>>>>, Context<PreprocessedRequest>, Pin<Box<dyn AsyncEngineStream<Annotated<LLMEngineOutput>>>>, >;Then simplify the where clause to:
where Req: dynamo_runtime::engine::Data, - OpenAIPreprocessor: Operator< - Context<Req>, - Pin<Box<dyn AsyncEngineStream<Annotated<LLMEngineOutput>>>>, - Context<PreprocessedRequest>, - Pin<Box<dyn AsyncEngineStream<Annotated<LLMEngineOutput>>>>, - >, + OpenAIPreprocessor: PreprocessorOperator<Req>,
159-160
: Consider using more descriptive default values.Using
0
as the default worker_id and an empty vector for tokens might mask extraction failures. Consider usingOption<i64>
andOption<Vec<u32>>
to distinguish between "not found" and actual values, or return an error when expected annotations are missing.- let mut worker_id = 0i64; - let mut tokens = Vec::<u32>::new(); + let mut worker_id = None::<i64>; + let mut tokens = None::<Vec<u32>>; while let Some(response) = stream.next().await { if let Some(event) = &response.event { match event.as_str() { "worker_instance_id" => { - worker_id = response + worker_id = Some(response .comment .as_ref() .and_then(|comments| comments.first()) .and_then(|v| v.parse::<i64>().ok()) - .unwrap_or(0); + .ok_or_else(|| anyhow::anyhow!("Failed to parse worker_instance_id"))?); } "token_data" => { - tokens = response + tokens = Some(response .comment .as_ref() .and_then(|comments| comments.first()) .and_then(|v| serde_json::from_str::<Vec<u32>>(v).ok()) - .unwrap_or_default(); + .ok_or_else(|| anyhow::anyhow!("Failed to parse token_data"))?); } _ => {} } } } - Ok((worker_id, tokens)) + match (worker_id, tokens) { + (Some(id), Some(t)) => Ok((id, t)), + _ => Err(anyhow::anyhow!("Missing required annotations in stream")), + }
166-172
: Improve error handling for parsing failures.The current implementation silently falls back to
0
if parsing fails. Consider logging parsing errors or propagating them to help with debugging."worker_instance_id" => { worker_id = response .comment .as_ref() .and_then(|comments| comments.first()) - .and_then(|v| v.parse::<i64>().ok()) - .unwrap_or(0); + .ok_or_else(|| anyhow::anyhow!("worker_instance_id annotation missing comment"))? + .parse::<i64>() + .map_err(|e| anyhow::anyhow!("Failed to parse worker_instance_id: {}", e))?; }
194-214
: Consider implementing the test or removing it.The test is marked as
#[ignore]
with a comment about requiring a full distributed setup. Consider either:
- Implementing a unit test with mocked components
- Moving this to an integration test suite
- Removing the placeholder if it won't be implemented soon
Would you like me to help implement a unit test with mocked components that doesn't require a full distributed setup?
217-335
: Large commented-out code block should be removed or documented.This 118-line commented block contains a complete implementation of
create_worker_selection_pipeline_from_c_params
. If this functionality is planned for future use, consider:
- Moving it to a separate file with a clear TODO/ticket reference
- Creating a GitHub issue to track its implementation
- Removing it if it's no longer needed
Keeping large blocks of commented code reduces maintainability and can cause confusion.
280-289
: Security concern: Unsafe C string handling.The commented-out code shows unsafe C string conversions without null checks. While this code is not active, if it's uncommented in the future, ensure proper validation:
- Check for null pointers before calling
CStr::from_ptr
- Validate string encoding
- Consider using safer alternatives like
CString
with proper ownership
303-304
: Incomplete code in commented section.Line 303 appears to be an incomplete statement - the
match
expression for loading the ModelDeploymentCard is cut off. This suggests the commented code was not fully implemented or was partially edited.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
lib/llm/src/entrypoint/input.rs
(1 hunks)lib/llm/src/entrypoint/input/worker_selection_pipeline.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
lib/llm/src/entrypoint/input/worker_selection_pipeline.rs (6)
lib/bindings/python/src/dynamo/_core.pyi (4)
KvPushRouter
(1217-1318)ModelDeploymentCard
(458-463)Client
(244-285)RouterMode
(854-856)components/backends/sglang/src/dynamo/sglang/protocol.py (1)
PreprocessedRequest
(36-43)lib/runtime/src/pipeline/network/egress/push_router.rs (1)
from_client_with_threshold
(104-135)lib/runtime/src/pipeline/nodes/sinks/pipeline.rs (1)
from_engine
(8-13)lib/llm/src/preprocessor.rs (1)
new_with_parts
(160-182)lib/bindings/python/rust/lib.rs (2)
event
(1013-1015)comments
(1017-1019)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Build and Test - sglang
- GitHub Check: Build and Test - vllm
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/bindings/python)
🔇 Additional comments (3)
lib/llm/src/entrypoint/input/worker_selection_pipeline.rs (2)
85-91
: Good use of the threshold-aware router construction.The code correctly uses
from_client_with_threshold
to create a router that can monitor worker busyness when a threshold is provided.
93-104
: Verify KV router initialization requirements.The code correctly validates that a
chooser
is required forRouterMode::KV
. This prevents runtime errors when KV routing is selected without the necessary components.lib/llm/src/entrypoint/input.rs (1)
24-24
: LGTM! Module declaration follows the existing pattern.The new
worker_selection_pipeline
module is properly declared as public and follows the same pattern as other modules in this file.
Signed-off-by: Anna Tchernych <[email protected]>
Signed-off-by: Anna Tchernych <[email protected]>
Signed-off-by: Anna Tchernych <[email protected]>
Overview:
DEP-357
Details:
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit