Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/_static/llama-stack-spec.html
Original file line number Diff line number Diff line change
Expand Up @@ -13616,6 +13616,10 @@
"unit": {
"type": "string",
"description": "The unit of measurement for the metric value"
},
"metric_type": {
"$ref": "#/components/schemas/MetricType",
"description": "The type of metric (optional, inferred if not provided for backwards compatibility)"
}
},
"additionalProperties": false,
Expand All @@ -13631,6 +13635,17 @@
"title": "MetricEvent",
"description": "A metric event containing a measured value."
},
"MetricType": {
"type": "string",
"enum": [
"counter",
"up_down_counter",
"histogram",
"gauge"
],
"title": "MetricType",
"description": "The type of metric being recorded."
},
"SpanEndPayload": {
"type": "object",
"properties": {
Expand Down
13 changes: 13 additions & 0 deletions docs/_static/llama-stack-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10122,6 +10122,10 @@ components:
type: string
description: >-
The unit of measurement for the metric value
metric_type:
$ref: '#/components/schemas/MetricType'
description: >-
The type of metric (optional, inferred if not provided for backwards compatibility)
additionalProperties: false
required:
- trace_id
Expand All @@ -10134,6 +10138,15 @@ components:
title: MetricEvent
description: >-
A metric event containing a measured value.
MetricType:
type: string
enum:
- counter
- up_down_counter
- histogram
- gauge
title: MetricType
description: The type of metric being recorded.
SpanEndPayload:
type: object
properties:
Expand Down
17 changes: 17 additions & 0 deletions llama_stack/apis/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ class EventType(Enum):
METRIC = "metric"


@json_schema_type
# NOTE(review): this enum's docstring and member values appear to be consumed
# by the OpenAPI schema generator — the summary line and the string values
# ("counter", "up_down_counter", ...) match the generated entries in
# docs/_static/llama-stack-spec.{html,yaml} exactly. Keep them in sync with
# the published spec when editing.
class MetricType(Enum):
    """The type of metric being recorded.
    :cvar COUNTER: A counter metric that only increases (e.g., requests_total)
    :cvar UP_DOWN_COUNTER: A counter that can increase or decrease (e.g., active_connections)
    :cvar HISTOGRAM: A histogram metric for measuring distributions (e.g., request_duration_seconds)
    :cvar GAUGE: A gauge metric for point-in-time values (e.g., cpu_usage_percent)
    """

    # Member values are the wire-format strings clients send/receive.
    COUNTER = "counter"
    UP_DOWN_COUNTER = "up_down_counter"
    HISTOGRAM = "histogram"
    GAUGE = "gauge"


@json_schema_type
class LogSeverity(Enum):
"""The severity level of a log message.
Expand Down Expand Up @@ -143,12 +158,14 @@ class MetricEvent(EventCommon):
:param metric: The name of the metric being measured
:param value: The numeric value of the metric measurement
:param unit: The unit of measurement for the metric value
:param metric_type: The type of metric (optional, inferred if not provided for backwards compatibility)
"""

type: Literal[EventType.METRIC] = EventType.METRIC
metric: str # this would be an enum
value: int | float
unit: str
metric_type: MetricType | None = None


@json_schema_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import copy
import json
import re
import secrets
import string
import time
import uuid
import warnings
from collections.abc import AsyncGenerator
from datetime import UTC, datetime

import httpx
from opentelemetry.trace import get_current_span

from llama_stack.apis.agents import (
AgentConfig,
Expand Down Expand Up @@ -60,6 +63,7 @@
UserMessage,
)
from llama_stack.apis.safety import Safety
from llama_stack.apis.telemetry import MetricEvent, MetricType, Telemetry
from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime
from llama_stack.apis.vector_io import VectorIO
from llama_stack.core.datatypes import AccessRule
Expand Down Expand Up @@ -97,6 +101,7 @@ def __init__(
tool_runtime_api: ToolRuntime,
tool_groups_api: ToolGroups,
vector_io_api: VectorIO,
telemetry_api: Telemetry | None,
persistence_store: KVStore,
created_at: str,
policy: list[AccessRule],
Expand All @@ -106,6 +111,7 @@ def __init__(
self.inference_api = inference_api
self.safety_api = safety_api
self.vector_io_api = vector_io_api
self.telemetry_api = telemetry_api
self.storage = AgentPersistence(agent_id, persistence_store, policy)
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
Expand All @@ -118,6 +124,9 @@ def __init__(
output_shields=agent_config.output_shields,
)

# Initialize workflow start time to None
self._workflow_start_time: float | None = None

def turn_to_messages(self, turn: Turn) -> list[Message]:
messages = []

Expand Down Expand Up @@ -167,6 +176,72 @@ def turn_to_messages(self, turn: Turn) -> list[Message]:
async def create_session(self, name: str) -> str:
return await self.storage.create_session(name)

def _emit_metric(
    self,
    metric_name: str,
    value: int | float,
    unit: str,
    attributes: dict[str, str] | None = None,
    metric_type: MetricType | None = None,
) -> None:
    """Fire-and-forget a single MetricEvent to the telemetry API.

    The event is tagged with the current span's trace/span ids so it can be
    correlated with the active trace. Silently (with a log line) does nothing
    when telemetry is not configured, no valid span is active, or no event
    loop is running — metric emission must never break agent execution.

    :param metric_name: name of the metric (e.g. llama_stack_agent_steps_total)
    :param value: the measured value
    :param unit: unit of measurement ("1" for dimensionless counters)
    :param attributes: extra attributes; "agent_id" is always included
    :param metric_type: optional MetricType hint for the telemetry sink
    """
    # Per-metric call tracing is debug-level noise, not operator-facing info.
    logger.debug(f"_emit_metric called: {metric_name} = {value} {unit}")

    if not self.telemetry_api:
        logger.warning(f"No telemetry_api available for metric {metric_name}")
        return

    # get_current_span() returns the INVALID_SPAN sentinel (a truthy
    # NonRecordingSpan) when there is no active span, so a bare truthiness
    # check never fires — validate the span context instead.
    span = get_current_span()
    context = span.get_span_context() if span is not None else None
    if context is None or not context.is_valid:
        logger.warning(f"No current span available for metric {metric_name}")
        return

    metric = MetricEvent(
        trace_id=format(context.trace_id, "x"),
        span_id=format(context.span_id, "x"),
        metric=metric_name,
        value=value,
        timestamp=time.time(),
        unit=unit,
        attributes={"agent_id": self.agent_id, **(attributes or {})},
        metric_type=metric_type,
    )

    # Name the task for better debugging, and surface async failures through
    # a done-callback instead of letting them vanish.
    task_name = f"metric-{metric_name}-{self.agent_id}"
    try:
        task = asyncio.create_task(self.telemetry_api.log_event(metric), name=task_name)
    except RuntimeError:
        # create_task raises when no event loop is running (e.g. synchronous
        # test code); drop the metric rather than crash the caller.
        logger.warning("No running event loop; dropping metric %s", metric_name)
        return

    def _on_metric_task_done(t: asyncio.Task) -> None:
        try:
            exc = t.exception()
        except asyncio.CancelledError:
            logger.debug("Metric task %s was cancelled", task_name)
            return
        if exc is not None:
            logger.warning("Metric task %s failed: %s", task_name, exc)

    # create_task never returns None in real code; this guard only exists
    # because some tests mock create_task to return None.
    if task is not None:
        task.add_done_callback(_on_metric_task_done)

def _track_step(self):
    """Count one completed agent step (llama_stack_agent_steps_total)."""
    logger.info("_track_step called")
    self._emit_metric(
        metric_name="llama_stack_agent_steps_total",
        value=1,
        unit="1",
        metric_type=MetricType.COUNTER,
    )

def _track_workflow(self, status: str, duration: float):
    """Record one finished workflow: a status-tagged completion counter and
    a duration histogram in seconds."""
    logger.info(f"_track_workflow called: status={status}, duration={duration:.2f}s")
    self._emit_metric(
        metric_name="llama_stack_agent_workflows_total",
        value=1,
        unit="1",
        attributes={"status": status},
        metric_type=MetricType.COUNTER,
    )
    self._emit_metric(
        metric_name="llama_stack_agent_workflow_duration_seconds",
        value=duration,
        unit="s",
        metric_type=MetricType.HISTOGRAM,
    )

def _track_tool(self, tool_name: str):
    """Count one tool invocation, reporting knowledge_search under the
    "rag" label."""
    logger.info(f"_track_tool called: {tool_name}")
    if tool_name == "knowledge_search":
        tool_label = "rag"
    else:
        tool_label = tool_name
    self._emit_metric(
        metric_name="llama_stack_agent_tool_calls_total",
        value=1,
        unit="1",
        attributes={"tool": tool_label},
        metric_type=MetricType.COUNTER,
    )

async def get_messages_from_turns(self, turns: list[Turn]) -> list[Message]:
messages = []
if self.agent_config.instructions != "":
Expand Down Expand Up @@ -201,6 +276,9 @@ async def resume_turn(self, request: AgentTurnResumeRequest) -> AsyncGenerator:
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)

# Set workflow start time for resume operations
self._workflow_start_time = time.time()

await self._initialize_tools()
async for chunk in self._run_turn(request):
yield chunk
Expand All @@ -212,6 +290,9 @@ async def _run_turn(
) -> AsyncGenerator:
assert request.stream is True, "Non-streaming not supported"

# Track workflow start time for metrics
self._workflow_start_time = time.time()

is_resume = isinstance(request, AgentTurnResumeRequest)
session_info = await self.storage.get_session_info(request.session_id)
if session_info is None:
Expand Down Expand Up @@ -313,6 +394,10 @@ async def _run_turn(
)
)
else:
# Track workflow completion when turn is actually complete
workflow_duration = time.time() - (self._workflow_start_time or time.time())
self._track_workflow("completed", workflow_duration)

chunk = AgentTurnResponseStreamChunk(
event=AgentTurnResponseEvent(
payload=AgentTurnResponseTurnCompletePayload(
Expand Down Expand Up @@ -726,6 +811,10 @@ async def _run(
)
)

# Track step execution metric
self._track_step()
self._track_tool(tool_call.tool_name)

# Add the result message to input_messages for the next iteration
input_messages.append(result_message)

Expand Down Expand Up @@ -900,6 +989,7 @@ async def execute_tool_call_maybe(
},
)
logger.debug(f"tool call {tool_name_str} completed with result: {result}")

return result


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
UserMessage,
)
from llama_stack.apis.safety import Safety
from llama_stack.apis.telemetry import Telemetry
from llama_stack.apis.tools import ToolGroups, ToolRuntime
from llama_stack.apis.vector_io import VectorIO
from llama_stack.core.datatypes import AccessRule
Expand All @@ -64,13 +65,15 @@ def __init__(
tool_runtime_api: ToolRuntime,
tool_groups_api: ToolGroups,
policy: list[AccessRule],
telemetry_api: Telemetry | None = None,
):
self.config = config
self.inference_api = inference_api
self.vector_io_api = vector_io_api
self.safety_api = safety_api
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.telemetry_api = telemetry_api

self.in_memory_store = InmemoryKVStoreImpl()
self.openai_responses_impl: OpenAIResponsesImpl | None = None
Expand Down Expand Up @@ -130,6 +133,7 @@ async def _get_agent_impl(self, agent_id: str) -> ChatAgent:
vector_io_api=self.vector_io_api,
tool_runtime_api=self.tool_runtime_api,
tool_groups_api=self.tool_groups_api,
telemetry_api=self.telemetry_api,
persistence_store=(
self.persistence_store if agent_info.enable_session_persistence else self.in_memory_store
),
Expand Down
Loading
Loading