Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/bindings/c/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repository.workspace = true

[lib]
name = "dynamo_llm_capi"
crate-type = ["cdylib"]
crate-type = ["cdylib", "staticlib"]

[build-dependencies]
cbindgen = "0.27"
Expand Down
328 changes: 326 additions & 2 deletions lib/bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ use once_cell::sync::OnceCell;
use std::ffi::CStr;
use std::sync::atomic::{AtomicU32, Ordering};

use dynamo_llm::entrypoint::input::worker_selection_pipeline::{
create_worker_selection_pipeline_chat, query_worker_selection_and_annotate,
};
use dynamo_llm::kv_router::{
indexer::compute_block_hash_for_seq, protocols::*, publisher::KvEventPublisher,
};
use dynamo_runtime::{DistributedRuntime, Worker};
use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput;
use dynamo_llm::types::{Annotated, openai::chat_completions::NvCreateChatCompletionRequest};
use dynamo_runtime::{
DistributedRuntime, Worker,
pipeline::{ManyOut, RouterMode, ServiceEngine, SingleIn},
};
static WK: OnceCell<Worker> = OnceCell::new();
static DRT: AsyncOnceCell<DistributedRuntime> = AsyncOnceCell::new();
// [FIXME] shouldn't the publisher be instance passing between API calls?
Expand Down Expand Up @@ -293,7 +301,6 @@ pub extern "C" fn dynamo_kv_event_publish_removed(
}
}
}

// Need to setup etcd and nats to run these tests
// #[cfg(test)]
// mod tests {
Expand Down Expand Up @@ -324,3 +331,320 @@ pub extern "C" fn dynamo_kv_event_publish_removed(
// assert_eq!(shutdown_result as u32, DynamoLlmResult::OK as u32);
// }
// }

//
// Apis related to the best worker selection.
// Worker selection pipeline handle containing the actual pipeline
pub struct WorkerSelectionPipeline {
pipeline:
ServiceEngine<SingleIn<NvCreateChatCompletionRequest>, ManyOut<Annotated<LLMEngineOutput>>>,
}

/// C FFI wrapper for creating a worker selection pipeline
///
/// Returns a pipeline handle that can be used repeatedly for queries.
/// Call dynamo_destroy_worker_selection_pipeline when done.
/// Uses the "generate" endpoint by default.
///
/// # Safety
/// The namespace_c_str, component_c_str, and model_name_c_str
/// are passed as pointers to C strings
///
/// # KV Router Configuration Parameters
/// - overlap_score_weight: Weight for KV cache overlap in worker selection (use negative value for default)
/// - router_temperature: Randomness in worker selection, 0.0 = deterministic (use negative value for default)
/// - use_kv_events: Whether to use KV cache events for tracking
/// - router_replica_sync: Whether to synchronize router state across replicas
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline(
namespace_c_str: *const c_char,
component_c_str: *const c_char,
model_name_c_str: *const c_char,
use_kv_routing: bool,
busy_threshold: f64, // Use negative value to indicate None
overlap_score_weight: f64, // Use negative value for default
router_temperature: f64, // Use negative value for default
use_kv_events: bool,
router_replica_sync: bool,
pipeline_out: *mut *mut WorkerSelectionPipeline,
) -> DynamoLlmResult {
let wk = match WK.get() {
Some(wk) => wk,
None => {
eprintln!("Runtime not initialized - call dynamo_llm_init first");
return DynamoLlmResult::ERR;
}
};

let rt = wk.runtime();
let secondary = rt.secondary().clone();

let result = secondary.block_on(async {
// Convert C strings to Rust strings
let namespace = match unsafe { CStr::from_ptr(namespace_c_str) }.to_str() {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to convert namespace C string: {:?}", e);
return DynamoLlmResult::ERR;
}
};

let component_name = match unsafe { CStr::from_ptr(component_c_str) }.to_str() {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to convert component C string: {:?}", e);
return DynamoLlmResult::ERR;
}
};

let model_name = match unsafe { CStr::from_ptr(model_name_c_str) }.to_str() {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to convert model_name C string: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Determine router mode and busy threshold
let router_mode = if use_kv_routing {
RouterMode::KV
} else {
RouterMode::RoundRobin
};

let busy_threshold_opt = if busy_threshold < 0.0 {
None
} else {
Some(busy_threshold)
};

// Create KV router config if using KV routing
let kv_router_config = if use_kv_routing {
use dynamo_llm::kv_router::KvRouterConfig;

Some(KvRouterConfig::new(
if overlap_score_weight < 0.0 {
None
} else {
Some(overlap_score_weight)
},
if router_temperature < 0.0 {
None
} else {
Some(router_temperature)
},
Some(use_kv_events),
Some(router_replica_sync),
None, // max_num_batched_tokens - use default
None, // router_snapshot_threshold - use default
None, // router_reset_states - use default
))
} else {
None
};

// Create the worker selection pipeline
let pipeline = match create_worker_selection_pipeline_chat(
namespace,
component_name,
model_name,
router_mode,
busy_threshold_opt,
kv_router_config,
)
.await
{
Ok(pipeline) => pipeline,
Err(e) => {
eprintln!("Failed to create worker selection pipeline: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Wrap pipeline in handle struct and box it
let pipeline_handle = WorkerSelectionPipeline { pipeline };
let boxed_pipeline = Box::new(pipeline_handle);
let pipeline_ptr = Box::into_raw(boxed_pipeline);

unsafe {
*pipeline_out = pipeline_ptr;
}

DynamoLlmResult::OK
});

result
}

/// Destroy a worker selection pipeline and free its memory
///
/// # Safety
/// The pipeline pointer must be valid and should not be used after this call
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dynamo_destroy_worker_selection_pipeline(
pipeline: *mut WorkerSelectionPipeline,
) -> DynamoLlmResult {
if pipeline.is_null() {
eprintln!("Pipeline pointer is null");
return DynamoLlmResult::ERR;
}

// Convert back to Box with correct type and let it drop to free memory
let _boxed_pipeline: Box<WorkerSelectionPipeline> = unsafe { Box::from_raw(pipeline) };

DynamoLlmResult::OK
}

/// Query worker selection and return annotated request
///
/// This function takes an original OpenAI request (as JSON), adds query_instance_id annotation,
/// runs it through the pipeline to get worker selection, then returns the worker_id, tokens,
/// and the original request annotated with worker_instance_id and token_data.
///
/// # Safety
/// All pointer parameters must be valid. The request_json_c_str must be a valid C string
/// containing valid JSON. The caller is responsible for freeing the allocated memory
/// for token_ids_out and annotated_request_json_out.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dynamo_query_worker_selection_and_annotate(
pipeline: *mut WorkerSelectionPipeline,
request_json_c_str: *const c_char,
worker_instance_id_out: *mut i64,
token_ids_out: *mut *mut u32,
token_count_out: *mut usize,
annotated_request_json_out: *mut *mut c_char,
) -> DynamoLlmResult {
if pipeline.is_null() {
eprintln!("Pipeline pointer is null");
return DynamoLlmResult::ERR;
}

let wk = match WK.get() {
Some(wk) => wk,
None => {
eprintln!("Runtime not initialized - call dynamo_llm_init first");
return DynamoLlmResult::ERR;
}
};

let rt = wk.runtime();
let secondary = rt.secondary().clone();

let result = secondary.block_on(async {
// Convert C string to Rust string
let request_json = match unsafe { CStr::from_ptr(request_json_c_str) }.to_str() {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to convert request JSON C string: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Parse JSON into NvCreateChatCompletionRequest
let original_request: NvCreateChatCompletionRequest =
match serde_json::from_str(request_json) {
Ok(req) => req,
Err(e) => {
eprintln!("Failed to parse request JSON: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Get pipeline reference
let pipeline_ref = unsafe { &*pipeline };

// Call the wrapper function
let (worker_id, tokens, annotated_request) =
match query_worker_selection_and_annotate(&pipeline_ref.pipeline, original_request)
.await
{
Ok(result) => result,
Err(e) => {
eprintln!("Failed to query worker selection: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Convert annotated request back to JSON
let annotated_json = match serde_json::to_string(&annotated_request) {
Ok(json) => json,
Err(e) => {
eprintln!("Failed to serialize annotated request: {:?}", e);
return DynamoLlmResult::ERR;
}
};

// Allocate memory for tokens array
let tokens_ptr = if tokens.is_empty() {
std::ptr::null_mut()
} else {
let tokens_len = tokens.len();
let layout = std::alloc::Layout::array::<u32>(tokens_len).unwrap();
let ptr = unsafe { std::alloc::alloc(layout) as *mut u32 };
if ptr.is_null() {
eprintln!("Failed to allocate memory for tokens");
return DynamoLlmResult::ERR;
}
// Copy tokens to allocated memory
unsafe { std::ptr::copy_nonoverlapping(tokens.as_ptr(), ptr, tokens_len) };
ptr
};

// Allocate memory for annotated request JSON string
let json_cstring = match std::ffi::CString::new(annotated_json) {
Ok(cstr) => cstr,
Err(e) => {
eprintln!("Failed to create C string for annotated JSON: {:?}", e);
if !tokens_ptr.is_null() {
let layout = std::alloc::Layout::array::<u32>(tokens.len()).unwrap();
unsafe { std::alloc::dealloc(tokens_ptr as *mut u8, layout) };
}
return DynamoLlmResult::ERR;
}
};

// Set output parameters
unsafe {
*worker_instance_id_out = worker_id;
*token_ids_out = tokens_ptr;
*token_count_out = tokens.len();
*annotated_request_json_out = json_cstring.into_raw();
}

DynamoLlmResult::OK
});

result
}

/// Free memory allocated by dynamo_query_worker_selection_and_annotate
///
/// # Safety
/// The pointers must have been allocated by dynamo_query_worker_selection_and_annotate
/// and should not be used after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dynamo_free_worker_selection_result(
token_ids: *mut u32,
token_count: usize,
annotated_request_json: *mut c_char,
) -> DynamoLlmResult {
// Free tokens array if not null
if !token_ids.is_null() && token_count > 0 {
let layout = match std::alloc::Layout::array::<u32>(token_count) {
Ok(layout) => layout,
Err(_) => {
eprintln!("Invalid layout for tokens array");
return DynamoLlmResult::ERR;
}
};
unsafe { std::alloc::dealloc(token_ids as *mut u8, layout) };
}

// Free JSON string if not null
if !annotated_request_json.is_null() {
let _cstring = unsafe { std::ffi::CString::from_raw(annotated_request_json) };
// CString will be automatically freed when it goes out of scope
}

DynamoLlmResult::OK
}
1 change: 1 addition & 0 deletions lib/llm/src/entrypoint/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod endpoint;
pub mod grpc;
pub mod http;
pub mod text;
pub mod worker_selection_pipeline;

use dynamo_runtime::protocols::ENDPOINT_SCHEME;
use either::Either;
Expand Down
Loading
Loading