Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
router_config.busy_threshold,
target_namespace,
Arc::new(http_service.clone()),
http_service.state().metrics_clone(),
)
.await?;
}
Expand Down Expand Up @@ -217,7 +218,11 @@ async fn run_watcher(
busy_threshold: Option<f64>,
target_namespace: Option<String>,
http_service: Arc<HttpService>,
metrics: Arc<crate::http::service::metrics::Metrics>,
) -> anyhow::Result<()> {
// Clone model_manager before it's moved into ModelWatcher
let model_manager_clone = model_manager.clone();

let mut watch_obj = ModelWatcher::new(
runtime,
model_manager,
Expand All @@ -234,11 +239,22 @@ async fn run_watcher(

watch_obj.set_notify_on_model_update(tx);

// Spawn a task to watch for model type changes and update HTTP service endpoints
// Spawn a task to watch for model type changes and update HTTP service endpoints and metrics
let _endpoint_enabler_task = tokio::spawn(async move {
while let Some(model_type) = rx.recv().await {
tracing::debug!("Received model type update: {:?}", model_type);

// Update HTTP endpoints (existing functionality)
update_http_endpoints(http_service.clone(), model_type);

// Update metrics (only for added models)
update_model_metrics(
model_type,
model_manager_clone.clone(),
metrics.clone(),
Some(etcd_client.clone()),
)
.await;
}
});

Expand Down Expand Up @@ -271,3 +287,46 @@ fn update_http_endpoints(service: Arc<HttpService>, model_type: ModelUpdate) {
}
}
}

/// Refreshes per-model metrics in response to a [`ModelUpdate`] notification.
///
/// For `ModelUpdate::Added`, every entry currently returned by
/// `model_manager.get_model_entries()` whose `model_type` equals the added type is processed:
/// its runtime-config metrics are updated when the entry carries a runtime config, and its
/// MDC metrics are updated when `etcd_client` is `Some`. A failed MDC update is logged at
/// debug level and does not stop the remaining entries from being processed.
///
/// For `ModelUpdate::Removed`, only a debug line is emitted; no metric is modified.
async fn update_model_metrics(
    model_type: ModelUpdate,
    model_manager: Arc<ModelManager>,
    metrics: Arc<crate::http::service::metrics::Metrics>,
    etcd_client: Option<etcd::Client>,
) {
    // Handle the removal case up front so the rest of the function deals only with additions.
    let added_type = match model_type {
        ModelUpdate::Removed(removed_type) => {
            // Metrics are deliberately left untouched on removal so historical data is kept.
            tracing::debug!("Model type removed: {:?}", removed_type);
            return;
        }
        ModelUpdate::Added(added_type) => added_type,
    };

    tracing::debug!("Updating metrics for added model type: {:?}", added_type);

    for entry in model_manager.get_model_entries() {
        // Only entries of the type that was just added are of interest.
        if entry.model_type != added_type {
            continue;
        }

        // Runtime config is optional on an entry; publish it when present.
        if let Some(runtime_config) = entry.runtime_config.as_ref() {
            metrics.update_runtime_config_metrics(&entry.name, runtime_config);
        }

        // MDC metrics need etcd; without a client this step is skipped entirely.
        if let Some(etcd) = etcd_client.as_ref()
            && let Err(err) = metrics
                .update_metrics_from_model_entry_with_mdc(&entry, etcd)
                .await
        {
            tracing::debug!(
                model = %entry.name,
                error = %err,
                "Failed to update MDC metrics for newly added model"
            );
        }
    }
}
154 changes: 13 additions & 141 deletions lib/llm/src/http/service/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,37 +472,6 @@ impl Metrics {
}
}

/// Record model deployment card (MDC) values for a single model.
///
/// Sets the `model_context_length`, `model_kv_cache_block_size` and
/// `model_migration_limit` gauges, each labelled with `model_name`, to the supplied values.
/// Call this once the model deployment card information for the model is available.
pub fn update_mdc_metrics(
    &self,
    model_name: &str,
    context_length: u32,
    kv_cache_block_size: u32,
    migration_limit: u32,
) {
    // All three gauges share the same single-label set.
    let labels = [model_name];

    // `u32 -> i64` is a lossless widening, so `i64::from` is used instead of an `as` cast.
    let context_gauge = self.model_context_length.with_label_values(&labels);
    context_gauge.set(i64::from(context_length));

    let block_size_gauge = self.model_kv_cache_block_size.with_label_values(&labels);
    block_size_gauge.set(i64::from(kv_cache_block_size));

    let migration_gauge = self.model_migration_limit.with_label_values(&labels);
    migration_gauge.set(i64::from(migration_limit));
}

/// Update runtime config metrics from a `ModelEntry`.
///
/// Convenience wrapper: when the entry carries a runtime config, it is forwarded together
/// with the entry's name to `update_runtime_config_metrics`. Entries without a runtime
/// config are ignored.
pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
    // Nothing to publish when the entry has no runtime config attached.
    let Some(runtime_config) = model_entry.runtime_config.as_ref() else {
        return;
    };

    self.update_runtime_config_metrics(&model_entry.name, runtime_config);
}

/// Update metrics from a ModelEntry and its ModelDeploymentCard
/// This updates both runtime config metrics and MDC-specific metrics
pub async fn update_metrics_from_model_entry_with_mdc(
Expand All @@ -525,12 +494,19 @@ impl Metrics {
.await
{
Ok(Some(mdc)) => {
self.update_mdc_metrics(
&model_entry.name,
mdc.context_length,
mdc.kv_cache_block_size,
mdc.migration_limit,
);
// Inline MDC metrics update
self.model_context_length
.with_label_values(&[&model_entry.name])
.set(mdc.context_length as i64);

self.model_kv_cache_block_size
.with_label_values(&[&model_entry.name])
.set(mdc.kv_cache_block_size as i64);

self.model_migration_limit
.with_label_values(&[&model_entry.name])
.set(mdc.migration_limit as i64);

tracing::debug!(
model = %model_entry.name,
"Successfully updated MDC metrics"
Expand All @@ -554,110 +530,6 @@ impl Metrics {
Ok(())
}

/// Start a background task that periodically updates runtime config metrics
///
/// ## Why Polling is Required
///
/// Polling is necessary because new models may come online at any time through the distributed
/// discovery system. The ModelManager is continuously updated as workers register/deregister
/// with etcd, and we need to periodically check for these changes to expose their metrics.
///
/// ## Behavior
///
/// - Polls the ModelManager for current models and updates metrics accordingly
/// - Models are never removed from metrics to preserve historical data
/// - If multiple model instances have the same name, only the first instance's metrics are used
/// - Subsequent instances with duplicate names will be skipped
/// - A zero `poll_interval` is replaced by an 8 second fallback (with a warning), because
///   `tokio::time::interval` panics on a zero period and the panic would silently kill the task
/// - When a poll cycle overruns the interval, the next cycle is delayed rather than fired in a
///   burst, so a slow etcd does not trigger back-to-back polls
///
/// ## MDC (Model Deployment Card) Behavior
///
/// Currently, we don't overwrite an MDC. The first worker to start wins, and we assume
/// that all other workers claiming to serve that model really are using the same configuration.
/// Later, every worker will have its own MDC, and the frontend will validate that they
/// checksum the same. For right now, you can assume they have the same MDC, because
/// they aren't allowed to change it.
///
/// ## Parameters
///
/// - `metrics`: the metrics registry whose gauges are updated
/// - `manager`: source of the current model entries
/// - `etcd_client`: when `Some`, MDC metrics are loaded for each model as well
/// - `poll_interval`: time between poll cycles
/// - `cancel_token`: cancelling this token stops the task
///
/// Returns the `JoinHandle` of the spawned task. The task will run until the provided
/// cancellation token is cancelled.
pub fn start_runtime_config_polling_task(
    metrics: Arc<Self>,
    manager: Arc<crate::discovery::ModelManager>,
    etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
    poll_interval: Duration,
    cancel_token: tokio_util::sync::CancellationToken,
) -> tokio::task::JoinHandle<()> {
    // Used only when the caller hands us a zero interval.
    const FALLBACK_POLL_INTERVAL: Duration = Duration::from_secs(8);

    // `tokio::time::interval` panics on a zero period; guard here so the detached task
    // cannot die silently on a degenerate configuration value.
    let poll_interval = if poll_interval.is_zero() {
        tracing::warn!(
            fallback_secs = FALLBACK_POLL_INTERVAL.as_secs_f64(),
            "Runtime config metrics poll interval is zero; using fallback interval"
        );
        FALLBACK_POLL_INTERVAL
    } else {
        poll_interval
    };

    tokio::spawn(async move {
        let mut interval = tokio::time::interval(poll_interval);
        // Polls only set gauges, so catching up on missed ticks in a burst is pure waste.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // Every model name ever observed; only used for the trace-level summary below.
        let mut known_models = std::collections::HashSet::new();

        tracing::info!(
            // Fractional seconds so sub-second intervals are not logged as 0.
            interval_secs = poll_interval.as_secs_f64(),
            "Starting runtime config metrics polling task (metrics never removed)"
        );

        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    tracing::info!("Runtime config metrics polling task cancelled");
                    break;
                }
                _ = interval.tick() => {
                    // Continue with polling logic
                }
            }

            // Get current model entries from the manager
            let current_entries = manager.get_model_entries();
            let mut current_models = std::collections::HashSet::new();

            // Note: If multiple model instances have the same name, only the first instance's config metrics are recorded.
            // Subsequent instances with duplicate names will be skipped for config updates.
            // This is based on the assumption that all workers serving the same model have identical
            // configuration values (MDC content, runtime config, etc.). This assumption holds because
            // workers are not allowed to change their configuration after registration.

            // Update configuration metrics for current models
            for entry in current_entries {
                // Skip config processing if we've already seen this model name
                if !current_models.insert(entry.name.clone()) {
                    tracing::debug!(
                        model_name = %entry.name,
                        endpoint = ?entry.endpoint_id,
                        "Skipping duplicate model instance - only first instance config metrics are recorded"
                    );
                    continue;
                }

                // Update runtime config metrics if available
                if let Some(runtime_config) = &entry.runtime_config {
                    metrics.update_runtime_config_metrics(&entry.name, runtime_config);
                }

                // Optionally load MDC for additional metrics if etcd is available
                if let Some(ref etcd) = etcd_client
                    && let Err(e) = metrics
                        .update_metrics_from_model_entry_with_mdc(&entry, etcd)
                        .await
                {
                    tracing::debug!(
                        model = %entry.name,
                        error = %e,
                        "Failed to update MDC metrics (this is normal if MDC is not available)"
                    );
                }
            }

            // Capture the per-cycle count first, then move the names into the long-lived set
            // instead of cloning each one.
            let active_models = current_models.len();
            known_models.extend(current_models);

            tracing::trace!(
                active_models,
                total_known_models = known_models.len(),
                "Updated runtime config metrics for active models"
            );
        }
    })
}

/// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
/// and the kind of endpoint that was hit
///
Expand Down
24 changes: 2 additions & 22 deletions lib/llm/src/http/service/service_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ pub struct HttpService {
tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>,
route_docs: Vec<RouteDoc>,

// Metrics polling configuration
etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
}

#[derive(Clone, Builder)]
Expand Down Expand Up @@ -204,22 +201,6 @@ impl HttpService {
let protocol = if self.enable_tls { "HTTPS" } else { "HTTP" };
tracing::info!(protocol, address, "Starting HTTP(S) service");

// Start background task to poll runtime config metrics with proper cancellation
let poll_interval_secs = std::env::var("DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS")
.ok()
.and_then(|s| s.parse::<f64>().ok())
.filter(|&secs| secs > 0.0) // Guard against zero or negative values
.unwrap_or(8.0);
let poll_interval = Duration::from_secs_f64(poll_interval_secs);

let _polling_task = super::metrics::Metrics::start_runtime_config_polling_task(
self.state.metrics_clone(),
self.state.manager_clone(),
self.etcd_client.clone(),
poll_interval,
cancel_token.child_token(),
);

let router = self.router.clone();
let observer = cancel_token.child_token();

Expand Down Expand Up @@ -313,8 +294,8 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?;

let model_manager = Arc::new(ModelManager::new());
let etcd_client = config.etcd_client.clone();
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
let etcd_client = config.etcd_client;
let state = Arc::new(State::new_with_etcd(model_manager, etcd_client));

state
.flags
Expand Down Expand Up @@ -366,7 +347,6 @@ impl HttpServiceConfigBuilder {
tls_cert_path: config.tls_cert_path,
tls_key_path: config.tls_key_path,
route_docs: all_docs,
etcd_client,
})
}

Expand Down
Loading
Loading