From 96537ef6a969f82bbd99e8343aae54ff5d4395b4 Mon Sep 17 00:00:00 2001 From: Ge Li <77590974+GeLi2001@users.noreply.github.com> Date: Sun, 5 Oct 2025 02:04:25 -0700 Subject: [PATCH 1/3] wip --- .../instrumentation/agno/_wrappers.py | 30 ++++----- .../tests/test_instrumentor.py | 64 ++++++++++++++++++- 2 files changed, 78 insertions(+), 16 deletions(-) diff --git a/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py b/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py index 574fb510a6..88746bced3 100644 --- a/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py +++ b/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py @@ -82,14 +82,22 @@ def _generate_node_id() -> str: return token_hex(8) # Generates 16 hex characters (8 bytes) -def _run_arguments(arguments: Mapping[str, Any]) -> Iterator[Tuple[str, AttributeValue]]: - user_id = arguments.get("user_id") +def _extract_session_id(arguments: Mapping[str, Any]) -> Optional[str]: + """Extract session_id with proper fallback logic for Agno v2 compatibility.""" session_id = arguments.get("session_id") # For agno v2: session_id might be in the session object for internal _run method - session = arguments.get("session") - if session and hasattr(session, "session_id"): - session_id = session.session_id + if not session_id: + session = arguments.get("session") + if session and hasattr(session, "session_id"): + session_id = session.session_id + + return session_id + + +def _run_arguments(arguments: Mapping[str, Any]) -> Iterator[Tuple[str, AttributeValue]]: + user_id = arguments.get("user_id") + session_id = _extract_session_id(arguments) if session_id: yield SESSION_ID, session_id @@ -271,11 +279,7 @@ def run_stream( try: yield from wrapped(*args, **kwargs) # Use get_last_run_output instead of removed agent.run_response - session_id = None - try: - session_id = arguments.get("session_id") - except Exception: - session_id = None + session_id = _extract_session_id(arguments) run_response = None if hasattr(agent, "get_last_run_output"): @@ -401,11 +405,7 @@ async def arun_stream( yield response # Use get_last_run_output instead of removed agent.run_response - session_id = None - try: - session_id = arguments.get("session_id") - except Exception: - session_id = None + session_id = _extract_session_id(arguments) run_response = None if hasattr(agent, "get_last_run_output"): diff --git a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py index 5779ef130f..0f9e1a9840 100644 --- a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py +++ b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py @@ -1,4 +1,4 @@ -from typing import Any, Generator +from typing import Any, Generator, Iterator, Optional import pytest import vcr # type: ignore @@ -231,3 +231,65 @@ def is_valid_node_id(node_id: str) -> bool: assert web_agent_span is not None or finance_agent_span is not None, ( "At least one agent span should be found" ) + + +def test_session_id_streaming_regression( + tracer_provider: TracerProvider, + in_memory_span_exporter: InMemorySpanExporter, + setup_agno_instrumentation: Any, +) -> None: + """Regression test: ensure session_id is properly extracted in streaming methods.""" + from unittest.mock import Mock + + from openinference.instrumentation.agno._wrappers import _RunWrapper + + # Test the session_id extraction logic directly + mock_tracer = Mock() + mock_span = Mock() + mock_context_manager = Mock() + mock_context_manager.__enter__ = Mock(return_value=mock_span) + mock_context_manager.__exit__ = Mock(return_value=None) + mock_tracer.start_as_current_span.return_value = mock_context_manager + + run_wrapper = _RunWrapper(tracer=mock_tracer) + + # Mock agent with get_last_run_output method that raises exception when session_id is None + mock_agent = Mock() + + def mock_get_last_run_output(session_id: str = None) -> Mock: + if session_id is None: + raise Exception("No session_id provided") + mock_output = Mock() + mock_output.to_json.return_value = '{"result": "test output"}' + return mock_output + + mock_agent.get_last_run_output = mock_get_last_run_output + mock_agent.name = "Test Agent" + + # Mock the internal _run_stream method with proper signature + def mock_run_stream(message: str, session_id: str = None, **kwargs: Any) -> Iterator[str]: + yield "chunk1" + yield "chunk2" + + # Test arguments with session_id + test_args = ("test message",) + test_kwargs = {"session_id": "test_session_123"} + + # This should not raise "No session_id provided" exception + try: + result = list( + run_wrapper.run_stream( + wrapped=mock_run_stream, instance=mock_agent, args=test_args, kwargs=test_kwargs + ) + ) + # If we reach here, the session_id was properly extracted + assert len(result) == 2 + assert result == ["chunk1", "chunk2"] + test_passed = True + except Exception as e: + if "No session_id provided" in str(e): + test_passed = False # The bug is still present + else: + raise # Some other unexpected exception + + assert test_passed, "The session_id extraction should work properly in streaming methods" From a2c613bce1c4650c688f955a7b37c7290449af16 Mon Sep 17 00:00:00 2001 From: Ge Li <77590974+GeLi2001@users.noreply.github.com> Date: Sun, 5 Oct 2025 02:22:57 -0700 Subject: [PATCH 2/3] fix linting checks --- .../src/openinference/instrumentation/agno/_wrappers.py | 2 +- .../tests/test_instrumentor.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py b/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py index 88746bced3..a1fb6743b4 100644 --- a/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py +++ b/python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py @@ -92,7 +92,7 @@ def _extract_session_id(arguments: Mapping[str, Any]) -> Optional[str]: if session and hasattr(session, "session_id"): session_id = session.session_id - return session_id + return str(session_id) if session_id is not None else None def _run_arguments(arguments: Mapping[str, Any]) -> Iterator[Tuple[str, AttributeValue]]: diff --git a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py index 0f9e1a9840..2a71072d44 100644 --- a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py +++ b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py @@ -256,7 +256,7 @@ def test_session_id_streaming_regression( # Mock agent with get_last_run_output method that raises exception when session_id is None mock_agent = Mock() - def mock_get_last_run_output(session_id: str = None) -> Mock: + def mock_get_last_run_output(session_id: Optional[str] = None) -> Mock: if session_id is None: raise Exception("No session_id provided") mock_output = Mock() @@ -267,7 +267,9 @@ def mock_get_last_run_output(session_id: str = None) -> Mock: mock_agent.name = "Test Agent" # Mock the internal _run_stream method with proper signature - def mock_run_stream(message: str, session_id: str = None, **kwargs: Any) -> Iterator[str]: + def mock_run_stream( + message: str, session_id: Optional[str] = None, **kwargs: Any + ) -> Iterator[str]: yield "chunk1" yield "chunk2" From 7b44140d841e435ea8ae700ca53ab4b6c26f66d3 Mon Sep 17 00:00:00 2001 From: Ge Li <77590974+GeLi2001@users.noreply.github.com> Date: Fri, 10 Oct 2025 11:36:53 -0700 Subject: [PATCH 3/3] wip --- .../agno/fixtures/agent_run_stream.yaml | 106 ++++++++++++++++++ .../tests/test_instrumentor.py | 95 ++++++---------- 2 files changed, 142 insertions(+), 59 deletions(-) create mode 100644 python/instrumentation/openinference-instrumentation-agno/tests/openinference/instrumentation/agno/fixtures/agent_run_stream.yaml diff --git a/python/instrumentation/openinference-instrumentation-agno/tests/openinference/instrumentation/agno/fixtures/agent_run_stream.yaml b/python/instrumentation/openinference-instrumentation-agno/tests/openinference/instrumentation/agno/fixtures/agent_run_stream.yaml new file mode 100644 index 0000000000..d688d453ad --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-agno/tests/openinference/instrumentation/agno/fixtures/agent_run_stream.yaml @@ -0,0 +1,106 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"What is 2+2?"}],"model":"gpt-4o-mini","stream":true}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, br + connection: + - keep-alive + content-length: + - '82' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.108.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.108.0 + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.5 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"2"},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"+"},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"2"},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" equals"},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" 4"},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]} + + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1699999999,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]} + + + data: [DONE] + + ' + headers: + cache-control: + - no-cache, must-revalidate + cf-cache-status: + - DYNAMIC + cf-ray: + - 8c123456789abcdef-SJC + content-type: + - text/plain; charset=utf-8 + date: + - Fri, 10 Oct 2025 10:00:00 GMT + openai-model: + - gpt-4o-mini-2024-07-18 + openai-organization: + - org-test123 + openai-processing-ms: + - '500' + openai-version: + - '2020-10-01' + server: + - cloudflare + strict-transport-security: + - max-age=15724800; includeSubDomains + transfer-encoding: + - chunked + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '200000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '199950' + x-ratelimit-reset-requests: + - 6ms + x-ratelimit-reset-tokens: + - 15ms + x-request-id: + - req_test123456789abcdef + status: + code: 200 + message: OK +version: 1 diff --git a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py index 2a71072d44..e9306aafc2 100644 --- a/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py +++ b/python/instrumentation/openinference-instrumentation-agno/tests/test_instrumentor.py @@ -1,4 +1,4 @@ -from typing import Any, Generator, Iterator, Optional +from typing import Any, Generator import pytest import vcr # type: ignore @@ -143,9 +143,7 @@ def test_agno_team_coordinate_instrumentation( name="Finance Agent", role="Get financial data", model=OpenAIChat(id="gpt-4o-mini"), - tools=[ - YFinanceTools() # type: ignore - ], + tools=[YFinanceTools()], instructions="Use tables to display data", ) @@ -239,59 +237,38 @@ def test_session_id_streaming_regression( setup_agno_instrumentation: Any, ) -> None: """Regression test: ensure session_id is properly extracted in streaming methods.""" - from unittest.mock import Mock - - from openinference.instrumentation.agno._wrappers import _RunWrapper - - # Test the session_id extraction logic directly - mock_tracer = Mock() - mock_span = Mock() - mock_context_manager = Mock() - mock_context_manager.__enter__ = Mock(return_value=mock_span) - mock_context_manager.__exit__ = Mock(return_value=None) - mock_tracer.start_as_current_span.return_value = mock_context_manager - - run_wrapper = _RunWrapper(tracer=mock_tracer) - - # Mock agent with get_last_run_output method that raises exception when session_id is None - mock_agent = Mock() - - def mock_get_last_run_output(session_id: Optional[str] = None) -> Mock: - if session_id is None: - raise Exception("No session_id provided") - mock_output = Mock() - mock_output.to_json.return_value = '{"result": "test output"}' - return mock_output - - mock_agent.get_last_run_output = mock_get_last_run_output - mock_agent.name = "Test Agent" - - # Mock the internal _run_stream method with proper signature - def mock_run_stream( - message: str, session_id: Optional[str] = None, **kwargs: Any - ) -> Iterator[str]: - yield "chunk1" - yield "chunk2" - - # Test arguments with session_id - test_args = ("test message",) - test_kwargs = {"session_id": "test_session_123"} - - # This should not raise "No session_id provided" exception - try: - result = list( - run_wrapper.run_stream( - wrapped=mock_run_stream, instance=mock_agent, args=test_args, kwargs=test_kwargs - ) + with test_vcr.use_cassette( + "agent_run_stream.yaml", filter_headers=["authorization", "X-API-KEY"] + ): + import os + + os.environ["OPENAI_API_KEY"] = "fake_key" + agent = Agent( + name="Stream Agent", + model=OpenAIChat(id="gpt-4o-mini"), ) - # If we reach here, the session_id was properly extracted - assert len(result) == 2 - assert result == ["chunk1", "chunk2"] - test_passed = True - except Exception as e: - if "No session_id provided" in str(e): - test_passed = False # The bug is still present - else: - raise # Some other unexpected exception - - assert test_passed, "The session_id extraction should work properly in streaming methods" + + # Test streaming with session_id - this should not raise an exception + session_id = "test_session_stream" + # Use the public run method with stream=True instead of _run_stream + stream_result = agent.run("What is 2+2?", session_id=session_id, stream=True) + + # Consume the stream to trigger the instrumentation + if hasattr(stream_result, "__iter__"): + list(stream_result) + + spans = in_memory_span_exporter.get_finished_spans() + assert len(spans) >= 1 + + # Find the streaming span and verify session_id is captured + stream_span = None + for span in spans: + attributes = dict(span.attributes or dict()) + if span.name == "Stream_Agent.run": + stream_span = attributes + break + + assert stream_span is not None, "Stream agent span should be found" + assert stream_span.get("session.id") == session_id, ( + "Session ID should be properly extracted in streaming methods" + )