Skip to content

Commit 8206a3d

Browse files
committed
fix(telemetry): added mcp tracing context propagation
1 parent 3d526f2 commit 8206a3d

File tree

5 files changed

+648
-2
lines changed

5 files changed

+648
-2
lines changed

src/strands/models/anthropic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ async def structured_output(
414414
stop_reason, messages, _, _ = event["stop"]
415415

416416
if stop_reason != "tool_use":
417-
raise ValueError(f"Model returned stop_reason: {stop_reason} instead of \"tool_use\".")
417+
raise ValueError(f'Model returned stop_reason: {stop_reason} instead of "tool_use".')
418418

419419
content = messages["content"]
420420
output_response: dict[str, Any] | None = None

src/strands/models/bedrock.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ async def structured_output(
631631
stop_reason, messages, _, _ = event["stop"]
632632

633633
if stop_reason != "tool_use":
634-
raise ValueError(f"Model returned stop_reason: {stop_reason} instead of \"tool_use\".")
634+
raise ValueError(f'Model returned stop_reason: {stop_reason} instead of "tool_use".')
635635

636636
content = messages["content"]
637637
output_response: dict[str, Any] | None = None

src/strands/tools/mcp/mcp_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ...types.media import ImageFormat
3030
from ...types.tools import ToolResultContent, ToolResultStatus
3131
from .mcp_agent_tool import MCPAgentTool
32+
from .mcp_instrumentation import mcp_instrumentation
3233
from .mcp_types import MCPToolResult, MCPTransport
3334

3435
logger = logging.getLogger(__name__)
@@ -68,6 +69,7 @@ def __init__(self, transport_callable: Callable[[], MCPTransport]):
6869
Args:
6970
transport_callable: A callable that returns an MCPTransport (read_stream, write_stream) tuple
7071
"""
72+
mcp_instrumentation()
7173
self._session_id = uuid.uuid4()
7274
self._log_debug_with_thread("initializing MCPClient connection")
7375
self._init_future: futures.Future[None] = futures.Future() # Main thread blocks until future completes
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
"""OpenTelemetry instrumentation for Model Context Protocol (MCP) tracing.
2+
3+
Enables distributed tracing across MCP client-server boundaries by injecting
4+
OpenTelemetry context into MCP request metadata (_meta field) and extracting
5+
it on the server side, creating unified traces that span from agent calls
6+
through MCP tool executions.
7+
8+
Based on: https://github.com/traceloop/openllmetry/tree/main/packages/opentelemetry-instrumentation-mcp
9+
Related issue: https://github.com/modelcontextprotocol/modelcontextprotocol/issues/246
10+
"""
11+
12+
from contextlib import _AsyncGeneratorContextManager, asynccontextmanager
13+
from dataclasses import dataclass
14+
from typing import Any, AsyncGenerator, Callable, Tuple
15+
16+
from mcp.shared.message import SessionMessage
17+
from mcp.types import JSONRPCMessage, JSONRPCRequest
18+
from opentelemetry import context, propagate
19+
from wrapt import ObjectProxy, register_post_import_hook, wrap_function_wrapper
20+
21+
22+
@dataclass(slots=True, frozen=True)
23+
class ItemWithContext:
24+
"""Wrapper for items that need to carry OpenTelemetry context.
25+
26+
Used to preserve tracing context across async boundaries in MCP sessions,
27+
ensuring that distributed traces remain connected even when messages are
28+
processed asynchronously.
29+
30+
Attributes:
31+
item: The original item being wrapped
32+
ctx: The OpenTelemetry context associated with the item
33+
"""
34+
35+
item: Any
36+
ctx: context.Context
37+
38+
39+
def mcp_instrumentation() -> None:
40+
"""Apply OpenTelemetry instrumentation patches to MCP components.
41+
42+
This function instruments three key areas of MCP communication:
43+
1. Client-side: Injects tracing context into tool call requests
44+
2. Transport-level: Extracts context from incoming messages
45+
3. Session-level: Manages bidirectional context flow
46+
47+
The patches enable distributed tracing by:
48+
- Adding OpenTelemetry context to the _meta field of MCP requests
49+
- Extracting and activating context on the server side
50+
- Preserving context across async message processing boundaries
51+
"""
52+
53+
def patch_mcp_client(wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any) -> Any:
54+
"""Patch MCP client to inject OpenTelemetry context into tool calls.
55+
56+
Intercepts outgoing MCP requests and injects the current OpenTelemetry
57+
context into the request's _meta field for tools/call methods. This
58+
enables server-side context extraction and trace continuation.
59+
60+
Args:
61+
wrapped: The original function being wrapped
62+
instance: The instance the method is being called on
63+
args: Positional arguments to the wrapped function
64+
kwargs: Keyword arguments to the wrapped function
65+
66+
Returns:
67+
Result of the wrapped function call
68+
"""
69+
if len(args) < 1:
70+
return wrapped(*args, **kwargs)
71+
72+
request = args[0]
73+
method = getattr(request.root, "method", None)
74+
75+
if method != "tools/call":
76+
return wrapped(*args, **kwargs)
77+
78+
try:
79+
if hasattr(request.root, "params") and request.root.params:
80+
# Convert existing params to dict
81+
if hasattr(request.root.params, "model_dump"):
82+
params_dict = request.root.params.model_dump()
83+
elif hasattr(request.root.params, "__dict__"):
84+
params_dict = dict(request.root.params.__dict__)
85+
else:
86+
params_dict = {}
87+
else:
88+
params_dict = {}
89+
90+
# Add _meta with tracing context
91+
meta = params_dict.setdefault("_meta", {})
92+
propagate.get_global_textmap().inject(meta)
93+
94+
# Replace params with dict version (this is what the working instrumentation does)
95+
request.root.params = params_dict
96+
97+
return wrapped(*args, **kwargs)
98+
99+
except Exception:
100+
return wrapped(*args, **kwargs)
101+
102+
def transport_wrapper() -> Callable[
103+
[Callable[..., Any], Any, Any, Any], _AsyncGeneratorContextManager[tuple[Any, Any]]
104+
]:
105+
"""Create a wrapper for MCP transport connections.
106+
107+
Returns a context manager that wraps transport read/write streams
108+
with context extraction capabilities. The wrapped reader will
109+
automatically extract OpenTelemetry context from incoming messages.
110+
111+
Returns:
112+
An async context manager that yields wrapped transport streams
113+
"""
114+
115+
@asynccontextmanager
116+
async def traced_method(
117+
wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any
118+
) -> AsyncGenerator[Tuple[Any, Any], None]:
119+
async with wrapped(*args, **kwargs) as result:
120+
try:
121+
read_stream, write_stream = result
122+
except ValueError:
123+
read_stream, write_stream, _ = result
124+
yield TransportContextExtractingReader(read_stream), write_stream
125+
126+
return traced_method
127+
128+
def session_init_wrapper() -> Callable[[Any, Any, Tuple[Any, ...], dict[str, Any]], None]:
129+
"""Create a wrapper for MCP session initialization.
130+
131+
Wraps session message streams to enable bidirectional context flow.
132+
The reader extracts and activates context, while the writer preserves
133+
context for async processing.
134+
135+
Returns:
136+
A function that wraps session initialization
137+
"""
138+
139+
def traced_method(
140+
wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], kwargs: dict[str, Any]
141+
) -> None:
142+
wrapped(*args, **kwargs)
143+
reader = getattr(instance, "_incoming_message_stream_reader", None)
144+
writer = getattr(instance, "_incoming_message_stream_writer", None)
145+
if reader and writer:
146+
instance._incoming_message_stream_reader = SessionContextAttachingReader(reader)
147+
instance._incoming_message_stream_writer = SessionContextSavingWriter(writer)
148+
149+
return traced_method
150+
151+
# Apply patches
152+
wrap_function_wrapper("mcp.shared.session", "BaseSession.send_request", patch_mcp_client)
153+
154+
register_post_import_hook(
155+
lambda _: wrap_function_wrapper(
156+
"mcp.server.streamable_http", "StreamableHTTPServerTransport.connect", transport_wrapper()
157+
),
158+
"mcp.server.streamable_http",
159+
)
160+
161+
register_post_import_hook(
162+
lambda _: wrap_function_wrapper("mcp.server.session", "ServerSession.__init__", session_init_wrapper()),
163+
"mcp.server.session",
164+
)
165+
166+
167+
class TransportContextExtractingReader(ObjectProxy):
168+
"""A proxy reader that extracts OpenTelemetry context from MCP messages.
169+
170+
Wraps an async message stream reader to automatically extract and activate
171+
OpenTelemetry context from the _meta field of incoming MCP requests. This
172+
enables server-side trace continuation from client-injected context.
173+
174+
The reader handles both SessionMessage and JSONRPCMessage formats, and
175+
supports both dict and Pydantic model parameter structures.
176+
"""
177+
178+
def __init__(self, wrapped: Any) -> None:
179+
"""Initialize the context-extracting reader.
180+
181+
Args:
182+
wrapped: The original async stream reader to wrap
183+
"""
184+
super().__init__(wrapped)
185+
186+
async def __aenter__(self) -> Any:
187+
"""Enter the async context manager by delegating to the wrapped object."""
188+
return await self.__wrapped__.__aenter__()
189+
190+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
191+
"""Exit the async context manager by delegating to the wrapped object."""
192+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
193+
194+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
195+
"""Iterate over messages, extracting and activating context as needed.
196+
197+
For each incoming message, checks if it contains tracing context in
198+
the _meta field. If found, extracts and activates the context for
199+
the duration of message processing, then properly detaches it.
200+
201+
Yields:
202+
Messages from the wrapped stream, processed under the appropriate
203+
OpenTelemetry context
204+
"""
205+
async for item in self.__wrapped__:
206+
if isinstance(item, SessionMessage):
207+
request = item.message.root
208+
elif type(item) is JSONRPCMessage:
209+
request = item.root
210+
else:
211+
yield item
212+
continue
213+
214+
if isinstance(request, JSONRPCRequest) and request.params:
215+
# Handle both dict and Pydantic model params
216+
meta = request.params.get("_meta")
217+
218+
if meta:
219+
extracted_context = propagate.extract(meta)
220+
restore = context.attach(extracted_context)
221+
try:
222+
yield item
223+
continue
224+
finally:
225+
context.detach(restore)
226+
yield item
227+
228+
229+
class SessionContextSavingWriter(ObjectProxy):
230+
"""A proxy writer that preserves OpenTelemetry context with outgoing items.
231+
232+
Wraps an async message stream writer to capture the current OpenTelemetry
233+
context and associate it with outgoing items. This enables context
234+
preservation across async boundaries in MCP session processing.
235+
"""
236+
237+
def __init__(self, wrapped: Any) -> None:
238+
"""Initialize the context-saving writer.
239+
240+
Args:
241+
wrapped: The original async stream writer to wrap
242+
"""
243+
super().__init__(wrapped)
244+
245+
async def __aenter__(self) -> Any:
246+
"""Enter the async context manager by delegating to the wrapped object."""
247+
return await self.__wrapped__.__aenter__()
248+
249+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
250+
"""Exit the async context manager by delegating to the wrapped object."""
251+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
252+
253+
async def send(self, item: Any) -> Any:
254+
"""Send an item while preserving the current OpenTelemetry context.
255+
256+
Captures the current context and wraps the item with it, enabling
257+
the receiving side to restore the appropriate tracing context.
258+
259+
Args:
260+
item: The item to send through the stream
261+
262+
Returns:
263+
Result of sending the wrapped item
264+
"""
265+
ctx = context.get_current()
266+
return await self.__wrapped__.send(ItemWithContext(item, ctx))
267+
268+
269+
class SessionContextAttachingReader(ObjectProxy):
270+
"""A proxy reader that restores OpenTelemetry context from wrapped items.
271+
272+
Wraps an async message stream reader to detect ItemWithContext instances
273+
and restore their associated OpenTelemetry context during processing.
274+
This completes the context preservation cycle started by SessionContextSavingWriter.
275+
"""
276+
277+
def __init__(self, wrapped: Any) -> None:
278+
"""Initialize the context-attaching reader.
279+
280+
Args:
281+
wrapped: The original async stream reader to wrap
282+
"""
283+
super().__init__(wrapped)
284+
285+
async def __aenter__(self) -> Any:
286+
"""Enter the async context manager by delegating to the wrapped object."""
287+
return await self.__wrapped__.__aenter__()
288+
289+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
290+
"""Exit the async context manager by delegating to the wrapped object."""
291+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
292+
293+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
294+
"""Iterate over items, restoring context for ItemWithContext instances.
295+
296+
For items wrapped with context, temporarily activates the associated
297+
OpenTelemetry context during processing, then properly detaches it.
298+
Regular items are yielded without context modification.
299+
300+
Yields:
301+
Unwrapped items processed under their associated OpenTelemetry context
302+
"""
303+
async for item in self.__wrapped__:
304+
if isinstance(item, ItemWithContext):
305+
restore = context.attach(item.ctx)
306+
try:
307+
yield item.item
308+
finally:
309+
context.detach(restore)
310+
else:
311+
yield item

0 commit comments

Comments
 (0)