Skip to content

Commit 34d499a

Browse files
authored
fix(telemetry): added mcp tracing context propagation (#569)
1 parent c85464c commit 34d499a

File tree

3 files changed

+815
-0
lines changed

3 files changed

+815
-0
lines changed

src/strands/tools/mcp/mcp_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ...types.media import ImageFormat
3030
from ...types.tools import ToolResultContent, ToolResultStatus
3131
from .mcp_agent_tool import MCPAgentTool
32+
from .mcp_instrumentation import mcp_instrumentation
3233
from .mcp_types import MCPToolResult, MCPTransport
3334

3435
logger = logging.getLogger(__name__)
@@ -68,6 +69,7 @@ def __init__(self, transport_callable: Callable[[], MCPTransport]):
6869
Args:
6970
transport_callable: A callable that returns an MCPTransport (read_stream, write_stream) tuple
7071
"""
72+
mcp_instrumentation()
7173
self._session_id = uuid.uuid4()
7274
self._log_debug_with_thread("initializing MCPClient connection")
7375
self._init_future: futures.Future[None] = futures.Future() # Main thread blocks until future completes
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
"""OpenTelemetry instrumentation for Model Context Protocol (MCP) tracing.
2+
3+
Enables distributed tracing across MCP client-server boundaries by injecting
4+
OpenTelemetry context into MCP request metadata (_meta field) and extracting
5+
it on the server side, creating unified traces that span from agent calls
6+
through MCP tool executions.
7+
8+
Based on: https://github.com/traceloop/openllmetry/tree/main/packages/opentelemetry-instrumentation-mcp
9+
Related issue: https://github.com/modelcontextprotocol/modelcontextprotocol/issues/246
10+
"""
11+
12+
from contextlib import _AsyncGeneratorContextManager, asynccontextmanager
13+
from dataclasses import dataclass
14+
from typing import Any, AsyncGenerator, Callable, Tuple
15+
16+
from mcp.shared.message import SessionMessage
17+
from mcp.types import JSONRPCMessage, JSONRPCRequest
18+
from opentelemetry import context, propagate
19+
from wrapt import ObjectProxy, register_post_import_hook, wrap_function_wrapper
20+
21+
22+
@dataclass(slots=True, frozen=True)
23+
class ItemWithContext:
24+
"""Wrapper for items that need to carry OpenTelemetry context.
25+
26+
Used to preserve tracing context across async boundaries in MCP sessions,
27+
ensuring that distributed traces remain connected even when messages are
28+
processed asynchronously.
29+
30+
Attributes:
31+
item: The original item being wrapped
32+
ctx: The OpenTelemetry context associated with the item
33+
"""
34+
35+
item: Any
36+
ctx: context.Context
37+
38+
39+
def mcp_instrumentation() -> None:
40+
"""Apply OpenTelemetry instrumentation patches to MCP components.
41+
42+
This function instruments three key areas of MCP communication:
43+
1. Client-side: Injects tracing context into tool call requests
44+
2. Transport-level: Extracts context from incoming messages
45+
3. Session-level: Manages bidirectional context flow
46+
47+
The patches enable distributed tracing by:
48+
- Adding OpenTelemetry context to the _meta field of MCP requests
49+
- Extracting and activating context on the server side
50+
- Preserving context across async message processing boundaries
51+
"""
52+
53+
def patch_mcp_client(wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any) -> Any:
54+
"""Patch MCP client to inject OpenTelemetry context into tool calls.
55+
56+
Intercepts outgoing MCP requests and injects the current OpenTelemetry
57+
context into the request's _meta field for tools/call methods. This
58+
enables server-side context extraction and trace continuation.
59+
60+
Args:
61+
wrapped: The original function being wrapped
62+
instance: The instance the method is being called on
63+
args: Positional arguments to the wrapped function
64+
kwargs: Keyword arguments to the wrapped function
65+
66+
Returns:
67+
Result of the wrapped function call
68+
"""
69+
if len(args) < 1:
70+
return wrapped(*args, **kwargs)
71+
72+
request = args[0]
73+
method = getattr(request.root, "method", None)
74+
75+
if method != "tools/call":
76+
return wrapped(*args, **kwargs)
77+
78+
try:
79+
if hasattr(request.root, "params") and request.root.params:
80+
# Handle Pydantic models
81+
if hasattr(request.root.params, "model_dump") and hasattr(request.root.params, "model_validate"):
82+
params_dict = request.root.params.model_dump()
83+
# Add _meta with tracing context
84+
meta = params_dict.setdefault("_meta", {})
85+
propagate.get_global_textmap().inject(meta)
86+
87+
# Recreate the Pydantic model with the updated data
88+
# This preserves the original model type and avoids serialization warnings
89+
params_class = type(request.root.params)
90+
try:
91+
request.root.params = params_class.model_validate(params_dict)
92+
except Exception:
93+
# Fallback to dict if model recreation fails
94+
request.root.params = params_dict
95+
96+
elif isinstance(request.root.params, dict):
97+
# Handle dict params directly
98+
meta = request.root.params.setdefault("_meta", {})
99+
propagate.get_global_textmap().inject(meta)
100+
101+
return wrapped(*args, **kwargs)
102+
103+
except Exception:
104+
return wrapped(*args, **kwargs)
105+
106+
def transport_wrapper() -> Callable[
107+
[Callable[..., Any], Any, Any, Any], _AsyncGeneratorContextManager[tuple[Any, Any]]
108+
]:
109+
"""Create a wrapper for MCP transport connections.
110+
111+
Returns a context manager that wraps transport read/write streams
112+
with context extraction capabilities. The wrapped reader will
113+
automatically extract OpenTelemetry context from incoming messages.
114+
115+
Returns:
116+
An async context manager that yields wrapped transport streams
117+
"""
118+
119+
@asynccontextmanager
120+
async def traced_method(
121+
wrapped: Callable[..., Any], instance: Any, args: Any, kwargs: Any
122+
) -> AsyncGenerator[Tuple[Any, Any], None]:
123+
async with wrapped(*args, **kwargs) as result:
124+
try:
125+
read_stream, write_stream = result
126+
except ValueError:
127+
read_stream, write_stream, _ = result
128+
yield TransportContextExtractingReader(read_stream), write_stream
129+
130+
return traced_method
131+
132+
def session_init_wrapper() -> Callable[[Any, Any, Tuple[Any, ...], dict[str, Any]], None]:
133+
"""Create a wrapper for MCP session initialization.
134+
135+
Wraps session message streams to enable bidirectional context flow.
136+
The reader extracts and activates context, while the writer preserves
137+
context for async processing.
138+
139+
Returns:
140+
A function that wraps session initialization
141+
"""
142+
143+
def traced_method(
144+
wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], kwargs: dict[str, Any]
145+
) -> None:
146+
wrapped(*args, **kwargs)
147+
reader = getattr(instance, "_incoming_message_stream_reader", None)
148+
writer = getattr(instance, "_incoming_message_stream_writer", None)
149+
if reader and writer:
150+
instance._incoming_message_stream_reader = SessionContextAttachingReader(reader)
151+
instance._incoming_message_stream_writer = SessionContextSavingWriter(writer)
152+
153+
return traced_method
154+
155+
# Apply patches
156+
wrap_function_wrapper("mcp.shared.session", "BaseSession.send_request", patch_mcp_client)
157+
158+
register_post_import_hook(
159+
lambda _: wrap_function_wrapper(
160+
"mcp.server.streamable_http", "StreamableHTTPServerTransport.connect", transport_wrapper()
161+
),
162+
"mcp.server.streamable_http",
163+
)
164+
165+
register_post_import_hook(
166+
lambda _: wrap_function_wrapper("mcp.server.session", "ServerSession.__init__", session_init_wrapper()),
167+
"mcp.server.session",
168+
)
169+
170+
171+
class TransportContextExtractingReader(ObjectProxy):
172+
"""A proxy reader that extracts OpenTelemetry context from MCP messages.
173+
174+
Wraps an async message stream reader to automatically extract and activate
175+
OpenTelemetry context from the _meta field of incoming MCP requests. This
176+
enables server-side trace continuation from client-injected context.
177+
178+
The reader handles both SessionMessage and JSONRPCMessage formats, and
179+
supports both dict and Pydantic model parameter structures.
180+
"""
181+
182+
def __init__(self, wrapped: Any) -> None:
183+
"""Initialize the context-extracting reader.
184+
185+
Args:
186+
wrapped: The original async stream reader to wrap
187+
"""
188+
super().__init__(wrapped)
189+
190+
async def __aenter__(self) -> Any:
191+
"""Enter the async context manager by delegating to the wrapped object."""
192+
return await self.__wrapped__.__aenter__()
193+
194+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
195+
"""Exit the async context manager by delegating to the wrapped object."""
196+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
197+
198+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
199+
"""Iterate over messages, extracting and activating context as needed.
200+
201+
For each incoming message, checks if it contains tracing context in
202+
the _meta field. If found, extracts and activates the context for
203+
the duration of message processing, then properly detaches it.
204+
205+
Yields:
206+
Messages from the wrapped stream, processed under the appropriate
207+
OpenTelemetry context
208+
"""
209+
async for item in self.__wrapped__:
210+
if isinstance(item, SessionMessage):
211+
request = item.message.root
212+
elif type(item) is JSONRPCMessage:
213+
request = item.root
214+
else:
215+
yield item
216+
continue
217+
218+
if isinstance(request, JSONRPCRequest) and request.params:
219+
# Handle both dict and Pydantic model params
220+
if hasattr(request.params, "get"):
221+
# Dict-like access
222+
meta = request.params.get("_meta")
223+
elif hasattr(request.params, "_meta"):
224+
# Direct attribute access for Pydantic models
225+
meta = getattr(request.params, "_meta", None)
226+
else:
227+
meta = None
228+
229+
if meta:
230+
extracted_context = propagate.extract(meta)
231+
restore = context.attach(extracted_context)
232+
try:
233+
yield item
234+
continue
235+
finally:
236+
context.detach(restore)
237+
yield item
238+
239+
240+
class SessionContextSavingWriter(ObjectProxy):
241+
"""A proxy writer that preserves OpenTelemetry context with outgoing items.
242+
243+
Wraps an async message stream writer to capture the current OpenTelemetry
244+
context and associate it with outgoing items. This enables context
245+
preservation across async boundaries in MCP session processing.
246+
"""
247+
248+
def __init__(self, wrapped: Any) -> None:
249+
"""Initialize the context-saving writer.
250+
251+
Args:
252+
wrapped: The original async stream writer to wrap
253+
"""
254+
super().__init__(wrapped)
255+
256+
async def __aenter__(self) -> Any:
257+
"""Enter the async context manager by delegating to the wrapped object."""
258+
return await self.__wrapped__.__aenter__()
259+
260+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
261+
"""Exit the async context manager by delegating to the wrapped object."""
262+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
263+
264+
async def send(self, item: Any) -> Any:
265+
"""Send an item while preserving the current OpenTelemetry context.
266+
267+
Captures the current context and wraps the item with it, enabling
268+
the receiving side to restore the appropriate tracing context.
269+
270+
Args:
271+
item: The item to send through the stream
272+
273+
Returns:
274+
Result of sending the wrapped item
275+
"""
276+
ctx = context.get_current()
277+
return await self.__wrapped__.send(ItemWithContext(item, ctx))
278+
279+
280+
class SessionContextAttachingReader(ObjectProxy):
281+
"""A proxy reader that restores OpenTelemetry context from wrapped items.
282+
283+
Wraps an async message stream reader to detect ItemWithContext instances
284+
and restore their associated OpenTelemetry context during processing.
285+
This completes the context preservation cycle started by SessionContextSavingWriter.
286+
"""
287+
288+
def __init__(self, wrapped: Any) -> None:
289+
"""Initialize the context-attaching reader.
290+
291+
Args:
292+
wrapped: The original async stream reader to wrap
293+
"""
294+
super().__init__(wrapped)
295+
296+
async def __aenter__(self) -> Any:
297+
"""Enter the async context manager by delegating to the wrapped object."""
298+
return await self.__wrapped__.__aenter__()
299+
300+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
301+
"""Exit the async context manager by delegating to the wrapped object."""
302+
return await self.__wrapped__.__aexit__(exc_type, exc_value, traceback)
303+
304+
async def __aiter__(self) -> AsyncGenerator[Any, None]:
305+
"""Iterate over items, restoring context for ItemWithContext instances.
306+
307+
For items wrapped with context, temporarily activates the associated
308+
OpenTelemetry context during processing, then properly detaches it.
309+
Regular items are yielded without context modification.
310+
311+
Yields:
312+
Unwrapped items processed under their associated OpenTelemetry context
313+
"""
314+
async for item in self.__wrapped__:
315+
if isinstance(item, ItemWithContext):
316+
restore = context.attach(item.ctx)
317+
try:
318+
yield item.item
319+
finally:
320+
context.detach(restore)
321+
else:
322+
yield item

0 commit comments

Comments
 (0)