From fbf1199fc1bb7255eee6b05b4776a19d63aa0f53 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Tue, 16 Sep 2025 18:28:00 -0700 Subject: [PATCH 01/18] Create worker selection pipeline Signed-off-by: Anna Tchernych --- lib/llm/src/entrypoint/input.rs | 1 + .../input/worker_selection_pipeline.rs | 335 ++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 lib/llm/src/entrypoint/input/worker_selection_pipeline.rs diff --git a/lib/llm/src/entrypoint/input.rs b/lib/llm/src/entrypoint/input.rs index 517c8b46c2..5555b3c7b1 100644 --- a/lib/llm/src/entrypoint/input.rs +++ b/lib/llm/src/entrypoint/input.rs @@ -21,6 +21,7 @@ pub mod endpoint; pub mod grpc; pub mod http; pub mod text; +pub mod worker_selection_pipeline; use dynamo_runtime::protocols::ENDPOINT_SCHEME; use either::Either; diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs new file mode 100644 index 0000000000..caad9789f6 --- /dev/null +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -0,0 +1,335 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Worker selection pipeline for querying routing decisions. +//! +//! This module provides a simplified pipeline that gets worker selection information +//! without performing full inference. When a request contains the "query_instance_id" +//! annotation, the router returns: +//! +//! 1. A "worker_instance_id" annotation with the selected worker ID +//! 2. A "token_data" annotation with the tokenized prompt +//! +//! Usage: +//! ```rust,ignore +//! let engine = build_worker_selection_pipeline(...).await?; +//! let request = create_request_with_annotation("query_instance_id"); +//! let response_stream = engine.generate(request).await?; +//! +//! // Option 1: Use the helper function (recommended) +//! 
let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await?; +//! +//! // Option 2: Manual extraction +//! let mut stream = engine.generate(request).await?; +//! while let Some(response) = stream.next().await { +//! if let Some(event) = &response.event { +//! match event.as_str() { +//! "worker_instance_id" => { /* extract worker ID from comment field */ } +//! "token_data" => { /* extract tokens from comment field */ } +//! _ => {} +//! } +//! } +//! } +//! ``` + +use std::{pin::Pin, sync::Arc}; + +use serde_json; + +use crate::{ + backend::Backend, + kv_router::{KvPushRouter, KvRouter}, + migration::Migration, + model_card::ModelDeploymentCard, + preprocessor::OpenAIPreprocessor, + protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest}, + types::Annotated, +}; + +use dynamo_runtime::{ + component::Client, + engine::AsyncEngineStream, + pipeline::{ + Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend, + ServiceEngine, SingleIn, Source, + }, +}; + +/// Build a worker selection pipeline that gets routing decisions from the router +/// +/// This pipeline: frontend -> preprocessor -> backend -> migration -> router +/// The router handles query_instance_id annotations and returns worker_instance_id and token_data annotations. 
+pub async fn build_worker_selection_pipeline_with_preprocessor( + card: &ModelDeploymentCard, + client: &Client, + router_mode: RouterMode, + busy_threshold: Option, + chooser: Option>, + preprocessor: Arc, + hf_tokenizer: tokenizers::Tokenizer, +) -> anyhow::Result, ManyOut>>> +where + Req: dynamo_runtime::engine::Data, + OpenAIPreprocessor: Operator< + Context, + Pin>>>, + Context, + Pin>>>, + >, +{ + let frontend = SegmentSource::, ManyOut>>::new(); + let preprocessor_op = preprocessor.into_operator(); + let backend = Backend::from_tokenizer(hf_tokenizer).into_operator(); + let migration = Migration::from_mdc(card).into_operator(); + + let router = + PushRouter::>::from_client_with_threshold( + client.clone(), + router_mode, + busy_threshold, + ) + .await?; + + let service_backend = match router_mode { + RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => { + ServiceBackend::from_engine(Arc::new(router)) + } + RouterMode::KV => { + let Some(chooser) = chooser else { + anyhow::bail!("RouterMode::KV requires KvRouter to not be null"); + }; + let kv_push_router = KvPushRouter::new(router, chooser); + ServiceBackend::from_engine(Arc::new(kv_push_router)) + } + }; + + // Build pipeline - forward path only (router handles query_instance_id and returns annotations) + frontend + .link(preprocessor_op.forward_edge())? + .link(backend.forward_edge())? + .link(migration.forward_edge())? 
+ .link(service_backend)?; + + Ok(frontend) +} + +/// Convenience function that creates a preprocessor and calls build_worker_selection_pipeline_with_preprocessor +pub async fn build_worker_selection_pipeline( + card: &ModelDeploymentCard, + client: &Client, + router_mode: RouterMode, + busy_threshold: Option, + chooser: Option>, + hf_tokenizer: tokenizers::Tokenizer, +) -> anyhow::Result, ManyOut>>> +where + Req: dynamo_runtime::engine::Data, + OpenAIPreprocessor: Operator< + Context, + Pin>>>, + Context, + Pin>>>, + >, +{ + use crate::preprocessor::prompt::PromptFormatter; + + let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?; + let preprocessor = + OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?; + + build_worker_selection_pipeline_with_preprocessor( + card, + client, + router_mode, + busy_threshold, + chooser, + preprocessor, + hf_tokenizer, + ) + .await +} + +/// Helper function to extract worker selection information from the annotation stream +/// This demonstrates how to process the annotations returned by the router +pub async fn extract_worker_selection_from_stream( + mut stream: Pin>>>, +) -> anyhow::Result<(i64, Vec)> { + use futures::StreamExt; + + let mut worker_id = 0i64; + let mut tokens = Vec::::new(); + + while let Some(response) = stream.next().await { + if let Some(event) = &response.event { + match event.as_str() { + "worker_instance_id" => { + worker_id = response + .comment + .as_ref() + .and_then(|comments| comments.first()) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + } + "token_data" => { + tokens = response + .comment + .as_ref() + .and_then(|comments| comments.first()) + .and_then(|v| serde_json::from_str::>(v).ok()) + .unwrap_or_default(); + } + _ => {} + } + } + } + + Ok((worker_id, tokens)) +} + +#[cfg(test)] +mod tests { + #[allow(unused_imports)] + use super::*; + + #[tokio::test] + #[ignore] // Requires full distributed setup + async fn test_worker_selection_pipeline() 
{ + // This test would require: + // - A real ModelDeploymentCard + // - A Component client connected to workers + // - A KvRouter with actual worker state + + // Example test structure: + // let engine = build_worker_selection_pipeline(...).await.unwrap(); + // + // // Create a request with query_instance_id annotation + // let request = create_test_request_with_annotation("query_instance_id"); + // let response_stream = engine.generate(request).await.unwrap(); + // + // // Use the helper function to extract worker selection information + // let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await.unwrap(); + // + // assert!(worker_id > 0); + // assert!(!tokens.is_empty()); + } +} + +/* +/// Helper function to create worker selection pipeline from C string parameters +/// +/// This function demonstrates how to create all the necessary parameters for +/// `build_worker_selection_pipeline` when you only have namespace, component, +/// and model information from C FFI. +/// +/// # Parameters +/// - `namespace_c_str`: C string pointer to namespace name +/// - `component_c_str`: C string pointer to component name +/// - `endpoint_name`: Name of the endpoint to connect to (e.g., "inference") +/// - `model_name`: Name/slug of the model to load +/// - `router_mode`: How to route requests (KV, RoundRobin, etc.) 
+/// - `busy_threshold`: Optional threshold for busy worker detection +/// +/// # Returns +/// A configured worker selection pipeline ready to use +/// +/// # Example Usage in C FFI context: +/// ```rust,ignore +/// let pipeline = create_worker_selection_pipeline_from_c_params( +/// namespace_c_str, +/// component_c_str, +/// "inference", +/// "llama3-8b-instruct", +/// RouterMode::KV, +/// Some(0.8) +/// ).await?; +/// +/// // Use pipeline to get worker selection +/// let request = create_request_with_annotation("query_instance_id"); +/// let response_stream = pipeline.generate(request).await?; +/// let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await?; +/// ``` +pub async fn create_worker_selection_pipeline_from_c_params( + namespace_c_str: *const std::os::raw::c_char, + component_c_str: *const std::os::raw::c_char, + endpoint_name: &str, + model_name: &str, + router_mode: RouterMode, + busy_threshold: Option, +) -> anyhow::Result, ManyOut>>> +where + Req: dynamo_runtime::engine::Data, + OpenAIPreprocessor: Operator< + Context, + Pin>>>, + Context, + Pin>>>, + >, +{ + use std::ffi::CStr; + use dynamo_runtime::{ + Runtime, DistributedRuntime, + storage::{EtcdStorage, KeyValueStoreManager}, + slug::Slug, + }; + use crate::{ + model_card::{ModelDeploymentCard, ROOT_PATH as MODEL_ROOT_PATH}, + discovery::ModelManager, + }; + + // 1. Convert C strings to Rust strings + let namespace_str = unsafe { + CStr::from_ptr(namespace_c_str) + .to_str() + .map_err(|e| anyhow::anyhow!("Invalid namespace string: {}", e))? + }; + let component_str = unsafe { + CStr::from_ptr(component_c_str) + .to_str() + .map_err(|e| anyhow::anyhow!("Invalid component string: {}", e))? + }; + + // 2. Create Runtime and DistributedRuntime + let runtime = Runtime::new().await?; + let distributed_runtime = DistributedRuntime::from_settings(runtime).await?; + + // 3. 
Create Component and Client + let namespace = distributed_runtime.namespace(namespace_str)?; + let component = namespace.component(component_str)?; + let endpoint = component.endpoint(endpoint_name); + let client = endpoint.client().await?; + + // 4. Load ModelDeploymentCard + let model_slug = Slug::from_string(model_name); + let card = match ModelDeploymentCard::load_from_store(&model_slug, &runtime) + + // 5. Get HuggingFace tokenizer from the model card + let hf_tokenizer = card.tokenizer_hf() + .with_context(|| format!("Failed to load tokenizer for model: {}", model_name))?; + + // 6. Create KV chooser if using KV routing mode + let chooser = if router_mode == RouterMode::KV { + let model_manager = std::sync::Arc::new(ModelManager::new()); + Some( + model_manager.kv_chooser_for( + &card.display_name, + &component, + card.kv_cache_block_size, + None, // Use default KV router config + ).await? + ) + } else { + None + }; + + // 7. Build and return the worker selection pipeline + build_worker_selection_pipeline( + &card, + &client, + router_mode, + busy_threshold, + chooser, + hf_tokenizer, + ).await +} + +*/ From 558da48c0d912cc7fa525115ee11ff0c1fe909eb Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 17 Sep 2025 13:03:55 -0700 Subject: [PATCH 02/18] refactor Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 358 +++++++++++++++++- .../input/worker_selection_pipeline.rs | 331 ++++++++++------ .../tests/test_worker_selection_pipeline.rs | 133 +++++++ 3 files changed, 701 insertions(+), 121 deletions(-) create mode 100644 lib/llm/tests/test_worker_selection_pipeline.rs diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 68007fb1dc..729eee390a 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -7,10 +7,18 @@ use once_cell::sync::OnceCell; use std::ffi::CStr; use std::sync::atomic::{AtomicU32, Ordering}; +use dynamo_llm::entrypoint::input::worker_selection_pipeline::{ + create_worker_selection_pipeline, 
query_worker_selection_and_annotate, +}; use dynamo_llm::kv_router::{ indexer::compute_block_hash_for_seq, protocols::*, publisher::KvEventPublisher, }; -use dynamo_runtime::{DistributedRuntime, Worker}; +use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput; +use dynamo_llm::types::{Annotated, openai::chat_completions::NvCreateChatCompletionRequest}; +use dynamo_runtime::{ + DistributedRuntime, Worker, + pipeline::{ManyOut, RouterMode, ServiceEngine, SingleIn}, +}; static WK: OnceCell = OnceCell::new(); static DRT: AsyncOnceCell = AsyncOnceCell::new(); // [FIXME] shouldn't the publisher be instance passing between API calls? @@ -293,7 +301,6 @@ pub extern "C" fn dynamo_kv_event_publish_removed( } } } - // Need to setup etcd and nats to run these tests // #[cfg(test)] // mod tests { @@ -324,3 +331,350 @@ pub extern "C" fn dynamo_kv_event_publish_removed( // assert_eq!(shutdown_result as u32, DynamoLlmResult::OK as u32); // } // } + +// +// Apis related to the best worker selection. +// Worker selection pipeline handle containing the actual pipeline +pub struct WorkerSelectionPipeline { + pipeline: + ServiceEngine, ManyOut>>, +} + +/// C FFI wrapper for creating a worker selection pipeline +/// +/// Returns a pipeline handle that can be used repeatedly for queries. +/// Call dynamo_destroy_worker_selection_pipeline when done. 
+/// +/// # Safety +/// The namespace_c_str, component_c_str, endpoint_name_c_str, and model_name_c_str +/// are passed as pointers to C strings +#[unsafe(no_mangle)] +pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( + namespace_c_str: *const c_char, + component_c_str: *const c_char, + endpoint_name_c_str: *const c_char, + model_name_c_str: *const c_char, + use_kv_routing: bool, + busy_threshold: f64, // Use negative value to indicate None + pipeline_out: *mut *mut WorkerSelectionPipeline, +) -> DynamoLlmResult { + let wk = match WK.get() { + Some(wk) => wk, + None => { + eprintln!("Runtime not initialized - call dynamo_llm_init first"); + return DynamoLlmResult::ERR; + } + }; + + let rt = wk.runtime(); + let secondary = rt.secondary().clone(); + + let result = secondary.block_on(async { + // Convert C strings to Rust strings + let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert namespace C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert component C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + let endpoint_name = match unsafe { CStr::from_ptr(endpoint_name_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert endpoint_name C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert model_name C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Determine router mode and busy threshold + let router_mode = if use_kv_routing { + RouterMode::KV + } else { + RouterMode::RoundRobin + }; + + let busy_threshold_opt = if busy_threshold < 0.0 { + None + } else { + Some(busy_threshold) + }; + + // Create the worker selection 
pipeline + let pipeline = match create_worker_selection_pipeline::( + namespace, + component_name, + endpoint_name, + model_name, + router_mode, + busy_threshold_opt, + ) + .await + { + Ok(pipeline) => pipeline, + Err(e) => { + eprintln!("Failed to create worker selection pipeline: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Wrap pipeline in handle struct and box it + let pipeline_handle = WorkerSelectionPipeline { pipeline }; + let boxed_pipeline = Box::new(pipeline_handle); + let pipeline_ptr = Box::into_raw(boxed_pipeline); + + unsafe { + *pipeline_out = pipeline_ptr; + } + + DynamoLlmResult::OK + }); + + result +} + +/// Run a query on an existing worker selection pipeline +/// +/// # Safety +/// The pipeline pointer must be valid and prompt_c_str must be a valid C string +#[unsafe(no_mangle)] +pub unsafe extern "C" fn dynamo_worker_selection_pipeline_query( + pipeline: *mut WorkerSelectionPipeline, + prompt_c_str: *const c_char, + worker_instance_id_out: *mut i64, + token_ids_out: *mut *mut u32, + token_count_out: *mut usize, +) -> DynamoLlmResult { + if pipeline.is_null() { + eprintln!("Pipeline pointer is null"); + return DynamoLlmResult::ERR; + } + + let wk = match WK.get() { + Some(wk) => wk, + None => { + eprintln!("Runtime not initialized - call dynamo_llm_init first"); + return DynamoLlmResult::ERR; + } + }; + + let rt = wk.runtime(); + let secondary = rt.secondary().clone(); + + let result = secondary.block_on(async { + // Convert C string to Rust string + let prompt = match unsafe { CStr::from_ptr(prompt_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert prompt C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // TODO: Actually use the pipeline to run the query + // For now, return placeholder values + eprintln!( + "Query '{}' on pipeline - TODO: implement actual query", + prompt + ); + + unsafe { + *worker_instance_id_out = 1; + *token_ids_out = std::ptr::null_mut(); + *token_count_out = 0; + } 
+ + DynamoLlmResult::OK + }); + + result +} + +/// Destroy a worker selection pipeline and free its memory +/// +/// # Safety +/// The pipeline pointer must be valid and should not be used after this call +#[unsafe(no_mangle)] +pub unsafe extern "C" fn dynamo_destroy_worker_selection_pipeline( + pipeline: *mut WorkerSelectionPipeline, +) -> DynamoLlmResult { + if pipeline.is_null() { + eprintln!("Pipeline pointer is null"); + return DynamoLlmResult::ERR; + } + + // Convert back to Box with correct type and let it drop to free memory + let _boxed_pipeline: Box = unsafe { Box::from_raw(pipeline) }; + + DynamoLlmResult::OK +} + +/// Query worker selection and return annotated request +/// +/// This function takes an original OpenAI request (as JSON), adds query_instance_id annotation, +/// runs it through the pipeline to get worker selection, then returns the worker_id, tokens, +/// and the original request annotated with worker_instance_id and token_data. +/// +/// # Safety +/// All pointer parameters must be valid. The request_json_c_str must be a valid C string +/// containing valid JSON. The caller is responsible for freeing the allocated memory +/// for token_ids_out and annotated_request_json_out. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn dynamo_query_worker_selection_and_annotate( + pipeline: *mut WorkerSelectionPipeline, + request_json_c_str: *const c_char, + worker_instance_id_out: *mut i64, + token_ids_out: *mut *mut u32, + token_count_out: *mut usize, + annotated_request_json_out: *mut *mut c_char, +) -> DynamoLlmResult { + if pipeline.is_null() { + eprintln!("Pipeline pointer is null"); + return DynamoLlmResult::ERR; + } + + let wk = match WK.get() { + Some(wk) => wk, + None => { + eprintln!("Runtime not initialized - call dynamo_llm_init first"); + return DynamoLlmResult::ERR; + } + }; + + let rt = wk.runtime(); + let secondary = rt.secondary().clone(); + + let result = secondary.block_on(async { + // Convert C string to Rust string + let request_json = match unsafe { CStr::from_ptr(request_json_c_str) }.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to convert request JSON C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Parse JSON into NvCreateChatCompletionRequest + let original_request: NvCreateChatCompletionRequest = + match serde_json::from_str(request_json) { + Ok(req) => req, + Err(e) => { + eprintln!("Failed to parse request JSON: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Get pipeline reference + let pipeline_ref = unsafe { &*pipeline }; + + // Call the wrapper function + let (worker_id, tokens, annotated_request) = + match query_worker_selection_and_annotate(&pipeline_ref.pipeline, original_request) + .await + { + Ok(result) => result, + Err(e) => { + eprintln!("Failed to query worker selection: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Convert annotated request back to JSON + let annotated_json = match serde_json::to_string(&annotated_request) { + Ok(json) => json, + Err(e) => { + eprintln!("Failed to serialize annotated request: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Allocate memory for tokens array + let tokens_ptr = if tokens.is_empty() { + 
std::ptr::null_mut() + } else { + let tokens_len = tokens.len(); + let layout = std::alloc::Layout::array::(tokens_len).unwrap(); + let ptr = std::alloc::alloc(layout) as *mut u32; + if ptr.is_null() { + eprintln!("Failed to allocate memory for tokens"); + return DynamoLlmResult::ERR; + } + // Copy tokens to allocated memory + std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len); + ptr + }; + + // Allocate memory for annotated request JSON string + let json_cstring = match std::ffi::CString::new(annotated_json) { + Ok(cstr) => cstr, + Err(e) => { + eprintln!("Failed to create C string for annotated JSON: {:?}", e); + if !tokens_ptr.is_null() { + let layout = std::alloc::Layout::array::(tokens.len()).unwrap(); + unsafe { std::alloc::dealloc(tokens_ptr as *mut u8, layout) }; + } + return DynamoLlmResult::ERR; + } + }; + + // Set output parameters + unsafe { + *worker_instance_id_out = worker_id; + *token_ids_out = tokens_ptr; + *token_count_out = tokens.len(); + *annotated_request_json_out = json_cstring.into_raw(); + } + + DynamoLlmResult::OK + }); + + result +} + +/// Free memory allocated by dynamo_query_worker_selection_and_annotate +/// +/// # Safety +/// The pointers must have been allocated by dynamo_query_worker_selection_and_annotate +/// and should not be used after this call. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn dynamo_free_worker_selection_result( + token_ids: *mut u32, + token_count: usize, + annotated_request_json: *mut c_char, +) -> DynamoLlmResult { + // Free tokens array if not null + if !token_ids.is_null() && token_count > 0 { + let layout = match std::alloc::Layout::array::(token_count) { + Ok(layout) => layout, + Err(_) => { + eprintln!("Invalid layout for tokens array"); + return DynamoLlmResult::ERR; + } + }; + unsafe { std::alloc::dealloc(token_ids as *mut u8, layout) }; + } + + // Free JSON string if not null + if !annotated_request_json.is_null() { + let _cstring = unsafe { std::ffi::CString::from_raw(annotated_request_json) }; + // CString will be automatically freed when it goes out of scope + } + + DynamoLlmResult::OK +} diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs index caad9789f6..463484a388 100644 --- a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -42,7 +42,10 @@ use crate::{ migration::Migration, model_card::ModelDeploymentCard, preprocessor::OpenAIPreprocessor, - protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest}, + protocols::{ + common::llm_backend::{LLMEngineOutput, PreprocessedRequest}, + openai::{chat_completions::NvCreateChatCompletionRequest, nvext::NvExt}, + }, types::Annotated, }; @@ -59,13 +62,12 @@ use dynamo_runtime::{ /// /// This pipeline: frontend -> preprocessor -> backend -> migration -> router /// The router handles query_instance_id annotations and returns worker_instance_id and token_data annotations. 
-pub async fn build_worker_selection_pipeline_with_preprocessor( +pub async fn build_worker_selection_pipeline( card: &ModelDeploymentCard, client: &Client, router_mode: RouterMode, busy_threshold: Option, chooser: Option>, - preprocessor: Arc, hf_tokenizer: tokenizers::Tokenizer, ) -> anyhow::Result, ManyOut>>> where @@ -77,6 +79,12 @@ where Pin>>>, >, { + use crate::preprocessor::prompt::PromptFormatter; + + let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?; + let preprocessor = + OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?; + let frontend = SegmentSource::, ManyOut>>::new(); let preprocessor_op = preprocessor.into_operator(); let backend = Backend::from_tokenizer(hf_tokenizer).into_operator(); @@ -113,42 +121,6 @@ where Ok(frontend) } -/// Convenience function that creates a preprocessor and calls build_worker_selection_pipeline_with_preprocessor -pub async fn build_worker_selection_pipeline( - card: &ModelDeploymentCard, - client: &Client, - router_mode: RouterMode, - busy_threshold: Option, - chooser: Option>, - hf_tokenizer: tokenizers::Tokenizer, -) -> anyhow::Result, ManyOut>>> -where - Req: dynamo_runtime::engine::Data, - OpenAIPreprocessor: Operator< - Context, - Pin>>>, - Context, - Pin>>>, - >, -{ - use crate::preprocessor::prompt::PromptFormatter; - - let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?; - let preprocessor = - OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?; - - build_worker_selection_pipeline_with_preprocessor( - card, - client, - router_mode, - busy_threshold, - chooser, - preprocessor, - hf_tokenizer, - ) - .await -} - /// Helper function to extract worker selection information from the annotation stream /// This demonstrates how to process the annotations returned by the router pub async fn extract_worker_selection_from_stream( @@ -186,44 +158,172 @@ pub async fn extract_worker_selection_from_stream( Ok((worker_id, 
tokens)) } -#[cfg(test)] -mod tests { - #[allow(unused_imports)] - use super::*; - - #[tokio::test] - #[ignore] // Requires full distributed setup - async fn test_worker_selection_pipeline() { - // This test would require: - // - A real ModelDeploymentCard - // - A Component client connected to workers - // - A KvRouter with actual worker state - - // Example test structure: - // let engine = build_worker_selection_pipeline(...).await.unwrap(); - // - // // Create a request with query_instance_id annotation - // let request = create_test_request_with_annotation("query_instance_id"); - // let response_stream = engine.generate(request).await.unwrap(); - // - // // Use the helper function to extract worker selection information - // let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await.unwrap(); - // - // assert!(worker_id > 0); - // assert!(!tokens.is_empty()); +/// Utility function to add the "query_instance_id" annotation to an OpenAI request +/// +/// This function modifies the request to include the annotation that signals the KV router +/// to return worker selection information (worker_instance_id and token_data) instead of +/// performing actual inference. 
+/// +/// # Parameters +/// - `request`: Mutable reference to the OpenAI chat completion request +/// +/// # Returns +/// The same request with the "query_instance_id" annotation added +pub fn add_query_instance_id( + request: &mut NvCreateChatCompletionRequest, +) -> &mut NvCreateChatCompletionRequest { + // Create or modify the nvext field to include the query_instance_id annotation + match request.nvext.as_mut() { + Some(nvext) => { + // NvExt already exists, add annotation to it + match nvext.annotations.as_mut() { + Some(annotations) => { + // Annotations vector exists, add if not already present + if !annotations.contains(&"query_instance_id".to_string()) { + annotations.push("query_instance_id".to_string()); + } + } + None => { + // No annotations vector, create one with our annotation + nvext.annotations = Some(vec!["query_instance_id".to_string()]); + } + } + } + None => { + // No nvext field, create one with our annotation + request.nvext = Some( + NvExt::builder() + .add_annotation("query_instance_id") + .build() + .expect("NvExt builder should not fail"), + ); + } + } + + request +} + +/// Utility function to add worker_instance_id annotation to an OpenAI request +pub fn add_worker_instance_id_annotation( + request: &mut NvCreateChatCompletionRequest, + worker_id: i64, +) -> &mut NvCreateChatCompletionRequest { + let worker_id_str = worker_id.to_string(); + + match request.nvext.as_mut() { + Some(nvext) => { + match nvext.annotations.as_mut() { + Some(annotations) => { + // Remove existing worker_instance_id if present + annotations.retain(|ann| !ann.starts_with("worker_instance_id:")); + annotations.push(format!("worker_instance_id:{}", worker_id_str)); + } + None => { + nvext.annotations = Some(vec![format!("worker_instance_id:{}", worker_id_str)]); + } + } + } + None => { + request.nvext = Some( + NvExt::builder() + .add_annotation(format!("worker_instance_id:{}", worker_id_str)) + .build() + .expect("NvExt builder should not fail"), + ); + } + } + + 
request +} + +/// Utility function to add token_data annotation to an OpenAI request +pub fn add_token_data_annotation<'a>( + request: &'a mut NvCreateChatCompletionRequest, + tokens: &[u32], +) -> &'a mut NvCreateChatCompletionRequest { + let tokens_json = serde_json::to_string(tokens).unwrap_or_default(); + + match request.nvext.as_mut() { + Some(nvext) => { + match nvext.annotations.as_mut() { + Some(annotations) => { + // Remove existing token_data if present + annotations.retain(|ann| !ann.starts_with("token_data:")); + annotations.push(format!("token_data:{}", tokens_json)); + } + None => { + nvext.annotations = Some(vec![format!("token_data:{}", tokens_json)]); + } + } + } + None => { + request.nvext = Some( + NvExt::builder() + .add_annotation(format!("token_data:{}", tokens_json)) + .build() + .expect("NvExt builder should not fail"), + ); + } } + + request } -/* -/// Helper function to create worker selection pipeline from C string parameters +/// Wrapper function that queries worker selection and annotates the original request /// -/// This function demonstrates how to create all the necessary parameters for -/// `build_worker_selection_pipeline` when you only have namespace, component, -/// and model information from C FFI. +/// This function performs the complete flow: +/// 1. Clones the original request and adds "query_instance_id" annotation +/// 2. Calls engine.generate() with the modified request +/// 3. Extracts worker_instance_id and tokens from the response stream +/// 4. Adds worker_instance_id and token_data annotations to the original request +/// 5. 
Returns (worker_id, tokens, annotated_original_request) /// /// # Parameters -/// - `namespace_c_str`: C string pointer to namespace name -/// - `component_c_str`: C string pointer to component name +/// - `engine`: The worker selection pipeline engine +/// - `original_request`: The original OpenAI request to process +/// +/// # Returns +/// A tuple containing (worker_instance_id, tokens, modified_original_request) +/// where the modified_original_request has worker_instance_id and token_data annotations added +/// +/// # Example +/// ```rust,ignore +/// let (worker_id, tokens, annotated_request) = +/// query_worker_selection_and_annotate(&engine, original_request).await?; +/// ``` +pub async fn query_worker_selection_and_annotate( + engine: &ServiceEngine< + SingleIn, + ManyOut>, + >, + mut original_request: NvCreateChatCompletionRequest, +) -> anyhow::Result<(i64, Vec, NvCreateChatCompletionRequest)> { + // Clone the request and add query_instance_id annotation + let mut query_request = original_request.clone(); + add_query_instance_id(&mut query_request); + + // Create SingleIn and generate + let single_in = SingleIn::new(query_request); + let response_stream = engine.generate(single_in).await?; + + // Extract worker selection from stream + let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await?; + + // Add worker_instance_id and tokens to original request's nvext + add_worker_instance_id_annotation(&mut original_request, worker_id); + add_token_data_annotation(&mut original_request, &tokens); + + Ok((worker_id, tokens, original_request)) +} + +/// Helper function to create worker selection pipeline from string parameters +/// +/// This function creates all the necessary parameters for `build_worker_selection_pipeline` +/// when you have namespace, component, and model information as strings. 
+/// +/// # Parameters +/// - `namespace`: namespace name +/// - `component_name`: component name /// - `endpoint_name`: Name of the endpoint to connect to (e.g., "inference") /// - `model_name`: Name/slug of the model to load /// - `router_mode`: How to route requests (KV, RoundRobin, etc.) @@ -232,11 +332,11 @@ mod tests { /// # Returns /// A configured worker selection pipeline ready to use /// -/// # Example Usage in C FFI context: +/// # Example Usage: /// ```rust,ignore -/// let pipeline = create_worker_selection_pipeline_from_c_params( -/// namespace_c_str, -/// component_c_str, +/// let pipeline = create_worker_selection_pipeline( +/// "my-namespace", +/// "backend", /// "inference", /// "llama3-8b-instruct", /// RouterMode::KV, @@ -248,9 +348,9 @@ mod tests { /// let response_stream = pipeline.generate(request).await?; /// let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await?; /// ``` -pub async fn create_worker_selection_pipeline_from_c_params( - namespace_c_str: *const std::os::raw::c_char, - component_c_str: *const std::os::raw::c_char, +pub async fn create_worker_selection_pipeline( + namespace: &str, + component_name: &str, endpoint_name: &str, model_name: &str, router_mode: RouterMode, @@ -265,63 +365,57 @@ where Pin>>>, >, { - use std::ffi::CStr; + use crate::{discovery::ModelManager, model_card::ModelDeploymentCard}; + use anyhow::Context; use dynamo_runtime::{ - Runtime, DistributedRuntime, - storage::{EtcdStorage, KeyValueStoreManager}, - slug::Slug, - }; - use crate::{ - model_card::{ModelDeploymentCard, ROOT_PATH as MODEL_ROOT_PATH}, - discovery::ModelManager, + DistributedRuntime, Runtime, distributed::DistributedConfig, slug::Slug, + traits::DistributedRuntimeProvider, }; - // 1. Convert C strings to Rust strings - let namespace_str = unsafe { - CStr::from_ptr(namespace_c_str) - .to_str() - .map_err(|e| anyhow::anyhow!("Invalid namespace string: {}", e))? 
- }; - let component_str = unsafe { - CStr::from_ptr(component_c_str) - .to_str() - .map_err(|e| anyhow::anyhow!("Invalid component string: {}", e))? - }; + // Create DistributedRuntime + let runtime = Runtime::from_settings()?; + let dst_config = DistributedConfig::from_settings(true); + let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?; - // 2. Create Runtime and DistributedRuntime - let runtime = Runtime::new().await?; - let distributed_runtime = DistributedRuntime::from_settings(runtime).await?; - - // 3. Create Component and Client - let namespace = distributed_runtime.namespace(namespace_str)?; - let component = namespace.component(component_str)?; + // Create Component and Client + let ns = distributed_runtime.namespace(namespace)?; + let component = ns.component(component_name)?; let endpoint = component.endpoint(endpoint_name); let client = endpoint.client().await?; - // 4. Load ModelDeploymentCard + // Load ModelDeploymentCard let model_slug = Slug::from_string(model_name); - let card = match ModelDeploymentCard::load_from_store(&model_slug, &runtime) + let card = match ModelDeploymentCard::load_from_store(&model_slug, component.drt()).await { + Ok(Some(card)) => card, + Ok(None) => anyhow::bail!("ModelDeploymentCard not found for model: {}", model_name), + Err(err) => anyhow::bail!( + "Error fetching ModelDeploymentCard from storage under key {model_slug}. {err}" + ), + }; - // 5. Get HuggingFace tokenizer from the model card - let hf_tokenizer = card.tokenizer_hf() + // Get tokenizer from the model card + let hf_tokenizer = card + .tokenizer_hf() .with_context(|| format!("Failed to load tokenizer for model: {}", model_name))?; - // 6. 
Create KV chooser if using KV routing mode + // Create KV chooser if using KV routing mode let chooser = if router_mode == RouterMode::KV { let model_manager = std::sync::Arc::new(ModelManager::new()); Some( - model_manager.kv_chooser_for( - &card.display_name, - &component, - card.kv_cache_block_size, - None, // Use default KV router config - ).await? + model_manager + .kv_chooser_for( + &card.display_name, + &component, + card.kv_cache_block_size, + None, // Use default KV router config + ) + .await?, ) } else { None }; - // 7. Build and return the worker selection pipeline + // Build and return the worker selection pipeline build_worker_selection_pipeline( &card, &client, @@ -329,7 +423,6 @@ where busy_threshold, chooser, hf_tokenizer, - ).await + ) + .await } - -*/ diff --git a/lib/llm/tests/test_worker_selection_pipeline.rs b/lib/llm/tests/test_worker_selection_pipeline.rs new file mode 100644 index 0000000000..d3ba7e835d --- /dev/null +++ b/lib/llm/tests/test_worker_selection_pipeline.rs @@ -0,0 +1,133 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! 
Tests for worker selection pipeline functionality + +use dynamo_async_openai::types::CreateChatCompletionRequest; +use dynamo_llm::entrypoint::input::worker_selection_pipeline::*; +use dynamo_llm::protocols::openai::{ + chat_completions::NvCreateChatCompletionRequest, common_ext::CommonExt, nvext::NvExt, +}; +use dynamo_runtime::protocols::annotated::AnnotationsProvider; + +#[tokio::test] +#[ignore] // Requires full distributed setup +async fn test_worker_selection_pipeline() { + // This test would require: + // - A real ModelDeploymentCard + // - A Component client connected to workers + // - A KvRouter with actual worker state + + // Example test structure: + // let engine = build_worker_selection_pipeline(...).await.unwrap(); + // + // // Create a request with query_instance_id annotation + // let request = create_test_request_with_annotation("query_instance_id"); + // let response_stream = engine.generate(request).await.unwrap(); + // + // // Use the helper function to extract worker selection information + // let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await.unwrap(); + // + // assert!(worker_id > 0); + // assert!(!tokens.is_empty()); +} + +#[test] +fn test_add_query_instance_id() { + // Test adding annotation to request without nvext + let mut request = NvCreateChatCompletionRequest { + inner: CreateChatCompletionRequest::default(), + common: CommonExt::default(), + nvext: None, + chat_template_args: None, + }; + + // Initially should not have the annotation + assert!(!request.has_annotation("query_instance_id")); + + // Add the annotation + add_query_instance_id(&mut request); + + // Now should have the annotation + assert!(request.has_annotation("query_instance_id")); + + // Test adding annotation to request that already has nvext but no annotations + let mut request2 = NvCreateChatCompletionRequest { + inner: CreateChatCompletionRequest::default(), + common: CommonExt::default(), + nvext: 
Some(NvExt::builder().build().unwrap()), + chat_template_args: None, + }; + + assert!(!request2.has_annotation("query_instance_id")); + add_query_instance_id(&mut request2); + assert!(request2.has_annotation("query_instance_id")); + + // Test adding annotation to request that already has annotations + let mut request3 = NvCreateChatCompletionRequest { + inner: CreateChatCompletionRequest::default(), + common: CommonExt::default(), + nvext: Some( + NvExt::builder() + .add_annotation("some_other_annotation") + .build() + .unwrap(), + ), + chat_template_args: None, + }; + + assert!(request3.has_annotation("some_other_annotation")); + assert!(!request3.has_annotation("query_instance_id")); + + add_query_instance_id(&mut request3); + + assert!(request3.has_annotation("some_other_annotation")); + assert!(request3.has_annotation("query_instance_id")); + + // Test that adding the same annotation twice doesn't duplicate it + add_query_instance_id(&mut request3); + + let annotations = request3.annotations().unwrap(); + let query_instance_id_count = annotations + .iter() + .filter(|&ann| ann == "query_instance_id") + .count(); + assert_eq!( + query_instance_id_count, 1, + "query_instance_id should appear only once" + ); +} + +#[test] +fn test_annotation_helper_functions() { + // Test adding worker_instance_id annotation + let mut request = NvCreateChatCompletionRequest { + inner: CreateChatCompletionRequest::default(), + common: CommonExt::default(), + nvext: None, + chat_template_args: None, + }; + + add_worker_instance_id_annotation(&mut request, 42); + + let annotations = request.annotations().unwrap(); + assert!(annotations.contains(&"worker_instance_id:42".to_string())); + + // Test adding token_data annotation + let tokens = vec![1, 2, 3, 4, 5]; + add_token_data_annotation(&mut request, &tokens); + + let annotations = request.annotations().unwrap(); + assert!(annotations.contains(&"token_data:[1,2,3,4,5]".to_string())); + 
assert!(annotations.contains(&"worker_instance_id:42".to_string())); + + // Test updating worker_instance_id (should replace existing) + add_worker_instance_id_annotation(&mut request, 99); + + let annotations = request.annotations().unwrap(); + assert!(annotations.contains(&"worker_instance_id:99".to_string())); + assert!(!annotations.contains(&"worker_instance_id:42".to_string())); + + // Verify token_data is still there + assert!(annotations.contains(&"token_data:[1,2,3,4,5]".to_string())); +} From d33a14965ddf7acbd7d5ceac64929d13b8ee0df7 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 17 Sep 2025 18:05:28 -0700 Subject: [PATCH 03/18] Added router config Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 108 +++++++----------- .../input/worker_selection_pipeline.rs | 27 +++-- .../tests/test_worker_selection_pipeline.rs | 22 ---- 3 files changed, 59 insertions(+), 98 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 729eee390a..2939d9149a 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -344,18 +344,28 @@ pub struct WorkerSelectionPipeline { /// /// Returns a pipeline handle that can be used repeatedly for queries. /// Call dynamo_destroy_worker_selection_pipeline when done. +/// Uses the "generate" endpoint by default. 
/// /// # Safety -/// The namespace_c_str, component_c_str, endpoint_name_c_str, and model_name_c_str +/// The namespace_c_str, component_c_str, and model_name_c_str /// are passed as pointers to C strings +/// +/// # KV Router Configuration Parameters +/// - overlap_score_weight: Weight for KV cache overlap in worker selection (use negative value for default) +/// - router_temperature: Randomness in worker selection, 0.0 = deterministic (use negative value for default) +/// - use_kv_events: Whether to use KV cache events for tracking +/// - router_replica_sync: Whether to synchronize router state across replicas #[unsafe(no_mangle)] pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( namespace_c_str: *const c_char, component_c_str: *const c_char, - endpoint_name_c_str: *const c_char, model_name_c_str: *const c_char, use_kv_routing: bool, - busy_threshold: f64, // Use negative value to indicate None + busy_threshold: f64, // Use negative value to indicate None + overlap_score_weight: f64, // Use negative value for default + router_temperature: f64, // Use negative value for default + use_kv_events: bool, + router_replica_sync: bool, pipeline_out: *mut *mut WorkerSelectionPipeline, ) -> DynamoLlmResult { let wk = match WK.get() { @@ -387,14 +397,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( } }; - let endpoint_name = match unsafe { CStr::from_ptr(endpoint_name_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert endpoint_name C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { Ok(s) => s, Err(e) => { @@ -416,14 +418,39 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( Some(busy_threshold) }; + // Create KV router config if using KV routing + let kv_router_config = if use_kv_routing { + use dynamo_llm::kv_router::KvRouterConfig; + + Some(KvRouterConfig::new( + if overlap_score_weight < 0.0 { + 
None + } else { + Some(overlap_score_weight) + }, + if router_temperature < 0.0 { + None + } else { + Some(router_temperature) + }, + Some(use_kv_events), + Some(router_replica_sync), + None, // max_num_batched_tokens - use default + None, // router_snapshot_threshold - use default + None, // router_reset_states - use default + )) + } else { + None + }; + // Create the worker selection pipeline let pipeline = match create_worker_selection_pipeline::( namespace, component_name, - endpoint_name, model_name, router_mode, busy_threshold_opt, + kv_router_config, ) .await { @@ -449,63 +476,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( result } -/// Run a query on an existing worker selection pipeline -/// -/// # Safety -/// The pipeline pointer must be valid and prompt_c_str must be a valid C string -#[unsafe(no_mangle)] -pub unsafe extern "C" fn dynamo_worker_selection_pipeline_query( - pipeline: *mut WorkerSelectionPipeline, - prompt_c_str: *const c_char, - worker_instance_id_out: *mut i64, - token_ids_out: *mut *mut u32, - token_count_out: *mut usize, -) -> DynamoLlmResult { - if pipeline.is_null() { - eprintln!("Pipeline pointer is null"); - return DynamoLlmResult::ERR; - } - - let wk = match WK.get() { - Some(wk) => wk, - None => { - eprintln!("Runtime not initialized - call dynamo_llm_init first"); - return DynamoLlmResult::ERR; - } - }; - - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); - - let result = secondary.block_on(async { - // Convert C string to Rust string - let prompt = match unsafe { CStr::from_ptr(prompt_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert prompt C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - - // TODO: Actually use the pipeline to run the query - // For now, return placeholder values - eprintln!( - "Query '{}' on pipeline - TODO: implement actual query", - prompt - ); - - unsafe { - *worker_instance_id_out = 1; - *token_ids_out = 
std::ptr::null_mut(); - *token_count_out = 0; - } - - DynamoLlmResult::OK - }); - - result -} - /// Destroy a worker selection pipeline and free its memory /// /// # Safety diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs index 463484a388..f2d15dce42 100644 --- a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -36,9 +36,12 @@ use std::{pin::Pin, sync::Arc}; use serde_json; +// Constants +const GENERATE_ENDPOINT: &str = "generate"; + use crate::{ backend::Backend, - kv_router::{KvPushRouter, KvRouter}, + kv_router::{KvPushRouter, KvRouter, KvRouterConfig}, migration::Migration, model_card::ModelDeploymentCard, preprocessor::OpenAIPreprocessor, @@ -320,27 +323,37 @@ pub async fn query_worker_selection_and_annotate( /// /// This function creates all the necessary parameters for `build_worker_selection_pipeline` /// when you have namespace, component, and model information as strings. +/// Uses the "generate" endpoint by default. /// /// # Parameters /// - `namespace`: namespace name /// - `component_name`: component name -/// - `endpoint_name`: Name of the endpoint to connect to (e.g., "inference") /// - `model_name`: Name/slug of the model to load /// - `router_mode`: How to route requests (KV, RoundRobin, etc.) 
/// - `busy_threshold`: Optional threshold for busy worker detection +/// - `kv_router_config`: Optional KV router configuration (only used when router_mode is KV) /// /// # Returns /// A configured worker selection pipeline ready to use /// /// # Example Usage: /// ```rust,ignore +/// let kv_config = KvRouterConfig::new( +/// Some(1.5), // overlap_score_weight +/// Some(0.2), // temperature +/// None, // use_kv_events (default) +/// None, // replica_sync (default) +/// None, // max_num_batched_tokens (default) +/// None, // router_snapshot_threshold (default) +/// None, // router_reset_states (default) +/// ); /// let pipeline = create_worker_selection_pipeline( /// "my-namespace", /// "backend", -/// "inference", /// "llama3-8b-instruct", /// RouterMode::KV, -/// Some(0.8) +/// Some(0.8), +/// Some(kv_config), /// ).await?; /// /// // Use pipeline to get worker selection @@ -351,10 +364,10 @@ pub async fn query_worker_selection_and_annotate( pub async fn create_worker_selection_pipeline( namespace: &str, component_name: &str, - endpoint_name: &str, model_name: &str, router_mode: RouterMode, busy_threshold: Option, + kv_router_config: Option, ) -> anyhow::Result, ManyOut>>> where Req: dynamo_runtime::engine::Data, @@ -380,7 +393,7 @@ where // Create Component and Client let ns = distributed_runtime.namespace(namespace)?; let component = ns.component(component_name)?; - let endpoint = component.endpoint(endpoint_name); + let endpoint = component.endpoint(GENERATE_ENDPOINT); let client = endpoint.client().await?; // Load ModelDeploymentCard @@ -407,7 +420,7 @@ where &card.display_name, &component, card.kv_cache_block_size, - None, // Use default KV router config + kv_router_config, ) .await?, ) diff --git a/lib/llm/tests/test_worker_selection_pipeline.rs b/lib/llm/tests/test_worker_selection_pipeline.rs index d3ba7e835d..86a6704b6b 100644 --- a/lib/llm/tests/test_worker_selection_pipeline.rs +++ b/lib/llm/tests/test_worker_selection_pipeline.rs @@ -10,28 +10,6 @@ 
use dynamo_llm::protocols::openai::{ }; use dynamo_runtime::protocols::annotated::AnnotationsProvider; -#[tokio::test] -#[ignore] // Requires full distributed setup -async fn test_worker_selection_pipeline() { - // This test would require: - // - A real ModelDeploymentCard - // - A Component client connected to workers - // - A KvRouter with actual worker state - - // Example test structure: - // let engine = build_worker_selection_pipeline(...).await.unwrap(); - // - // // Create a request with query_instance_id annotation - // let request = create_test_request_with_annotation("query_instance_id"); - // let response_stream = engine.generate(request).await.unwrap(); - // - // // Use the helper function to extract worker selection information - // let (worker_id, tokens) = extract_worker_selection_from_stream(response_stream).await.unwrap(); - // - // assert!(worker_id > 0); - // assert!(!tokens.is_empty()); -} - #[test] fn test_add_query_instance_id() { // Test adding annotation to request without nvext From a3860831658d1a9688ab5ddb16e44b3d533f0423 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 18 Sep 2025 12:52:11 -0700 Subject: [PATCH 04/18] refactor with 2 impl to avoid templating Signed-off-by: Anna Tchernych --- lib/bindings/c/Cargo.toml | 2 +- lib/bindings/c/src/lib.rs | 8 +- .../input/worker_selection_pipeline.rs | 150 +++++++++++++++++- 3 files changed, 154 insertions(+), 6 deletions(-) diff --git a/lib/bindings/c/Cargo.toml b/lib/bindings/c/Cargo.toml index da67ee3196..ad5f6bb44b 100644 --- a/lib/bindings/c/Cargo.toml +++ b/lib/bindings/c/Cargo.toml @@ -24,7 +24,7 @@ repository.workspace = true [lib] name = "dynamo_llm_capi" -crate-type = ["cdylib"] +crate-type = ["cdylib", "staticlib"] [build-dependencies] cbindgen = "0.27" diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 2939d9149a..e43ca0b00f 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -8,7 +8,7 @@ use std::ffi::CStr; use 
std::sync::atomic::{AtomicU32, Ordering}; use dynamo_llm::entrypoint::input::worker_selection_pipeline::{ - create_worker_selection_pipeline, query_worker_selection_and_annotate, + create_worker_selection_pipeline_chat, query_worker_selection_and_annotate, }; use dynamo_llm::kv_router::{ indexer::compute_block_hash_for_seq, protocols::*, publisher::KvEventPublisher, @@ -444,7 +444,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( }; // Create the worker selection pipeline - let pipeline = match create_worker_selection_pipeline::( + let pipeline = match create_worker_selection_pipeline_chat( namespace, component_name, model_name, @@ -580,13 +580,13 @@ pub unsafe extern "C" fn dynamo_query_worker_selection_and_annotate( } else { let tokens_len = tokens.len(); let layout = std::alloc::Layout::array::(tokens_len).unwrap(); - let ptr = std::alloc::alloc(layout) as *mut u32; + let ptr = unsafe { std::alloc::alloc(layout) as *mut u32 }; if ptr.is_null() { eprintln!("Failed to allocate memory for tokens"); return DynamoLlmResult::ERR; } // Copy tokens to allocated memory - std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len); + unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len) }; ptr }; diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs index f2d15dce42..bbe14c5116 100644 --- a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -319,7 +319,155 @@ pub async fn query_worker_selection_and_annotate( Ok((worker_id, tokens, original_request)) } -/// Helper function to create worker selection pipeline from string parameters +/// Build a worker selection pipeline specifically for Chat Completion requests +/// +/// This pipeline: frontend -> preprocessor -> backend -> migration -> router +/// The router handles query_instance_id annotations and returns worker_instance_id and 
token_data annotations. +pub async fn build_worker_selection_pipeline_chat( + card: &ModelDeploymentCard, + client: &Client, + router_mode: RouterMode, + busy_threshold: Option, + chooser: Option>, + hf_tokenizer: tokenizers::Tokenizer, +) -> anyhow::Result< + ServiceEngine, ManyOut>>, +> { + use crate::backend::Backend; + use crate::migration::Migration; + use crate::preprocessor::prompt::PromptFormatter; + + let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?; + let preprocessor = + OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?; + + let frontend = SegmentSource::< + SingleIn, + ManyOut>, + >::new(); + let preprocessor_op = preprocessor.into_operator(); + let backend = Backend::from_tokenizer(hf_tokenizer).into_operator(); + let migration = Migration::from_mdc(card).into_operator(); + + let router = + PushRouter::>::from_client_with_threshold( + client.clone(), + router_mode, + busy_threshold, + ) + .await?; + + let service_backend = match router_mode { + RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => { + ServiceBackend::from_engine(Arc::new(router)) + } + RouterMode::KV => { + let Some(chooser) = chooser else { + anyhow::bail!("RouterMode::KV requires KvRouter to not be null"); + }; + let kv_push_router = KvPushRouter::new(router, chooser); + ServiceBackend::from_engine(Arc::new(kv_push_router)) + } + }; + + // Build pipeline - forward path only (router handles query_instance_id and returns annotations) + frontend + .link(preprocessor_op.forward_edge())? + .link(backend.forward_edge())? + .link(migration.forward_edge())? + .link(service_backend)?; + + Ok(frontend) +} + +/// Helper function to create worker selection pipeline for OpenAI Chat Completion requests +/// +/// This is a concrete implementation that works specifically with NvCreateChatCompletionRequest +/// and is designed for use with C bindings. Uses the "generate" endpoint by default. 
+/// +/// # Parameters +/// - `namespace`: namespace name +/// - `component_name`: component name +/// - `model_name`: Name/slug of the model to load +/// - `router_mode`: How to route requests (KV, RoundRobin, etc.) +/// - `busy_threshold`: Optional threshold for busy worker detection +/// - `kv_router_config`: Optional KV router configuration (only used when router_mode is KV) +/// +/// # Returns +/// A configured worker selection pipeline ready to use +pub async fn create_worker_selection_pipeline_chat( + namespace: &str, + component_name: &str, + model_name: &str, + router_mode: RouterMode, + busy_threshold: Option, + kv_router_config: Option, +) -> anyhow::Result< + ServiceEngine, ManyOut>>, +> { + use crate::{discovery::ModelManager, model_card::ModelDeploymentCard}; + use anyhow::Context; + use dynamo_runtime::{ + DistributedRuntime, Runtime, distributed::DistributedConfig, slug::Slug, + traits::DistributedRuntimeProvider, + }; + + // Create DistributedRuntime + let runtime = Runtime::from_settings()?; + let dst_config = DistributedConfig::from_settings(true); + let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?; + + // Create Component and Client + let ns = distributed_runtime.namespace(namespace)?; + let component = ns.component(component_name)?; + let endpoint = component.endpoint(GENERATE_ENDPOINT); + let client = endpoint.client().await?; + + // Load ModelDeploymentCard + let model_slug = Slug::from_string(model_name); + let card = match ModelDeploymentCard::load_from_store(&model_slug, component.drt()).await { + Ok(Some(card)) => card, + Ok(None) => anyhow::bail!("ModelDeploymentCard not found for model: {}", model_name), + Err(err) => anyhow::bail!( + "Error fetching ModelDeploymentCard from storage under key {model_slug}. 
{err}" + ), + }; + + // Get tokenizer from the model card + let hf_tokenizer = card + .tokenizer_hf() + .with_context(|| format!("Failed to load tokenizer for model: {}", model_name))?; + + // Create KV chooser if using KV routing mode + let chooser = if router_mode == RouterMode::KV { + let model_manager = std::sync::Arc::new(ModelManager::new()); + Some( + model_manager + .kv_chooser_for( + &card.display_name, + &component, + card.kv_cache_block_size, + kv_router_config, + ) + .await?, + ) + } else { + None + }; + + // Build and return the worker selection pipeline + build_worker_selection_pipeline_chat( + &card, + &client, + router_mode, + busy_threshold, + chooser, + hf_tokenizer, + ) + .await +} + +/// Generic helper function to create worker selection pipeline from string parameters /// /// This function creates all the necessary parameters for `build_worker_selection_pipeline` /// when you have namespace, component, and model information as strings. From a769341c751d15ee41e297ac3d7ac217b807c935 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Sun, 21 Sep 2025 16:49:20 -0700 Subject: [PATCH 05/18] add config Signed-off-by: Anna Tchernych --- deploy/inference-gateway/values-epp-aware.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/deploy/inference-gateway/values-epp-aware.yaml b/deploy/inference-gateway/values-epp-aware.yaml index 4ff3c58119..c5173d5f03 100644 --- a/deploy/inference-gateway/values-epp-aware.yaml +++ b/deploy/inference-gateway/values-epp-aware.yaml @@ -15,7 +15,7 @@ eppAware: enabled: true - eppImage: docker.io/lambda108/epp-inference-extension-dynamo:v0.5.1-1 + eppImage: gitlab-master.nvidia.com:5005/dl/ai-dynamo/dynamo/epp-lib-7 imagePullSecrets: - docker-imagepullsecret @@ -24,3 +24,7 @@ epp: extraEnv: - name: USE_STREAMING value: "true" + - name: DYNAMO_COMPONENT + value: "backend" + - name: DYNAMO_KV_BLOCK_SIZE + value: 512 From 2180566cf6d4a9507eded4d492b020b5b59ec6c7 Mon Sep 17 00:00:00 2001 From: Anna 
Tchernych Date: Sun, 21 Sep 2025 17:39:32 -0700 Subject: [PATCH 06/18] Add missing functions to the cbindgen.toml Signed-off-by: Anna Tchernych --- lib/bindings/c/cbindgen.toml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/bindings/c/cbindgen.toml b/lib/bindings/c/cbindgen.toml index dd7f5d01b7..bf1744c55b 100644 --- a/lib/bindings/c/cbindgen.toml +++ b/lib/bindings/c/cbindgen.toml @@ -25,7 +25,19 @@ enum_class = false [export] -include = ["DynamoLlmResult", "dynamo_llm_init", "dynamo_llm_shutdown"] +include = [ + "DynamoLlmResult", + "dynamo_llm_init", + "dynamo_llm_shutdown", + "dynamo_llm_load_publisher_create", + "dynamo_kv_event_publish_stored", + "dynamo_kv_event_publish_removed", + "WorkerSelectionPipeline", + "dynamo_create_worker_selection_pipeline", + "dynamo_destroy_worker_selection_pipeline", + "dynamo_query_worker_selection_and_annotate", + "dynamo_free_worker_selection_result" +] [export.rename] "DynamoLlmResult" = "dynamo_llm_result_t" From aa8e5017c8f2a69bf24c3b1c11b0dbc37a4b4fb7 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Sun, 21 Sep 2025 21:28:17 -0700 Subject: [PATCH 07/18] Fix ../../lib/bindings/c/src/lib.rs Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index e43ca0b00f..041081064e 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -104,18 +104,22 @@ pub unsafe extern "C" fn dynamo_llm_init( } } +use std::panic::{AssertUnwindSafe, catch_unwind}; + #[unsafe(no_mangle)] pub extern "C" fn dynamo_llm_shutdown() -> DynamoLlmResult { - let wk = match WK.get() { - Some(wk) => wk, - None => { - eprintln!("Runtime not initialized"); - return DynamoLlmResult::ERR; - } + let Some(wk) = WK.get().cloned() else { + eprintln!("Runtime not initialized"); + return DynamoLlmResult::ERR; }; - - wk.runtime().shutdown(); - + let res =
catch_unwind(AssertUnwindSafe(|| { + // bounce to a plain thread to avoid Tokio’s guard + let _ = std::thread::spawn(move || wk.runtime().shutdown()).join(); + })); + if res.is_err() { + eprintln!("Runtime shutdown panicked; ignoring"); + return DynamoLlmResult::ERR; + } DynamoLlmResult::OK } From 5bbd15cdb77c96055fc96af566617e43c9fa332c Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Sun, 21 Sep 2025 22:01:48 -0700 Subject: [PATCH 08/18] more cleanup Signed-off-by: Anna Tchernych --- .../inference-gateway/values-epp-aware.yaml | 4 + lib/bindings/c/src/lib.rs | 629 ++++++++++-------- 2 files changed, 343 insertions(+), 290 deletions(-) diff --git a/deploy/inference-gateway/values-epp-aware.yaml b/deploy/inference-gateway/values-epp-aware.yaml index c5173d5f03..a1f7a6cf27 100644 --- a/deploy/inference-gateway/values-epp-aware.yaml +++ b/deploy/inference-gateway/values-epp-aware.yaml @@ -28,3 +28,7 @@ epp: value: "backend" - name: DYNAMO_KV_BLOCK_SIZE value: 512 + - name: RUST_BACKTRACE + value: 1 + - name: RUST_LOG + value: tokio=trace diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 041081064e..6b0dac65f9 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -19,6 +19,9 @@ use dynamo_runtime::{ DistributedRuntime, Worker, pipeline::{ManyOut, RouterMode, ServiceEngine, SingleIn}, }; + +use std::panic::{AssertUnwindSafe, catch_unwind}; + static WK: OnceCell = OnceCell::new(); static DRT: AsyncOnceCell = AsyncOnceCell::new(); // [FIXME] shouldn't the publisher be instance passing between API calls?
@@ -42,6 +45,39 @@ pub enum DynamoLlmResult { ERR = 1, } +/* ---------- small helpers: panic guard + local runtime ---------- */ + +#[inline] +fn ffi_guard DynamoLlmResult>(f: F) -> DynamoLlmResult { + match catch_unwind(AssertUnwindSafe(f)) { + Ok(r) => r, + Err(_) => { + eprintln!("Rust panic crossed FFI; returning ERR"); + DynamoLlmResult::ERR + } + } +} + +/// Run async work on a fresh single-threaded Tokio runtime on a plain OS thread. +/// This avoids Tokio's "cannot drop a runtime from within an async context" panic. +fn run_on_local_runtime(op: impl FnOnce() -> Fut + Send + 'static) -> Result +where + Fut: std::future::Future> + Send + 'static, + T: Send + 'static, +{ + std::thread::spawn(move || -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("build local runtime: {e:?}"))?; + rt.block_on(op()) + }) + .join() + .map_err(|_| "panic in local runtime thread".to_string())? +} + +/* ------------------------------ init ------------------------------ */ + /// # Safety /// the namespace_c_str and component_c_str are passed as pointers to C strings #[unsafe(no_mangle)] @@ -51,76 +87,79 @@ pub unsafe extern "C" fn dynamo_llm_init( worker_id: i64, kv_block_size: u32, ) -> DynamoLlmResult { - initialize_tracing(); - let wk = match WK.get_or_try_init(Worker::from_settings) { - Ok(wk) => wk.clone(), - Err(e) => { - eprintln!("Failed to initialize runtime: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); - let result = secondary.block_on(async { - // Initialize the distributed runtime - match DRT - .get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) - .await - { - Ok(_) => Ok(()), + ffi_guard(|| { + initialize_tracing(); + + let wk = match WK.get_or_try_init(Worker::from_settings) { + Ok(wk) => wk.clone(), Err(e) => { - eprintln!("Failed to initialize distributed runtime: {:?}", e); - 
Err(DynamoLlmResult::ERR) + eprintln!("Failed to initialize runtime: {:?}", e); + return DynamoLlmResult::ERR; } - } - }); - let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { - Ok(s) => s.to_string(), - Err(e) => { - eprintln!("Failed to convert C string to Rust string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + }; - let component = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { - Ok(s) => s.to_string(), - Err(e) => { - eprintln!("Failed to convert C string to Rust string: {:?}", e); + // Convert C strings to owned Rust Strings before we jump threads. + let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { + Ok(s) => s.to_string(), + Err(e) => { + eprintln!("Failed to convert C string to Rust string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + let component = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { + Ok(s) => s.to_string(), + Err(e) => { + eprintln!("Failed to convert C string to Rust string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Initialize DistributedRuntime on an isolated runtime/thread. + let rt = wk.runtime().clone(); + let drt_init_res = run_on_local_runtime(move || async move { + DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) + .await + .map(|_| ()) + .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}")) + }); + + if let Err(msg) = drt_init_res { + eprintln!("{msg}"); return DynamoLlmResult::ERR; } - }; - match result { - Ok(_) => match KV_PUB.get_or_try_init(move || { + // Initialize the KV publisher once. 
+ match KV_PUB.get_or_try_init(move || { dynamo_create_kv_publisher(namespace, component, worker_id, kv_block_size) }) { Ok(_) => DynamoLlmResult::OK, Err(e) => { - eprintln!("Failed to initialize distributed runtime: {:?}", e); + eprintln!("Failed to create KV publisher: {:?}", e); DynamoLlmResult::ERR } - }, - Err(e) => e, - } + } + }) } -use std::panic::{AssertUnwindSafe, catch_unwind}; +/* ---------------------------- shutdown ---------------------------- */ #[unsafe(no_mangle)] pub extern "C" fn dynamo_llm_shutdown() -> DynamoLlmResult { - let Some(wk) = WK.get().cloned() else { - eprintln!("Runtime not initialized"); - return DynamoLlmResult::ERR; - }; - let res = catch_unwind(AssertUnwindSafe(|| { - // bounce to a plain thread to avoid Tokio’s guard - let _ = std::thread::spawn(move || wk.runtime().shutdown()).join(); - })); - if res.is_err() { - eprintln!("Runtime shutdown panicked; ignoring"); - return DynamoLlmResult::ERR; - } - DynamoLlmResult::OK + ffi_guard(|| { + let Some(wk) = WK.get().cloned() else { + eprintln!("Runtime not initialized"); + return DynamoLlmResult::ERR; + }; + let res = catch_unwind(AssertUnwindSafe(|| { + // bounce to a plain thread to avoid Tokio’s guard + let _ = std::thread::spawn(move || wk.runtime().shutdown()).join(); + })); + if res.is_err() { + eprintln!("Runtime shutdown panicked; ignoring"); + return DynamoLlmResult::ERR; + } + DynamoLlmResult::OK + }) } #[unsafe(no_mangle)] @@ -128,12 +167,7 @@ pub extern "C" fn dynamo_llm_load_publisher_create() -> DynamoLlmResult { DynamoLlmResult::OK } -// instantiate a kv publisher -// this will bring up the task to publish and the channels to await publishing events -// the [`dynamo_kv_publish_store_event`] call will use a handle to the publisher to send events -// store and the [`dynamo_kv_event_create_removed`] will create remove events -// these call mus be driving by external c++ threads that are consuming the kv events from the -// c++ executor api +/* ---------------------- KV 
publisher helpers ---------------------- */ fn dynamo_create_kv_publisher( namespace: String, @@ -249,6 +283,8 @@ pub struct DynamoKvStoredEventParams { pub lora_id: u64, } +/* --------------------------- KV FFI --------------------------- */ + /// # Safety /// parent_hash is passed as pointer to indicate whether the blocks /// has a parent hash or not. nullptr is used to represent no parent hash @@ -262,31 +298,39 @@ pub unsafe extern "C" fn dynamo_kv_event_publish_stored( parent_hash: *const u64, lora_id: u64, ) -> DynamoLlmResult { - let parent_hash = { - if parent_hash.is_null() { - None - } else { - Some(unsafe { *parent_hash }) - } - }; - let kv_params = DynamoKvStoredEventParams { - event_id, - token_ids, - num_block_tokens, - block_ids, - num_blocks, - parent_hash, - lora_id, - }; - let publisher = KV_PUB.get().unwrap(); - let event = kv_event_create_stored_from_parts(kv_params, publisher.kv_block_size()); - match publisher.publish(event) { - Ok(_) => DynamoLlmResult::OK, - Err(e) => { - eprintln!("Error publishing stored kv event {:?}", e); - DynamoLlmResult::ERR + ffi_guard(|| { + let parent_hash = { + if parent_hash.is_null() { + None + } else { + Some(unsafe { *parent_hash }) + } + }; + let kv_params = DynamoKvStoredEventParams { + event_id, + token_ids, + num_block_tokens, + block_ids, + num_blocks, + parent_hash, + lora_id, + }; + let publisher = match KV_PUB.get() { + Some(p) => p, + None => { + eprintln!("KV publisher not initialized"); + return DynamoLlmResult::ERR; + } + }; + let event = kv_event_create_stored_from_parts(kv_params, publisher.kv_block_size()); + match publisher.publish(event) { + Ok(_) => DynamoLlmResult::OK, + Err(e) => { + eprintln!("Error publishing stored kv event {:?}", e); + DynamoLlmResult::ERR + } } - } + }) } #[unsafe(no_mangle)] @@ -295,49 +339,27 @@ pub extern "C" fn dynamo_kv_event_publish_removed( block_ids: *const u64, num_blocks: usize, ) -> DynamoLlmResult { - let publisher = KV_PUB.get().unwrap(); - let event = 
kv_event_create_removed_from_parts(event_id, block_ids, num_blocks); - match publisher.publish(event) { - Ok(_) => DynamoLlmResult::OK, - Err(e) => { - eprintln!("Error publishing removed kv event {:?}", e); - DynamoLlmResult::ERR + ffi_guard(|| { + let publisher = match KV_PUB.get() { + Some(p) => p, + None => { + eprintln!("KV publisher not initialized"); + return DynamoLlmResult::ERR; + } + }; + let event = kv_event_create_removed_from_parts(event_id, block_ids, num_blocks); + match publisher.publish(event) { + Ok(_) => DynamoLlmResult::OK, + Err(e) => { + eprintln!("Error publishing removed kv event {:?}", e); + DynamoLlmResult::ERR + } } - } + }) } -// Need to setup etcd and nats to run these tests -// #[cfg(test)] -// mod tests { -// use super::*; -// use std::ffi::CString; - -// #[test] -// fn test_dynamo_llm_init() { -// // Create C-compatible strings -// let namespace = CString::new("test_namespace").unwrap(); -// let component = CString::new("test_component").unwrap(); - -// // Call the init function -// let result = unsafe { -// dynamo_llm_init( -// namespace.as_ptr(), -// component.as_ptr(), -// 1, // worker_id -// 32, // kv_block_size -// ) -// }; - -// assert_eq!(result as u32, DynamoLlmResult::OK as u32); - -// assert!(WK.get().is_some()); - -// let shutdown_result = dynamo_llm_shutdown(); -// assert_eq!(shutdown_result as u32, DynamoLlmResult::OK as u32); -// } -// } - -// -// Apis related to the best worker selection. 
+ +/* ----------------- worker selection pipeline ----------------- */ + // Worker selection pipeline handle containing the actual pipeline pub struct WorkerSelectionPipeline { pipeline: @@ -372,6 +394,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( router_replica_sync: bool, pipeline_out: *mut *mut WorkerSelectionPipeline, ) -> DynamoLlmResult { + // Ensure runtime initialized let wk = match WK.get() { Some(wk) => wk, None => { @@ -380,104 +403,126 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( } }; - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); + let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("Failed to convert namespace C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; - let result = secondary.block_on(async { - // Convert C strings to Rust strings - let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert namespace C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("Failed to convert component C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; - let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert component C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("Failed to convert model_name C string: {:?}", e); + return DynamoLlmResult::ERR; + } + }; - let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert model_name C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + 
// Capture values we need inside the async closure + let use_kv_routing_c = use_kv_routing; + let busy_threshold_c = busy_threshold; + let overlap_score_weight_c = overlap_score_weight; + let router_temperature_c = router_temperature; + let use_kv_events_c = use_kv_events; + let router_replica_sync_c = router_replica_sync; + + // Clone the runtime handle so we can ensure DRT inside the closure + let rt = wk.runtime().clone(); + + // Run the async creation on a local runtime thread, returning a usize (Send) instead of a raw ptr + let join_res: Result = run_on_local_runtime(move || async move { + // Ensure DRT exists (idempotent) + DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) + .await + .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; - // Determine router mode and busy threshold - let router_mode = if use_kv_routing { + // Router mode / thresholds + let router_mode = if use_kv_routing_c { RouterMode::KV } else { RouterMode::RoundRobin }; - let busy_threshold_opt = if busy_threshold < 0.0 { + let busy_threshold_opt = if busy_threshold_c < 0.0 { None } else { - Some(busy_threshold) + Some(busy_threshold_c) }; - // Create KV router config if using KV routing - let kv_router_config = if use_kv_routing { + // Optional KV router config + let kv_router_config = if use_kv_routing_c { use dynamo_llm::kv_router::KvRouterConfig; - Some(KvRouterConfig::new( - if overlap_score_weight < 0.0 { + if overlap_score_weight_c < 0.0 { None } else { - Some(overlap_score_weight) + Some(overlap_score_weight_c) }, - if router_temperature < 0.0 { + if router_temperature_c < 0.0 { None } else { - Some(router_temperature) + Some(router_temperature_c) }, - Some(use_kv_events), - Some(router_replica_sync), - None, // max_num_batched_tokens - use default - None, // router_snapshot_threshold - use default - None, // router_reset_states - use default + Some(use_kv_events_c), + Some(router_replica_sync_c), + None, // 
max_num_batched_tokens (default) + None, // router_snapshot_threshold (default) + None, // router_reset_states (default) )) } else { None }; - // Create the worker selection pipeline - let pipeline = match create_worker_selection_pipeline_chat( - namespace, - component_name, - model_name, + // Build the pipeline + let pipeline = create_worker_selection_pipeline_chat( + &namespace, + &component_name, + &model_name, router_mode, busy_threshold_opt, kv_router_config, ) .await - { - Ok(pipeline) => pipeline, - Err(e) => { - eprintln!("Failed to create worker selection pipeline: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + .map_err(|e| format!("Failed to create worker selection pipeline: {e:?}"))?; - // Wrap pipeline in handle struct and box it - let pipeline_handle = WorkerSelectionPipeline { pipeline }; - let boxed_pipeline = Box::new(pipeline_handle); - let pipeline_ptr = Box::into_raw(boxed_pipeline); + // Wrap and leak to raw pointer; return as usize (Send) + let handle = WorkerSelectionPipeline { pipeline }; + let raw: *mut WorkerSelectionPipeline = Box::into_raw(Box::new(handle)); + Ok(raw as usize) + }); + + // Cast usize back to the pointer type and return it + let pipeline_ptr: *mut WorkerSelectionPipeline = match join_res { + Ok(raw) => raw as *mut WorkerSelectionPipeline, + Err(msg) => { + eprintln!("{msg}"); + return DynamoLlmResult::ERR; + } + }; + if pipeline_out.is_null() { + eprintln!("pipeline_out pointer is null"); + // Avoid leaking the just-created handle if caller gave us a null out-param unsafe { - *pipeline_out = pipeline_ptr; + drop(Box::from_raw(pipeline_ptr)); } + return DynamoLlmResult::ERR; + } - DynamoLlmResult::OK - }); + unsafe { + *pipeline_out = pipeline_ptr; + } - result + DynamoLlmResult::OK } /// Destroy a worker selection pipeline and free its memory @@ -488,15 +533,15 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( pub unsafe extern "C" fn dynamo_destroy_worker_selection_pipeline( pipeline: *mut 
WorkerSelectionPipeline, ) -> DynamoLlmResult { - if pipeline.is_null() { - eprintln!("Pipeline pointer is null"); - return DynamoLlmResult::ERR; - } - - // Convert back to Box with correct type and let it drop to free memory - let _boxed_pipeline: Box = unsafe { Box::from_raw(pipeline) }; - - DynamoLlmResult::OK + ffi_guard(|| { + if pipeline.is_null() { + eprintln!("Pipeline pointer is null"); + return DynamoLlmResult::ERR; + } + // Convert back to Box with correct type and let it drop to free memory + let _boxed_pipeline: Box = unsafe { Box::from_raw(pipeline) }; + DynamoLlmResult::OK + }) } /// Query worker selection and return annotated request @@ -518,107 +563,109 @@ pub unsafe extern "C" fn dynamo_query_worker_selection_and_annotate( token_count_out: *mut usize, annotated_request_json_out: *mut *mut c_char, ) -> DynamoLlmResult { - if pipeline.is_null() { - eprintln!("Pipeline pointer is null"); - return DynamoLlmResult::ERR; - } - - let wk = match WK.get() { - Some(wk) => wk, - None => { - eprintln!("Runtime not initialized - call dynamo_llm_init first"); + ffi_guard(|| { + if pipeline.is_null() { + eprintln!("Pipeline pointer is null"); return DynamoLlmResult::ERR; } - }; - - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); - let result = secondary.block_on(async { - // Convert C string to Rust string - let request_json = match unsafe { CStr::from_ptr(request_json_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert request JSON C string: {:?}", e); + let wk = match WK.get() { + Some(wk) => wk, + None => { + eprintln!("Runtime not initialized - call dynamo_llm_init first"); return DynamoLlmResult::ERR; } }; - // Parse JSON into NvCreateChatCompletionRequest - let original_request: NvCreateChatCompletionRequest = - match serde_json::from_str(request_json) { - Ok(req) => req, + let rt = wk.runtime(); + let secondary = rt.secondary().clone(); + + let result = secondary.block_on(async { + // Convert C string to Rust 
string + let request_json = match unsafe { CStr::from_ptr(request_json_c_str) }.to_str() { + Ok(s) => s, Err(e) => { - eprintln!("Failed to parse request JSON: {:?}", e); + eprintln!("Failed to convert request JSON C string: {:?}", e); return DynamoLlmResult::ERR; } }; - // Get pipeline reference - let pipeline_ref = unsafe { &*pipeline }; - - // Call the wrapper function - let (worker_id, tokens, annotated_request) = - match query_worker_selection_and_annotate(&pipeline_ref.pipeline, original_request) - .await - { - Ok(result) => result, + // Parse JSON into NvCreateChatCompletionRequest + let original_request: NvCreateChatCompletionRequest = + match serde_json::from_str(request_json) { + Ok(req) => req, + Err(e) => { + eprintln!("Failed to parse request JSON: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Get pipeline reference + let pipeline_ref = unsafe { &*pipeline }; + + // Call the wrapper function + let (worker_id, tokens, annotated_request) = + match query_worker_selection_and_annotate(&pipeline_ref.pipeline, original_request) + .await + { + Ok(result) => result, + Err(e) => { + eprintln!("Failed to query worker selection: {:?}", e); + return DynamoLlmResult::ERR; + } + }; + + // Convert annotated request back to JSON + let annotated_json = match serde_json::to_string(&annotated_request) { + Ok(json) => json, Err(e) => { - eprintln!("Failed to query worker selection: {:?}", e); + eprintln!("Failed to serialize annotated request: {:?}", e); return DynamoLlmResult::ERR; } }; - // Convert annotated request back to JSON - let annotated_json = match serde_json::to_string(&annotated_request) { - Ok(json) => json, - Err(e) => { - eprintln!("Failed to serialize annotated request: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - - // Allocate memory for tokens array - let tokens_ptr = if tokens.is_empty() { - std::ptr::null_mut() - } else { - let tokens_len = tokens.len(); - let layout = std::alloc::Layout::array::(tokens_len).unwrap(); - let ptr = 
unsafe { std::alloc::alloc(layout) as *mut u32 }; - if ptr.is_null() { - eprintln!("Failed to allocate memory for tokens"); - return DynamoLlmResult::ERR; - } - // Copy tokens to allocated memory - unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len) }; - ptr - }; + // Allocate memory for tokens array + let tokens_ptr = if tokens.is_empty() { + std::ptr::null_mut() + } else { + let tokens_len = tokens.len(); + let layout = std::alloc::Layout::array::(tokens_len).unwrap(); + let ptr = unsafe { std::alloc::alloc(layout) as *mut u32 }; + if ptr.is_null() { + eprintln!("Failed to allocate memory for tokens"); + return DynamoLlmResult::ERR; + } + // Copy tokens to allocated memory + unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len) }; + ptr + }; - // Allocate memory for annotated request JSON string - let json_cstring = match std::ffi::CString::new(annotated_json) { - Ok(cstr) => cstr, - Err(e) => { - eprintln!("Failed to create C string for annotated JSON: {:?}", e); - if !tokens_ptr.is_null() { - let layout = std::alloc::Layout::array::(tokens.len()).unwrap(); - unsafe { std::alloc::dealloc(tokens_ptr as *mut u8, layout) }; + // Allocate memory for annotated request JSON string + let json_cstring = match std::ffi::CString::new(annotated_json) { + Ok(cstr) => cstr, + Err(e) => { + eprintln!("Failed to create C string for annotated JSON: {:?}", e); + if !tokens_ptr.is_null() { + let layout = std::alloc::Layout::array::(tokens.len()).unwrap(); + unsafe { std::alloc::dealloc(tokens_ptr as *mut u8, layout) }; + } + return DynamoLlmResult::ERR; } - return DynamoLlmResult::ERR; - } - }; + }; - // Set output parameters - unsafe { - *worker_instance_id_out = worker_id; - *token_ids_out = tokens_ptr; - *token_count_out = tokens.len(); - *annotated_request_json_out = json_cstring.into_raw(); - } + // Set output parameters + unsafe { + *worker_instance_id_out = worker_id; + *token_ids_out = tokens_ptr; + *token_count_out = tokens.len(); 
+ *annotated_request_json_out = json_cstring.into_raw(); + } - DynamoLlmResult::OK - }); + DynamoLlmResult::OK + }); - result + result + }) } /// Free memory allocated by dynamo_query_worker_selection_and_annotate @@ -632,23 +679,25 @@ pub unsafe extern "C" fn dynamo_free_worker_selection_result( token_count: usize, annotated_request_json: *mut c_char, ) -> DynamoLlmResult { - // Free tokens array if not null - if !token_ids.is_null() && token_count > 0 { - let layout = match std::alloc::Layout::array::(token_count) { - Ok(layout) => layout, - Err(_) => { - eprintln!("Invalid layout for tokens array"); - return DynamoLlmResult::ERR; - } - }; - unsafe { std::alloc::dealloc(token_ids as *mut u8, layout) }; - } + ffi_guard(|| { + // Free tokens array if not null + if !token_ids.is_null() && token_count > 0 { + let layout = match std::alloc::Layout::array::(token_count) { + Ok(layout) => layout, + Err(_) => { + eprintln!("Invalid layout for tokens array"); + return DynamoLlmResult::ERR; + } + }; + unsafe { std::alloc::dealloc(token_ids as *mut u8, layout) }; + } - // Free JSON string if not null - if !annotated_request_json.is_null() { - let _cstring = unsafe { std::ffi::CString::from_raw(annotated_request_json) }; - // CString will be automatically freed when it goes out of scope - } + // Free JSON string if not null + if !annotated_request_json.is_null() { + let _cstring = unsafe { std::ffi::CString::from_raw(annotated_request_json) }; + // CString will be automatically freed when it goes out of scope + } - DynamoLlmResult::OK + DynamoLlmResult::OK + }) } From aed14baa5e5954e7adf1fc0d78c433fe8f51fb01 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Sun, 21 Sep 2025 22:20:20 -0700 Subject: [PATCH 09/18] more Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 6b0dac65f9..d690a98c86 100644 --- a/lib/bindings/c/src/lib.rs 
+++ b/lib/bindings/c/src/lib.rs @@ -59,7 +59,8 @@ fn ffi_guard DynamoLlmResult>(f: F) -> DynamoLlmResult { } /// Run async work on a fresh single-threaded Tokio runtime on a plain OS thread. -/// This avoids Tokio's "cannot drop a runtime from within an async context" panic. +/// Uses `shutdown_background()` to avoid the "Cannot drop a runtime where blocking +/// is not allowed" panic during runtime drop. fn run_on_local_runtime(op: impl FnOnce() -> Fut + Send + 'static) -> Result where Fut: std::future::Future> + Send + 'static, @@ -70,7 +71,14 @@ where .enable_all() .build() .map_err(|e| format!("build local runtime: {e:?}"))?; - rt.block_on(op()) + + // run the future + let out = rt.block_on(op()); + + // IMPORTANT: avoid blocking shutdown on this thread + rt.shutdown_background(); + + out }) .join() .map_err(|_| "panic in local runtime thread".to_string())? From e2803ae89aef80cb645ab912b0ecdcaf71af20dd Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Sun, 21 Sep 2025 22:39:00 -0700 Subject: [PATCH 10/18] option 1 Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 281 +++++++++++++++++++++++++++++--------- 1 file changed, 214 insertions(+), 67 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index d690a98c86..f4cc472d0d 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -395,9 +395,9 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( component_c_str: *const c_char, model_name_c_str: *const c_char, use_kv_routing: bool, - busy_threshold: f64, // Use negative value to indicate None - overlap_score_weight: f64, // Use negative value for default - router_temperature: f64, // Use negative value for default + busy_threshold: f64, // negative => None + overlap_score_weight: f64, // negative => default + router_temperature: f64, // negative => default use_kv_events: bool, router_replica_sync: bool, pipeline_out: *mut *mut WorkerSelectionPipeline, @@ -411,6 +411,7 @@ pub unsafe extern 
"C" fn dynamo_create_worker_selection_pipeline( } }; + // Convert C strings up-front to owned Strings we can move across threads. let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { Ok(s) => s.to_owned(), Err(e) => { @@ -418,7 +419,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( return DynamoLlmResult::ERR; } }; - let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { Ok(s) => s.to_owned(), Err(e) => { @@ -426,7 +426,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( return DynamoLlmResult::ERR; } }; - let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { Ok(s) => s.to_owned(), Err(e) => { @@ -435,7 +434,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( } }; - // Capture values we need inside the async closure + // Capture flags let use_kv_routing_c = use_kv_routing; let busy_threshold_c = busy_threshold; let overlap_score_weight_c = overlap_score_weight; @@ -443,73 +442,94 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( let use_kv_events_c = use_kv_events; let router_replica_sync_c = router_replica_sync; - // Clone the runtime handle so we can ensure DRT inside the closure - let rt = wk.runtime().clone(); - - // Run the async creation on a local runtime thread, returning a usize (Send) instead of a raw ptr - let join_res: Result = run_on_local_runtime(move || async move { - // Ensure DRT exists (idempotent) - DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) - .await - .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; - - // Router mode / thresholds - let router_mode = if use_kv_routing_c { - RouterMode::KV - } else { - RouterMode::RoundRobin - }; - - let busy_threshold_opt = if busy_threshold_c < 0.0 { - None - } else { - Some(busy_threshold_c) - }; + // We’ll use the global worker’s runtime just to host a spawn_blocking. 
+ // Inside that blocking thread we build a tiny Tokio runtime to run the async work. + let host_rt = wk.runtime().clone(); + let secondary = host_rt.secondary().clone(); + + // Do the heavy work on a blocking thread to allow blocking drops. + let res: Result = secondary.block_on(async move { + tokio::task::spawn_blocking(move || -> Result { + // Tiny throwaway runtime dedicated to the async creation + let rt2 = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("build inner runtime: {e:?}"))?; + + // Run the async creation inside rt2 + let out: Result = rt2.block_on(async move { + // Ensure DRT exists (idempotent) + DRT.get_or_try_init(async { + DistributedRuntime::from_settings(host_rt.clone()).await + }) + .await + .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; - // Optional KV router config - let kv_router_config = if use_kv_routing_c { - use dynamo_llm::kv_router::KvRouterConfig; - Some(KvRouterConfig::new( - if overlap_score_weight_c < 0.0 { - None + // Router mode / thresholds + let router_mode = if use_kv_routing_c { + RouterMode::KV } else { - Some(overlap_score_weight_c) - }, - if router_temperature_c < 0.0 { + RouterMode::RoundRobin + }; + let busy_threshold_opt = if busy_threshold_c < 0.0 { None } else { - Some(router_temperature_c) - }, - Some(use_kv_events_c), - Some(router_replica_sync_c), - None, // max_num_batched_tokens (default) - None, // router_snapshot_threshold (default) - None, // router_reset_states (default) - )) - } else { - None - }; + Some(busy_threshold_c) + }; - // Build the pipeline - let pipeline = create_worker_selection_pipeline_chat( - &namespace, - &component_name, - &model_name, - router_mode, - busy_threshold_opt, - kv_router_config, - ) - .await - .map_err(|e| format!("Failed to create worker selection pipeline: {e:?}"))?; + // Optional KV router config + let kv_router_config = if use_kv_routing_c { + use dynamo_llm::kv_router::KvRouterConfig; + 
Some(KvRouterConfig::new( + if overlap_score_weight_c < 0.0 { + None + } else { + Some(overlap_score_weight_c) + }, + if router_temperature_c < 0.0 { + None + } else { + Some(router_temperature_c) + }, + Some(use_kv_events_c), + Some(router_replica_sync_c), + None, // max_num_batched_tokens + None, // router_snapshot_threshold + None, // router_reset_states + )) + } else { + None + }; + + // Build the pipeline + let pipeline = create_worker_selection_pipeline_chat( + &namespace, + &component_name, + &model_name, + router_mode, + busy_threshold_opt, + kv_router_config, + ) + .await + .map_err(|e| format!("Failed to create worker selection pipeline: {e:?}"))?; - // Wrap and leak to raw pointer; return as usize (Send) - let handle = WorkerSelectionPipeline { pipeline }; - let raw: *mut WorkerSelectionPipeline = Box::into_raw(Box::new(handle)); - Ok(raw as usize) + // Wrap and return pointer as usize (Send) + let handle = WorkerSelectionPipeline { pipeline }; + let raw: *mut WorkerSelectionPipeline = Box::into_raw(Box::new(handle)); + Ok(raw as usize) + }); + + // Make sure *this* runtime’s blocking shutdown happens off the async path + rt2.shutdown_background(); + + out + }) + .await + .map_err(|e| format!("spawn_blocking join error: {e:?}"))? 
}); - // Cast usize back to the pointer type and return it - let pipeline_ptr: *mut WorkerSelectionPipeline = match join_res { + // Unpack result + let pipeline_ptr = match res { Ok(raw) => raw as *mut WorkerSelectionPipeline, Err(msg) => { eprintln!("{msg}"); @@ -519,13 +539,11 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( if pipeline_out.is_null() { eprintln!("pipeline_out pointer is null"); - // Avoid leaking the just-created handle if caller gave us a null out-param unsafe { drop(Box::from_raw(pipeline_ptr)); } return DynamoLlmResult::ERR; } - unsafe { *pipeline_out = pipeline_ptr; } @@ -533,6 +551,135 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( DynamoLlmResult::OK } +// Below is the simplest fix +// pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( +// namespace_c_str: *const c_char, +// component_c_str: *const c_char, +// model_name_c_str: *const c_char, +// use_kv_routing: bool, +// busy_threshold: f64, // < 0 => None +// overlap_score_weight: f64, // < 0 => default +// router_temperature: f64, // < 0 => default +// use_kv_events: bool, +// router_replica_sync: bool, +// pipeline_out: *mut *mut WorkerSelectionPipeline, +// ) -> DynamoLlmResult { +// // Ensure runtime initialized +// let wk = match WK.get() { +// Some(wk) => wk, +// None => { +// eprintln!("Runtime not initialized - call dynamo_llm_init first"); +// return DynamoLlmResult::ERR; +// } +// }; + +// // Convert inputs to owned Strings up front (we're crossing FFI) +// let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { +// Ok(s) => s.to_owned(), +// Err(e) => { +// eprintln!("Failed to convert namespace C string: {:?}", e); +// return DynamoLlmResult::ERR; +// } +// }; +// let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { +// Ok(s) => s.to_owned(), +// Err(e) => { +// eprintln!("Failed to convert component C string: {:?}", e); +// return DynamoLlmResult::ERR; +// } +// }; +// let 
model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { +// Ok(s) => s.to_owned(), +// Err(e) => { +// eprintln!("Failed to convert model_name C string: {:?}", e); +// return DynamoLlmResult::ERR; +// } +// }; + +// // Use the long-lived runtime (no ad-hoc runtime here). +// let rt = wk.runtime(); +// let secondary = rt.secondary().clone(); + +// let result = secondary.block_on(async { +// // Make sure DistributedRuntime is initialized (idempotent). +// if let Err(e) = DRT +// .get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) +// .await +// { +// eprintln!("Failed to initialize distributed runtime: {:?}", e); +// return DynamoLlmResult::ERR; +// } + +// // Router mode / thresholds +// let router_mode = if use_kv_routing { +// RouterMode::KV +// } else { +// RouterMode::RoundRobin +// }; +// let busy_threshold_opt = if busy_threshold < 0.0 { +// None +// } else { +// Some(busy_threshold) +// }; + +// // Optional KV router config +// let kv_router_config = if use_kv_routing { +// use dynamo_llm::kv_router::KvRouterConfig; +// Some(KvRouterConfig::new( +// if overlap_score_weight < 0.0 { +// None +// } else { +// Some(overlap_score_weight) +// }, +// if router_temperature < 0.0 { +// None +// } else { +// Some(router_temperature) +// }, +// Some(use_kv_events), +// Some(router_replica_sync), +// None, // max_num_batched_tokens +// None, // router_snapshot_threshold +// None, // router_reset_states +// )) +// } else { +// None +// }; + +// // Build the pipeline on the persistent runtime. 
+// let pipeline = match create_worker_selection_pipeline_chat( +// &namespace, +// &component_name, +// &model_name, +// router_mode, +// busy_threshold_opt, +// kv_router_config, +// ) +// .await +// { +// Ok(p) => p, +// Err(e) => { +// eprintln!("Failed to create worker selection pipeline: {:?}", e); +// return DynamoLlmResult::ERR; +// } +// }; + +// // Hand back an opaque pointer +// let handle = WorkerSelectionPipeline { pipeline }; +// let raw = Box::into_raw(Box::new(handle)); +// if pipeline_out.is_null() { +// // avoid leaking if caller passed null out-arg +// unsafe { drop(Box::from_raw(raw)) }; +// eprintln!("pipeline_out pointer is null"); +// return DynamoLlmResult::ERR; +// } +// unsafe { *pipeline_out = raw }; +// DynamoLlmResult::OK +// }); + +// result +// } + /// Destroy a worker selection pipeline and free its memory /// /// # Safety From 23c7928b49234fdc140b5cdaade6732269bc1465 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 10:29:22 -0700 Subject: [PATCH 11/18] last attempt for async Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 43 ++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index f4cc472d0d..10cc3b64ce 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -411,7 +411,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( } }; - // Convert C strings up-front to owned Strings we can move across threads. + // Convert incoming C strings up front. 
let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { Ok(s) => s.to_owned(), Err(e) => { @@ -426,6 +426,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( return DynamoLlmResult::ERR; } }; + let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { Ok(s) => s.to_owned(), Err(e) => { @@ -434,7 +435,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( } }; - // Capture flags + // Flags to move into closures. let use_kv_routing_c = use_kv_routing; let busy_threshold_c = busy_threshold; let overlap_score_weight_c = overlap_score_weight; @@ -442,30 +443,26 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( let use_kv_events_c = use_kv_events; let router_replica_sync_c = router_replica_sync; - // We’ll use the global worker’s runtime just to host a spawn_blocking. - // Inside that blocking thread we build a tiny Tokio runtime to run the async work. - let host_rt = wk.runtime().clone(); - let secondary = host_rt.secondary().clone(); + // Use the worker’s runtime to host the operation. + let rt = wk.runtime(); + let secondary = rt.secondary().clone(); - // Do the heavy work on a blocking thread to allow blocking drops. + // Run on the runtime, but mark the region as “blocking allowed”. let res: Result = secondary.block_on(async move { - tokio::task::spawn_blocking(move || -> Result { - // Tiny throwaway runtime dedicated to the async creation + tokio::task::block_in_place(|| -> Result { + // Tiny throwaway runtime to drive the async builder. 
let rt2 = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|e| format!("build inner runtime: {e:?}"))?; - // Run the async creation inside rt2 - let out: Result = rt2.block_on(async move { - // Ensure DRT exists (idempotent) - DRT.get_or_try_init(async { - DistributedRuntime::from_settings(host_rt.clone()).await - }) - .await - .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; + // Run async pipeline creation inside rt2. + let out = rt2.block_on(async { + // Ensure global DRT (so builder won’t spin its own transient one). + DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) + .await + .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; - // Router mode / thresholds let router_mode = if use_kv_routing_c { RouterMode::KV } else { @@ -477,7 +474,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( Some(busy_threshold_c) }; - // Optional KV router config let kv_router_config = if use_kv_routing_c { use dynamo_llm::kv_router::KvRouterConfig; Some(KvRouterConfig::new( @@ -501,7 +497,6 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( None }; - // Build the pipeline let pipeline = create_worker_selection_pipeline_chat( &namespace, &component_name, @@ -513,23 +508,19 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( .await .map_err(|e| format!("Failed to create worker selection pipeline: {e:?}"))?; - // Wrap and return pointer as usize (Send) let handle = WorkerSelectionPipeline { pipeline }; let raw: *mut WorkerSelectionPipeline = Box::into_raw(Box::new(handle)); Ok(raw as usize) }); - // Make sure *this* runtime’s blocking shutdown happens off the async path + // Drop the inner runtime in a context where blocking is allowed. rt2.shutdown_background(); out }) - .await - .map_err(|e| format!("spawn_blocking join error: {e:?}"))? 
}); - // Unpack result - let pipeline_ptr = match res { + let pipeline_ptr: *mut WorkerSelectionPipeline = match res { Ok(raw) => raw as *mut WorkerSelectionPipeline, Err(msg) => { eprintln!("{msg}"); From 0a19553aae7424e42595b0a43876368b84fb29a1 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 11:05:31 -0700 Subject: [PATCH 12/18] Make the code blocking Signed-off-by: Anna Tchernych --- .../src/entrypoint/input/worker_selection_pipeline.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs index bbe14c5116..71e6d8de8c 100644 --- a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -412,10 +412,17 @@ pub async fn create_worker_selection_pipeline_chat( traits::DistributedRuntimeProvider, }; - // Create DistributedRuntime + // --- IMPORTANT CHANGE --- + // Create a fresh Runtime + DistributedRuntime, then *leak* the DistributedRuntime + // so it won't be dropped inside an async context (which triggers Tokio's panic). + // + // This is acceptable here because this function is typically called once at startup; + // if it's called multiple times, you'll leak once per call. If that's a concern, + // switch to a global OnceCell/OnceLock to cache one instance instead. 
let runtime = Runtime::from_settings()?; let dst_config = DistributedConfig::from_settings(true); - let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?; + let drt_owned = DistributedRuntime::new(runtime, dst_config).await?; + let distributed_runtime: &'static DistributedRuntime = Box::leak(Box::new(drt_owned)); // Create Component and Client let ns = distributed_runtime.namespace(namespace)?; From aa1edc3f9f5c8494de70ef4e4af6c4d3dcc04246 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 11:30:09 -0700 Subject: [PATCH 13/18] fix model card error Signed-off-by: Anna Tchernych --- lib/llm/src/entrypoint/input/worker_selection_pipeline.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs index 71e6d8de8c..fc2b6599e4 100644 --- a/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs +++ b/lib/llm/src/entrypoint/input/worker_selection_pipeline.rs @@ -420,7 +420,7 @@ pub async fn create_worker_selection_pipeline_chat( // if it's called multiple times, you'll leak once per call. If that's a concern, // switch to a global OnceCell/OnceLock to cache one instance instead. 
let runtime = Runtime::from_settings()?; - let dst_config = DistributedConfig::from_settings(true); + let dst_config = DistributedConfig::from_settings(false); let drt_owned = DistributedRuntime::new(runtime, dst_config).await?; let distributed_runtime: &'static DistributedRuntime = Box::leak(Box::new(drt_owned)); @@ -542,7 +542,7 @@ where // Create DistributedRuntime let runtime = Runtime::from_settings()?; - let dst_config = DistributedConfig::from_settings(true); + let dst_config = DistributedConfig::from_settings(false); let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?; // Create Component and Client From 6a0fc2caf15a5c4fda5a010f2bb3ec4eda3d4532 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 15:52:36 -0700 Subject: [PATCH 14/18] fix error in query Signed-off-by: Anna Tchernych --- lib/bindings/c/cbindgen.toml | 1 + lib/bindings/c/src/lib.rs | 657 +++++++++++++++++------------------ 2 files changed, 312 insertions(+), 346 deletions(-) diff --git a/lib/bindings/c/cbindgen.toml b/lib/bindings/c/cbindgen.toml index bf1744c55b..e48adc6c4c 100644 --- a/lib/bindings/c/cbindgen.toml +++ b/lib/bindings/c/cbindgen.toml @@ -39,5 +39,6 @@ include = [ "dynamo_free_worker_selection_result" ] + [export.rename] "DynamoLlmResult" = "dynamo_llm_result_t" diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 10cc3b64ce..8bd47e9054 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -1,6 +1,5 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 - use async_once_cell::OnceCell as AsyncOnceCell; use libc::c_char; use once_cell::sync::OnceCell; @@ -23,7 +22,9 @@ use dynamo_runtime::{ use std::panic::{AssertUnwindSafe, catch_unwind}; static WK: OnceCell = OnceCell::new(); + static DRT: AsyncOnceCell = AsyncOnceCell::new(); + // [FIXME] shouldn't the publisher be instance passing between API calls? 
static KV_PUB: OnceCell = OnceCell::new(); @@ -212,6 +213,7 @@ fn kv_event_create_stored_block_from_parts( tokens_hash, } } + static WARN_COUNT: AtomicU32 = AtomicU32::new(0); fn kv_event_create_stored_from_parts( @@ -366,12 +368,156 @@ pub extern "C" fn dynamo_kv_event_publish_removed( }) } +/* ------------------------------------------------------------- */ /* ----------------- worker selection pipeline ----------------- */ +/* ------------------------------------------------------------- */ +use tokio::sync::{mpsc, oneshot}; + +enum Cmd { + CreatePipeline { + namespace: String, + component: String, + model: String, + use_kv_routing: bool, + busy_threshold: Option, + overlap_score_weight: Option, + router_temperature: Option, + use_kv_events: bool, + router_replica_sync: bool, + resp: oneshot::Sender>, // returns a pipeline id + }, + Query { + pipeline_id: u64, + request: NvCreateChatCompletionRequest, + resp: oneshot::Sender, NvCreateChatCompletionRequest), String>>, + }, + DestroyPipeline { + pipeline_id: u64, + resp: oneshot::Sender<()>, + }, +} + +struct Host { + tx: mpsc::Sender, +} + +static HOST: OnceCell = OnceCell::new(); + +fn ensure_host_started() -> Result<&'static Host, String> { + HOST.get_or_try_init(|| -> Result { + let (tx, mut rx) = mpsc::channel::(128); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("host runtime build failed"); + + rt.block_on(async move { + use std::collections::HashMap; + use std::sync::atomic::{AtomicU64, Ordering}; + static NEXT_ID: AtomicU64 = AtomicU64::new(1); + + // Pipeline state lives inside the runtime + struct HeldPipeline { + pipeline: ServiceEngine< + SingleIn, + ManyOut>, + >, + } + let mut pipelines: HashMap = HashMap::new(); + + while let Some(cmd) = rx.recv().await { + match cmd { + Cmd::CreatePipeline { + namespace, + component, + model, + use_kv_routing, + busy_threshold, + overlap_score_weight, + router_temperature, + 
use_kv_events, + router_replica_sync, + resp, + } => { + let fut = async { + let router_mode = if use_kv_routing { + RouterMode::KV + } else { + RouterMode::RoundRobin + }; + + let kv_router_config = if use_kv_routing { + use dynamo_llm::kv_router::KvRouterConfig; + Some(KvRouterConfig::new( + overlap_score_weight, + router_temperature, + Some(use_kv_events), + Some(router_replica_sync), + None, // max_num_batched_tokens + None, // router_snapshot_threshold + None, // router_reset_states + )) + } else { + None + }; + + let pipeline = create_worker_selection_pipeline_chat( + &namespace, + &component, + &model, + router_mode, + busy_threshold, + kv_router_config, + ) + .await + .map_err(|e| format!("{e:?}"))?; + + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + pipelines.insert(id, HeldPipeline { pipeline }); + Ok::(id) + }; + + let _ = resp.send(fut.await); + } + + Cmd::Query { + pipeline_id, + request, + resp, + } => { + let fut = async { + let hp = pipelines + .get(&pipeline_id) + .ok_or_else(|| "invalid pipeline id".to_string())?; + + query_worker_selection_and_annotate(&hp.pipeline, request) + .await + .map_err(|e| format!("{e:?}")) + }; + + let _ = resp.send(fut.await); + } + + Cmd::DestroyPipeline { pipeline_id, resp } => { + pipelines.remove(&pipeline_id); + let _ = resp.send(()); + } + } + } + }); + + // runtime drops here when loop ends + }); + + Ok(Host { tx }) + }) +} // Worker selection pipeline handle containing the actual pipeline pub struct WorkerSelectionPipeline { - pipeline: - ServiceEngine, ManyOut>>, + id: u64, // id known by the host thread } /// C FFI wrapper for creating a worker selection pipeline @@ -402,275 +548,93 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( router_replica_sync: bool, pipeline_out: *mut *mut WorkerSelectionPipeline, ) -> DynamoLlmResult { - // Ensure runtime initialized - let wk = match WK.get() { - Some(wk) => wk, - None => { - eprintln!("Runtime not initialized - call dynamo_llm_init first"); 
- return DynamoLlmResult::ERR; - } - }; - - // Convert incoming C strings up front. - let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { - Ok(s) => s.to_owned(), - Err(e) => { - eprintln!("Failed to convert namespace C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { - Ok(s) => s.to_owned(), - Err(e) => { - eprintln!("Failed to convert component C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - - let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { - Ok(s) => s.to_owned(), - Err(e) => { - eprintln!("Failed to convert model_name C string: {:?}", e); + ffi_guard(|| { + if pipeline_out.is_null() { + eprintln!("pipeline_out pointer is null"); return DynamoLlmResult::ERR; } - }; - - // Flags to move into closures. - let use_kv_routing_c = use_kv_routing; - let busy_threshold_c = busy_threshold; - let overlap_score_weight_c = overlap_score_weight; - let router_temperature_c = router_temperature; - let use_kv_events_c = use_kv_events; - let router_replica_sync_c = router_replica_sync; - - // Use the worker’s runtime to host the operation. - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); - - // Run on the runtime, but mark the region as “blocking allowed”. - let res: Result = secondary.block_on(async move { - tokio::task::block_in_place(|| -> Result { - // Tiny throwaway runtime to drive the async builder. - let rt2 = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| format!("build inner runtime: {e:?}"))?; - - // Run async pipeline creation inside rt2. - let out = rt2.block_on(async { - // Ensure global DRT (so builder won’t spin its own transient one). 
- DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) - .await - .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}"))?; - - let router_mode = if use_kv_routing_c { - RouterMode::KV - } else { - RouterMode::RoundRobin - }; - let busy_threshold_opt = if busy_threshold_c < 0.0 { - None - } else { - Some(busy_threshold_c) - }; - - let kv_router_config = if use_kv_routing_c { - use dynamo_llm::kv_router::KvRouterConfig; - Some(KvRouterConfig::new( - if overlap_score_weight_c < 0.0 { - None - } else { - Some(overlap_score_weight_c) - }, - if router_temperature_c < 0.0 { - None - } else { - Some(router_temperature_c) - }, - Some(use_kv_events_c), - Some(router_replica_sync_c), - None, // max_num_batched_tokens - None, // router_snapshot_threshold - None, // router_reset_states - )) - } else { - None - }; - - let pipeline = create_worker_selection_pipeline_chat( - &namespace, - &component_name, - &model_name, - router_mode, - busy_threshold_opt, - kv_router_config, - ) - .await - .map_err(|e| format!("Failed to create worker selection pipeline: {e:?}"))?; - - let handle = WorkerSelectionPipeline { pipeline }; - let raw: *mut WorkerSelectionPipeline = Box::into_raw(Box::new(handle)); - Ok(raw as usize) - }); - // Drop the inner runtime in a context where blocking is allowed. 
- rt2.shutdown_background(); + // start host once + let host = match ensure_host_started() { + Ok(h) => h, + Err(e) => { + eprintln!("{e}"); + return DynamoLlmResult::ERR; + } + }; - out - }) - }); + let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("bad namespace: {e:?}"); + return DynamoLlmResult::ERR; + } + }; + let component = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("bad component: {e:?}"); + return DynamoLlmResult::ERR; + } + }; + let model = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { + Ok(s) => s.to_owned(), + Err(e) => { + eprintln!("bad model: {e:?}"); + return DynamoLlmResult::ERR; + } + }; - let pipeline_ptr: *mut WorkerSelectionPipeline = match res { - Ok(raw) => raw as *mut WorkerSelectionPipeline, - Err(msg) => { - eprintln!("{msg}"); + let (tx, rx) = oneshot::channel(); + let cmd = Cmd::CreatePipeline { + namespace, + component, + model, + use_kv_routing, + busy_threshold: if busy_threshold < 0.0 { + None + } else { + Some(busy_threshold) + }, + overlap_score_weight: if overlap_score_weight < 0.0 { + None + } else { + Some(overlap_score_weight) + }, + router_temperature: if router_temperature < 0.0 { + None + } else { + Some(router_temperature) + }, + use_kv_events, + router_replica_sync, + resp: tx, + }; + if let Err(e) = host.tx.blocking_send(cmd) { + eprintln!("host channel closed: {e}"); return DynamoLlmResult::ERR; } - }; - if pipeline_out.is_null() { - eprintln!("pipeline_out pointer is null"); + let id = match rx.blocking_recv() { + Ok(Ok(id)) => id, + Ok(Err(msg)) => { + eprintln!("{msg}"); + return DynamoLlmResult::ERR; + } + Err(e) => { + eprintln!("host dropped response: {e}"); + return DynamoLlmResult::ERR; + } + }; + + let handle = Box::new(WorkerSelectionPipeline { id }); unsafe { - drop(Box::from_raw(pipeline_ptr)); + *pipeline_out = Box::into_raw(handle); } - return 
DynamoLlmResult::ERR; - } - unsafe { - *pipeline_out = pipeline_ptr; - } - - DynamoLlmResult::OK + DynamoLlmResult::OK + }) } -// Below is the simplest fix -// pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( -// namespace_c_str: *const c_char, -// component_c_str: *const c_char, -// model_name_c_str: *const c_char, -// use_kv_routing: bool, -// busy_threshold: f64, // < 0 => None -// overlap_score_weight: f64, // < 0 => default -// router_temperature: f64, // < 0 => default -// use_kv_events: bool, -// router_replica_sync: bool, -// pipeline_out: *mut *mut WorkerSelectionPipeline, -// ) -> DynamoLlmResult { -// // Ensure runtime initialized -// let wk = match WK.get() { -// Some(wk) => wk, -// None => { -// eprintln!("Runtime not initialized - call dynamo_llm_init first"); -// return DynamoLlmResult::ERR; -// } -// }; - -// // Convert inputs to owned Strings up front (we're crossing FFI) -// let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { -// Ok(s) => s.to_owned(), -// Err(e) => { -// eprintln!("Failed to convert namespace C string: {:?}", e); -// return DynamoLlmResult::ERR; -// } -// }; -// let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { -// Ok(s) => s.to_owned(), -// Err(e) => { -// eprintln!("Failed to convert component C string: {:?}", e); -// return DynamoLlmResult::ERR; -// } -// }; -// let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() { -// Ok(s) => s.to_owned(), -// Err(e) => { -// eprintln!("Failed to convert model_name C string: {:?}", e); -// return DynamoLlmResult::ERR; -// } -// }; - -// // Use the long-lived runtime (no ad-hoc runtime here). -// let rt = wk.runtime(); -// let secondary = rt.secondary().clone(); - -// let result = secondary.block_on(async { -// // Make sure DistributedRuntime is initialized (idempotent). 
-// if let Err(e) = DRT -// .get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) -// .await -// { -// eprintln!("Failed to initialize distributed runtime: {:?}", e); -// return DynamoLlmResult::ERR; -// } - -// // Router mode / thresholds -// let router_mode = if use_kv_routing { -// RouterMode::KV -// } else { -// RouterMode::RoundRobin -// }; -// let busy_threshold_opt = if busy_threshold < 0.0 { -// None -// } else { -// Some(busy_threshold) -// }; - -// // Optional KV router config -// let kv_router_config = if use_kv_routing { -// use dynamo_llm::kv_router::KvRouterConfig; -// Some(KvRouterConfig::new( -// if overlap_score_weight < 0.0 { -// None -// } else { -// Some(overlap_score_weight) -// }, -// if router_temperature < 0.0 { -// None -// } else { -// Some(router_temperature) -// }, -// Some(use_kv_events), -// Some(router_replica_sync), -// None, // max_num_batched_tokens -// None, // router_snapshot_threshold -// None, // router_reset_states -// )) -// } else { -// None -// }; - -// // Build the pipeline on the persistent runtime. 
-// let pipeline = match create_worker_selection_pipeline_chat( -// &namespace, -// &component_name, -// &model_name, -// router_mode, -// busy_threshold_opt, -// kv_router_config, -// ) -// .await -// { -// Ok(p) => p, -// Err(e) => { -// eprintln!("Failed to create worker selection pipeline: {:?}", e); -// return DynamoLlmResult::ERR; -// } -// }; - -// // Hand back an opaque pointer -// let handle = WorkerSelectionPipeline { pipeline }; -// let raw = Box::into_raw(Box::new(handle)); -// if pipeline_out.is_null() { -// // avoid leaking if caller passed null out-arg -// unsafe { drop(Box::from_raw(raw)) }; -// eprintln!("pipeline_out pointer is null"); -// return DynamoLlmResult::ERR; -// } -// unsafe { *pipeline_out = raw }; -// DynamoLlmResult::OK -// }); - -// result -// } - /// Destroy a worker selection pipeline and free its memory /// /// # Safety @@ -684,8 +648,17 @@ pub unsafe extern "C" fn dynamo_destroy_worker_selection_pipeline( eprintln!("Pipeline pointer is null"); return DynamoLlmResult::ERR; } - // Convert back to Box with correct type and let it drop to free memory - let _boxed_pipeline: Box = unsafe { Box::from_raw(pipeline) }; + let id = unsafe { &*pipeline }.id; + let _boxed: Box = unsafe { Box::from_raw(pipeline) }; + + if let Ok(host) = ensure_host_started() { + let (tx, rx) = oneshot::channel(); + let _ = host.tx.blocking_send(Cmd::DestroyPipeline { + pipeline_id: id, + resp: tx, + }); + let _ = rx.blocking_recv(); // best-effort + } DynamoLlmResult::OK }) } @@ -714,103 +687,95 @@ pub unsafe extern "C" fn dynamo_query_worker_selection_and_annotate( eprintln!("Pipeline pointer is null"); return DynamoLlmResult::ERR; } + let host = match ensure_host_started() { + Ok(h) => h, + Err(e) => { + eprintln!("{e}"); + return DynamoLlmResult::ERR; + } + }; - let wk = match WK.get() { - Some(wk) => wk, - None => { - eprintln!("Runtime not initialized - call dynamo_llm_init first"); + let req_str = match unsafe { CStr::from_ptr(request_json_c_str) 
}.to_str() { + Ok(s) => s, + Err(e) => { + eprintln!("bad request json: {e:?}"); return DynamoLlmResult::ERR; } }; - let rt = wk.runtime(); - let secondary = rt.secondary().clone(); + let request: NvCreateChatCompletionRequest = match serde_json::from_str(req_str) { + Ok(r) => r, + Err(e) => { + eprintln!("parse request failed: {e:?}"); + return DynamoLlmResult::ERR; + } + }; - let result = secondary.block_on(async { - // Convert C string to Rust string - let request_json = match unsafe { CStr::from_ptr(request_json_c_str) }.to_str() { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to convert request JSON C string: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + let id = unsafe { &*pipeline }.id; - // Parse JSON into NvCreateChatCompletionRequest - let original_request: NvCreateChatCompletionRequest = - match serde_json::from_str(request_json) { - Ok(req) => req, - Err(e) => { - eprintln!("Failed to parse request JSON: {:?}", e); - return DynamoLlmResult::ERR; - } - }; - - // Get pipeline reference - let pipeline_ref = unsafe { &*pipeline }; - - // Call the wrapper function - let (worker_id, tokens, annotated_request) = - match query_worker_selection_and_annotate(&pipeline_ref.pipeline, original_request) - .await - { - Ok(result) => result, - Err(e) => { - eprintln!("Failed to query worker selection: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + let (tx, rx) = oneshot::channel(); + if let Err(e) = host.tx.blocking_send(Cmd::Query { + pipeline_id: id, + request, + resp: tx, + }) { + eprintln!("host channel closed: {e}"); + return DynamoLlmResult::ERR; + } - // Convert annotated request back to JSON - let annotated_json = match serde_json::to_string(&annotated_request) { - Ok(json) => json, - Err(e) => { - eprintln!("Failed to serialize annotated request: {:?}", e); - return DynamoLlmResult::ERR; - } - }; + let (worker_id, tokens, annotated_req) = match rx.blocking_recv() { + Ok(Ok(t)) => t, + Ok(Err(msg)) => { + eprintln!("Failed to query worker 
selection: {msg}"); + return DynamoLlmResult::ERR; + } + Err(e) => { + eprintln!("host dropped response: {e}"); + return DynamoLlmResult::ERR; + } + }; - // Allocate memory for tokens array - let tokens_ptr = if tokens.is_empty() { - std::ptr::null_mut() - } else { - let tokens_len = tokens.len(); - let layout = std::alloc::Layout::array::(tokens_len).unwrap(); - let ptr = unsafe { std::alloc::alloc(layout) as *mut u32 }; - if ptr.is_null() { - eprintln!("Failed to allocate memory for tokens"); - return DynamoLlmResult::ERR; - } - // Copy tokens to allocated memory - unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len) }; - ptr - }; + // marshal outputs (same as your current code) + let tokens_ptr = if tokens.is_empty() { + std::ptr::null_mut() + } else { + let len = tokens.len(); + let layout = std::alloc::Layout::array::(len).unwrap(); + let ptr = unsafe { std::alloc::alloc(layout) as *mut u32 }; + if ptr.is_null() { + eprintln!("alloc tokens failed"); + return DynamoLlmResult::ERR; + } + unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, len) }; + ptr + }; - // Allocate memory for annotated request JSON string - let json_cstring = match std::ffi::CString::new(annotated_json) { - Ok(cstr) => cstr, - Err(e) => { - eprintln!("Failed to create C string for annotated JSON: {:?}", e); - if !tokens_ptr.is_null() { - let layout = std::alloc::Layout::array::(tokens.len()).unwrap(); - unsafe { std::alloc::dealloc(tokens_ptr as *mut u8, layout) }; - } - return DynamoLlmResult::ERR; + let annotated_json = match serde_json::to_string(&annotated_req) { + Ok(s) => s, + Err(e) => { + eprintln!("serialize annotated req failed: {e:?}"); + return DynamoLlmResult::ERR; + } + }; + let cjson = match std::ffi::CString::new(annotated_json) { + Ok(c) => c, + Err(e) => { + eprintln!("cstr annotated failed: {e:?}"); + if !tokens_ptr.is_null() { + let layout = std::alloc::Layout::array::(tokens.len()).unwrap(); + unsafe { std::alloc::dealloc(tokens_ptr as *mut 
u8, layout) }; } - }; - - // Set output parameters - unsafe { - *worker_instance_id_out = worker_id; - *token_ids_out = tokens_ptr; - *token_count_out = tokens.len(); - *annotated_request_json_out = json_cstring.into_raw(); + return DynamoLlmResult::ERR; } + }; - DynamoLlmResult::OK - }); - - result + unsafe { + *worker_instance_id_out = worker_id; + *token_ids_out = tokens_ptr; + *token_count_out = tokens.len(); + *annotated_request_json_out = cjson.into_raw(); + } + DynamoLlmResult::OK }) } From 21c9cff63a08c1c641c8073a7355fefcd9bb9967 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 16:31:22 -0700 Subject: [PATCH 15/18] add trace Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 8bd47e9054..e0e6bf9b82 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -441,6 +441,11 @@ fn ensure_host_started() -> Result<&'static Host, String> { router_replica_sync, resp, } => { + tracing::info!( + target: "capi", + "CreatePipeline ns={:?} component={:?} model={:?} use_kv_routing={:?} busy_threshold={:?} overlap_score_weight={:?} router_temperature={:?} use_kv_events={:?} router_replica_sync={:?}", + namespace, component, model, use_kv_routing, busy_threshold, overlap_score_weight, router_temperature, use_kv_events, router_replica_sync + ); let fut = async { let router_mode = if use_kv_routing { RouterMode::KV From 036d8789f4cb99fbe8a9472e83352d2c3138a1c9 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 17:02:17 -0700 Subject: [PATCH 16/18] disable kv-routing temp Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index e0e6bf9b82..1771c2e4a0 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -433,7 +433,7 @@ fn ensure_host_started() -> 
Result<&'static Host, String> { namespace, component, model, - use_kv_routing, + use_kv_routing: _, // TODO rm busy_threshold, overlap_score_weight, router_temperature, @@ -441,6 +441,7 @@ fn ensure_host_started() -> Result<&'static Host, String> { router_replica_sync, resp, } => { + let use_kv_routing = false; // TODO rm tracing::info!( target: "capi", "CreatePipeline ns={:?} component={:?} model={:?} use_kv_routing={:?} busy_threshold={:?} overlap_score_weight={:?} router_temperature={:?} use_kv_events={:?} router_replica_sync={:?}", @@ -467,6 +468,8 @@ fn ensure_host_started() -> Result<&'static Host, String> { } else { None }; + tracing::info!("Host creating pipeline: ns={namespace} component={component} mode={:?}", + if use_kv_routing { RouterMode::KV } else { RouterMode::RoundRobin }); let pipeline = create_worker_selection_pipeline_chat( &namespace, From a7deb6998c9b2a620a9eea9a1766c6b6fdc7463c Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 18:31:27 -0700 Subject: [PATCH 17/18] simpl to debug Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 65 +++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 1771c2e4a0..680997a8dd 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -436,58 +436,63 @@ fn ensure_host_started() -> Result<&'static Host, String> { use_kv_routing: _, // TODO rm busy_threshold, overlap_score_weight, - router_temperature, - use_kv_events, - router_replica_sync, + router_temperature: _, // we’ll force 0.0 TODO + use_kv_events: _, // we’ll force false TODO + router_replica_sync: _, // pick false for now TODO resp, } => { - let use_kv_routing = false; // TODO rm + // TODO rm + // Force KV mode + deterministic routing + let use_kv_routing = true; + let router_temperature = Some(0.0); + let use_kv_events = false; + let router_replica_sync = false; + tracing::info!( target: "capi", - 
"CreatePipeline ns={:?} component={:?} model={:?} use_kv_routing={:?} busy_threshold={:?} overlap_score_weight={:?} router_temperature={:?} use_kv_events={:?} router_replica_sync={:?}", + "CreatePipeline ns={:?} component={:?} model={:?} KV_ROUTING={:?} busy_threshold={:?} overlap_score_weight={:?} router_temperature={:?} use_kv_events={:?} router_replica_sync={:?}", namespace, component, model, use_kv_routing, busy_threshold, overlap_score_weight, router_temperature, use_kv_events, router_replica_sync ); - let fut = async { - let router_mode = if use_kv_routing { - RouterMode::KV - } else { - RouterMode::RoundRobin - }; - let kv_router_config = if use_kv_routing { + // 1) Build the pipeline inside a future that does NOT capture `pipelines` + let build = async move { + let router_mode = RouterMode::KV; + + let kv_router_config = { use dynamo_llm::kv_router::KvRouterConfig; Some(KvRouterConfig::new( - overlap_score_weight, - router_temperature, - Some(use_kv_events), - Some(router_replica_sync), - None, // max_num_batched_tokens - None, // router_snapshot_threshold - None, // router_reset_states + overlap_score_weight, // keep caller/defaults + router_temperature, // Some(0.0) => deterministic + Some(use_kv_events), // false + Some(router_replica_sync), // false + None, None, None, )) - } else { - None }; - tracing::info!("Host creating pipeline: ns={namespace} component={component} mode={:?}", - if use_kv_routing { RouterMode::KV } else { RouterMode::RoundRobin }); - let pipeline = create_worker_selection_pipeline_chat( + create_worker_selection_pipeline_chat( &namespace, - &component, + &component, // "backend" &model, router_mode, busy_threshold, kv_router_config, ) .await - .map_err(|e| format!("{e:?}"))?; + .map_err(|e| format!("{e:?}")) + }; - let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - pipelines.insert(id, HeldPipeline { pipeline }); - Ok::(id) + // 2) Await the build, THEN insert into `pipelines` synchronously + let out = match build.await { + 
Ok(pipeline) => { + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + pipelines.insert(id, HeldPipeline { pipeline }); + Ok(id) + } + Err(err) => Err(err), }; - let _ = resp.send(fut.await); + // 3) Reply + let _ = resp.send(out); } Cmd::Query { From e2d202a040e5be0c219c1da25a1977f74eeafb2c Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 22 Sep 2025 19:10:49 -0700 Subject: [PATCH 18/18] runtime fix Signed-off-by: Anna Tchernych --- lib/bindings/c/src/lib.rs | 362 ++++++++++++++++++++------------------ 1 file changed, 190 insertions(+), 172 deletions(-) diff --git a/lib/bindings/c/src/lib.rs b/lib/bindings/c/src/lib.rs index 680997a8dd..71c793c3e7 100644 --- a/lib/bindings/c/src/lib.rs +++ b/lib/bindings/c/src/lib.rs @@ -28,11 +28,156 @@ static DRT: AsyncOnceCell = AsyncOnceCell::new(); // [FIXME] shouldn't the publisher be instance passing between API calls? static KV_PUB: OnceCell = OnceCell::new(); +/// Run async work on a fresh single-threaded Tokio runtime on a plain OS thread. +/// Uses `shutdown_background()` so dropping the runtime is safe from this thread. +fn run_on_local_runtime(op: impl FnOnce() -> Fut + Send + 'static) -> Result +where + Fut: std::future::Future> + Send + 'static, + T: Send + 'static, +{ + std::thread::spawn(move || -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("build local runtime: {e:?}"))?; + + let out = rt.block_on(op()); + rt.shutdown_background(); + out + }) + .join() + .map_err(|_| "panic in local runtime thread".to_string())? 
+} + +fn start_host_on_worker_runtime() -> Result<(), String> { + if HOST.get().is_some() { + return Ok(()); + } + + // channel the rest of the code uses to talk to the host loop + let (tx, mut rx) = tokio::sync::mpsc::channel::(128); + + // host loop on a dedicated OS thread + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("host runtime build failed"); + + rt.block_on(async move { + use std::collections::HashMap; + use std::sync::atomic::{AtomicU64, Ordering}; + static NEXT_ID: AtomicU64 = AtomicU64::new(1); + + struct HeldPipeline { + pipeline: ServiceEngine< + SingleIn, + ManyOut>, + >, + } + let mut pipelines: HashMap = HashMap::new(); + + while let Some(cmd) = rx.recv().await { + match cmd { + Cmd::CreatePipeline { + namespace, + component, + model, + use_kv_routing, + busy_threshold, + overlap_score_weight, + router_temperature, + use_kv_events, + router_replica_sync, + resp, + } => { + let out = async move { + let router_mode = if use_kv_routing { + RouterMode::KV + } else { + RouterMode::RoundRobin + }; + let kv_router_config = if use_kv_routing { + use dynamo_llm::kv_router::KvRouterConfig; + Some(KvRouterConfig::new( + overlap_score_weight, + router_temperature, + Some(use_kv_events), + Some(router_replica_sync), + None, + None, + None, + )) + } else { + None + }; + + let pipeline = create_worker_selection_pipeline_chat( + &namespace, + &component, + &model, + router_mode, + busy_threshold, + kv_router_config, + ) + .await + .map_err(|e| format!("{e:?}"))?; + + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + Ok::<(u64, _), String>((id, pipeline)) + } + .await; + + let res = match out { + Ok((id, pipeline)) => { + pipelines.insert(id, HeldPipeline { pipeline }); + Ok(id) + } + Err(err) => Err(err), + }; + let _ = resp.send(res); + } + + Cmd::Query { + pipeline_id, + request, + resp, + } => { + let out = async { + let hp = pipelines + .get(&pipeline_id) + .ok_or_else(|| "invalid 
pipeline id".to_string())?; + query_worker_selection_and_annotate(&hp.pipeline, request) + .await + .map_err(|e| format!("{e:?}")) + } + .await; + let _ = resp.send(out); + } + + Cmd::DestroyPipeline { pipeline_id, resp } => { + pipelines.remove(&pipeline_id); + let _ = resp.send(()); + } + } + } + }); + + // runtime drops here + }); + + HOST.set(Host { tx }) + .map_err(|_| "HOST already set".to_string()) +} + fn initialize_tracing() { // Sets up RUST_LOG environment variable for logging while KV Publishing // Example: os.environ["RUST_LOG"] = "debug" let subscriber = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(true) + .with_thread_ids(true) + .with_level(true) .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); @@ -59,32 +204,6 @@ fn ffi_guard DynamoLlmResult>(f: F) -> DynamoLlmResult { } } -/// Run async work on a fresh single-threaded Tokio runtime on a plain OS thread. -/// Uses `shutdown_background()` to avoid the "Cannot drop a runtime where blocking -/// is not allowed" panic during runtime drop. -fn run_on_local_runtime(op: impl FnOnce() -> Fut + Send + 'static) -> Result -where - Fut: std::future::Future> + Send + 'static, - T: Send + 'static, -{ - std::thread::spawn(move || -> Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| format!("build local runtime: {e:?}"))?; - - // run the future - let out = rt.block_on(op()); - - // IMPORTANT: avoid blocking shutdown on this thread - rt.shutdown_background(); - - out - }) - .join() - .map_err(|_| "panic in local runtime thread".to_string())? 
-} - /* ------------------------------ init ------------------------------ */ /// # Safety @@ -98,52 +217,72 @@ pub unsafe extern "C" fn dynamo_llm_init( ) -> DynamoLlmResult { ffi_guard(|| { initialize_tracing(); + tracing::info!(target: "capi", "init: begin"); + // 1) Ensure Worker exists (no spawn on its runtime) let wk = match WK.get_or_try_init(Worker::from_settings) { Ok(wk) => wk.clone(), Err(e) => { - eprintln!("Failed to initialize runtime: {:?}", e); + tracing::error!(target: "capi", error=?e, "Worker::from_settings failed"); return DynamoLlmResult::ERR; } }; - // Convert C strings to owned Rust Strings before we jump threads. - let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() { - Ok(s) => s.to_string(), + // 2) Start host loop thread/runtime + if let Err(e) = start_host_on_worker_runtime() { + tracing::error!(target: "capi", error=%e, "start_host_on_worker_runtime failed"); + return DynamoLlmResult::ERR; + } + + // 3) Parse inputs + let namespace = match CStr::from_ptr(namespace_c_str).to_str() { + Ok(s) => s.to_owned(), Err(e) => { - eprintln!("Failed to convert C string to Rust string: {:?}", e); + tracing::error!(target: "capi", error=?e, "bad namespace C string"); return DynamoLlmResult::ERR; } }; - let component = match unsafe { CStr::from_ptr(component_c_str) }.to_str() { - Ok(s) => s.to_string(), + let component = match CStr::from_ptr(component_c_str).to_str() { + Ok(s) => s.to_owned(), Err(e) => { - eprintln!("Failed to convert C string to Rust string: {:?}", e); + tracing::error!(target: "capi", error=?e, "bad component C string"); return DynamoLlmResult::ERR; } }; - - // Initialize DistributedRuntime on an isolated runtime/thread. 
- let rt = wk.runtime().clone(); - let drt_init_res = run_on_local_runtime(move || async move { - DRT.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await }) - .await - .map(|_| ()) - .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}")) + tracing::debug!(target: "capi", %namespace, %component, worker_id, kv_block_size, "init: parsed inputs"); + + // 4) Initialize DRT on a local runtime without borrowing `wk` across threads + let rt_handle = wk.runtime().clone(); // <- CLONE THE HANDLE + let drt_init = run_on_local_runtime(move || async move { + DRT.get_or_try_init(async { + DistributedRuntime::from_settings(rt_handle.clone()).await + }) + .await + .map(|_| ()) + .map_err(|e| format!("Failed to initialize distributed runtime: {e:?}")) }); - if let Err(msg) = drt_init_res { - eprintln!("{msg}"); + if let Err(msg) = drt_init { + tracing::error!(target: "capi", "DRT init failed: {msg}"); return DynamoLlmResult::ERR; } - - // Initialize the KV publisher once. 
- match KV_PUB.get_or_try_init(move || { - dynamo_create_kv_publisher(namespace, component, worker_id, kv_block_size) + tracing::info!(target: "capi", "DRT ready"); + + // 5) Initialize the KV publisher once + match KV_PUB.get_or_try_init({ + let namespace = namespace.clone(); + let component = component.clone(); + move || { + tracing::info!(target: "capi", %namespace, %component, worker_id, kv_block_size, "creating KvEventPublisher"); + dynamo_create_kv_publisher(namespace, component, worker_id, kv_block_size) + } }) { - Ok(_) => DynamoLlmResult::OK, + Ok(_) => { + tracing::info!(target: "capi", "KvEventPublisher ready"); + DynamoLlmResult::OK + } Err(e) => { - eprintln!("Failed to create KV publisher: {:?}", e); + tracing::error!(target: "capi", error=?e, "KvEventPublisher creation failed"); DynamoLlmResult::ERR } } @@ -404,128 +543,7 @@ struct Host { static HOST: OnceCell = OnceCell::new(); fn ensure_host_started() -> Result<&'static Host, String> { - HOST.get_or_try_init(|| -> Result { - let (tx, mut rx) = mpsc::channel::(128); - - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("host runtime build failed"); - - rt.block_on(async move { - use std::collections::HashMap; - use std::sync::atomic::{AtomicU64, Ordering}; - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - - // Pipeline state lives inside the runtime - struct HeldPipeline { - pipeline: ServiceEngine< - SingleIn, - ManyOut>, - >, - } - let mut pipelines: HashMap = HashMap::new(); - - while let Some(cmd) = rx.recv().await { - match cmd { - Cmd::CreatePipeline { - namespace, - component, - model, - use_kv_routing: _, // TODO rm - busy_threshold, - overlap_score_weight, - router_temperature: _, // we’ll force 0.0 TODO - use_kv_events: _, // we’ll force false TODO - router_replica_sync: _, // pick false for now TODO - resp, - } => { - // TODO rm - // Force KV mode + deterministic routing - let use_kv_routing = true; - let 
router_temperature = Some(0.0); - let use_kv_events = false; - let router_replica_sync = false; - - tracing::info!( - target: "capi", - "CreatePipeline ns={:?} component={:?} model={:?} KV_ROUTING={:?} busy_threshold={:?} overlap_score_weight={:?} router_temperature={:?} use_kv_events={:?} router_replica_sync={:?}", - namespace, component, model, use_kv_routing, busy_threshold, overlap_score_weight, router_temperature, use_kv_events, router_replica_sync - ); - - // 1) Build the pipeline inside a future that does NOT capture `pipelines` - let build = async move { - let router_mode = RouterMode::KV; - - let kv_router_config = { - use dynamo_llm::kv_router::KvRouterConfig; - Some(KvRouterConfig::new( - overlap_score_weight, // keep caller/defaults - router_temperature, // Some(0.0) => deterministic - Some(use_kv_events), // false - Some(router_replica_sync), // false - None, None, None, - )) - }; - - create_worker_selection_pipeline_chat( - &namespace, - &component, // "backend" - &model, - router_mode, - busy_threshold, - kv_router_config, - ) - .await - .map_err(|e| format!("{e:?}")) - }; - - // 2) Await the build, THEN insert into `pipelines` synchronously - let out = match build.await { - Ok(pipeline) => { - let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - pipelines.insert(id, HeldPipeline { pipeline }); - Ok(id) - } - Err(err) => Err(err), - }; - - // 3) Reply - let _ = resp.send(out); - } - - Cmd::Query { - pipeline_id, - request, - resp, - } => { - let fut = async { - let hp = pipelines - .get(&pipeline_id) - .ok_or_else(|| "invalid pipeline id".to_string())?; - - query_worker_selection_and_annotate(&hp.pipeline, request) - .await - .map_err(|e| format!("{e:?}")) - }; - - let _ = resp.send(fut.await); - } - - Cmd::DestroyPipeline { pipeline_id, resp } => { - pipelines.remove(&pipeline_id); - let _ = resp.send(()); - } - } - } - }); - - // runtime drops here when loop ends - }); - - Ok(Host { tx }) - }) + HOST.get().ok_or_else(|| "host not 
started".to_string()) } // Worker selection pipeline handle containing the actual pipeline