From d259a3002f94b67129fe4f5669353d5ea9684b2f Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Wed, 13 Aug 2025 11:48:21 +0200 Subject: [PATCH 1/7] LangfuseTracer is now thread-safe, avoids mixed traces in async environments --- .../tracing/langfuse/tracer.py | 27 ++-- integrations/langfuse/tests/test_tracer.py | 130 ++++++++++++++---- 2 files changed, 123 insertions(+), 34 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 36cb81ca0f..a4706cb01e 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -61,10 +61,14 @@ _COMPONENT_OUTPUT_KEY = "haystack.component.output" _COMPONENT_INPUT_KEY = "haystack.component.input" -# Context var used to keep track of tracing related info. -# This mainly useful for parents spans. +# External session metadata for trace correlation (Haystack system) +# Stores trace_id, user_id, session_id, tags, version for root trace creation tracing_context_var: ContextVar[Dict[Any, Any]] = ContextVar("tracing_context") +# Internal span execution hierarchy for our tracer +# Manages parent-child relationships and prevents cross-request span interleaving +span_stack_var: ContextVar[Optional[List["LangfuseSpan"]]] = ContextVar("span_stack", default=None) + class LangfuseSpan(Span): """ @@ -265,6 +269,7 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: ) raise RuntimeError(message) + # Get external tracing context for root trace creation (correlation metadata) tracing_ctx = tracing_context_var.get({}) if not context.parent_span: # Create a new trace when there's no parent span @@ -360,6 +365,7 @@ def __init__( "before importing Haystack." ) self._tracer = tracer + # Keep _context as deprecated shim to avoid AttributeError if anyone uses it self._context: List[LangfuseSpan] = [] self._name = name self._public = public @@ -391,7 +397,12 @@ def trace( # Create span using the handler span = self._span_handler.create_span(span_context) - self._context.append(span) + # Build new span hierarchy: copy existing stack, add new span, save for restoration + prev_stack = span_stack_var.get() + new_stack = (prev_stack or []).copy() + new_stack.append(span) + token = span_stack_var.set(new_stack) + span.set_tags(tags) try: @@ -414,10 +425,8 @@ def trace( cleanup_error=cleanup_error, ) finally: - # CRITICAL: Always pop context to prevent corruption - # This is especially important for nested pipeline scenarios - if self._context and self._context[-1] == span: - self._context.pop() + # Restore previous span stack using saved token - ensures proper cleanup + span_stack_var.reset(token) if self.enforce_flush: self.flush() @@ -431,7 +440,9 @@ def current_span(self) -> Optional[Span]: :return: The current span if available, else None. """ - return self._context[-1] if self._context else None + # Get top of span stack (most recent span) from context-local storage + stack = span_stack_var.get() + return stack[-1] if stack else None def get_trace_url(self) -> str: """ diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 1b17780193..71e4e844c2 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import asyncio import datetime import json import logging @@ -15,38 +16,43 @@ from haystack_integrations.components.connectors.langfuse import LangfuseConnector from haystack_integrations.tracing.langfuse.tracer import ( - _COMPONENT_OUTPUT_KEY, DefaultSpanHandler, LangfuseSpan, LangfuseTracer, - SpanContext) + _COMPONENT_OUTPUT_KEY, + DefaultSpanHandler, + LangfuseSpan, + LangfuseTracer, + SpanContext, +) class MockSpan: - def __init__(self): + def __init__(self, name="mock_span"): self._data = {} self._span = self - self.operation_name = "operation_name" + self.operation_name = name + self._name = name def raw_span(self): return self def span(self, name=None): - # assert correct operation name passed to the span - assert name == "operation_name" - return self + # Return a new mock span for child spans + return MockSpan(name=name or "child_span") def update(self, **kwargs): self._data.update(kwargs) def generation(self, name=None): - return self + # Return a new mock span for generation spans + return MockSpan(name=name or "generation_span") def end(self): pass class MockTracer: - def trace(self, name, **kwargs): - return MockSpan() + # Return a unique mock span for each trace call + return MockSpan(name=name) def flush(self): pass @@ -62,7 +68,6 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: class TestLangfuseSpan: - # LangfuseSpan can be initialized with a span object def test_initialized_with_span_object(self): mock_span = Mock() @@ -235,7 +240,8 @@ def test_initialization(self): langfuse_instance = Mock() tracer = LangfuseTracer(tracer=langfuse_instance, name="Haystack", public=True) assert tracer._tracer == langfuse_instance - assert tracer._context == [] + # Check behavioral state instead of internal _context list + assert tracer.current_span() is None assert tracer._name == "Haystack" assert tracer._public @@ -258,13 +264,14 @@ def test_create_new_span(self): # check that the trace method is called on the tracer instance with the provided operation name and tags with tracer.trace("operation_name", tags={"tag1": "value1", "tag2": "value2"}) as span: - assert len(tracer._context) == 1, "The trace span should have been added to the the root context span" + # Check that there is a current active span during tracing + assert tracer.current_span() is not None, "There should be an active span during tracing" + assert tracer.current_span() == span, "The current span should be the active span" assert span.raw_span().operation_name == "operation_name" assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"} - assert ( - len(tracer._context) == 0 - ), "The trace span should have been popped, and the root span is closed as well" + # Check that the span is cleaned up after tracing + assert tracer.current_span() is None, "There should be no active span after tracing completes" # check that update method is called on the span instance with the provided key value pairs def test_update_span_with_pipeline_input_output_data(self): @@ -327,12 +334,12 @@ def test_handle_tool_invoker(self): assert mock_span.update.call_count >= 1 name_update_call = None for call in mock_span.update.call_args_list: - if 'name' in call[1]: + if "name" in call[1]: name_update_call = call break assert name_update_call is not None, "No call to update the span name was made" - updated_name = name_update_call[1]['name'] + updated_name = name_update_call[1]["name"] # verify the format of the updated span name to be: `original_component_name - [list_of_tool_names]` assert updated_name != "tool_invoker", f"Expected 'tool_invoker` to be upddated with tool names" @@ -372,8 +379,7 @@ def test_update_span_flush_disable(self, monkeypatch): monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false") tracer_mock = Mock() - from haystack_integrations.tracing.langfuse.tracer import \ - LangfuseTracer + from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False) with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: @@ -388,11 +394,12 @@ def test_context_is_empty_after_tracing(self): with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span: pass - assert tracer._context == [] + # Check behavioral state instead of internal _context list + assert tracer.current_span() is None def test_init_with_tracing_disabled(self, monkeypatch, caplog): # Clear haystack modules because ProxyTracer is initialized whenever haystack is imported - modules_to_clear = [name for name in sys.modules if name.startswith('haystack')] + modules_to_clear = [name for name in sys.modules if name.startswith("haystack")] for name in modules_to_clear: sys.modules.pop(name, None) @@ -416,7 +423,6 @@ def test_context_cleanup_after_nested_failures(self): After the fix: context is always cleaned up (length == 0) """ - @component class FailingParser: @component.output_types(result=str) @@ -451,10 +457,82 @@ def run(self, input_data: str): pass # Expected to fail # Critical assertion: context should be empty after failed operation - assert len(tracer.tracer._context) == 0 + assert tracer.tracer.current_span() is None # Test 2: Second run should work normally with clean context main_pipeline.run({"nested_component": {"input_data": '{"key": "valid"}'}}) - + # Critical assertion: context should be empty after successful operation - assert len(tracer.tracer._context) == 0 + assert tracer.tracer.current_span() is None + + def test_async_concurrency_span_isolation(self): + """ + Test that concurrent async traces maintain isolated span contexts. + + This test verifies that the context-local span stack prevents cross-request + span interleaving in concurrent environments like FastAPI servers. + """ + tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False) + + # Track spans from each task for verification + task1_spans = [] + task2_spans = [] + + async def trace_task(task_id: str, spans_list: list): + """Simulate a request with nested tracing operations""" + with tracer.trace(f"outer_operation_{task_id}") as outer_span: + spans_list.append(("outer", outer_span, tracer.current_span())) + + # Simulate some async work + await asyncio.sleep(0.01) + + with tracer.trace(f"inner_operation_{task_id}") as inner_span: + spans_list.append(("inner", inner_span, tracer.current_span())) + + # Simulate more async work + await asyncio.sleep(0.01) + + # Verify nested relationship within this task + assert tracer.current_span() == inner_span + + # After inner span, outer should be current again + spans_list.append(("after_inner", None, tracer.current_span())) + assert tracer.current_span() == outer_span + + # After all spans, should be None + spans_list.append(("after_outer", None, tracer.current_span())) + assert tracer.current_span() is None + + async def run_concurrent_traces(): + """Run two concurrent tracing tasks""" + await asyncio.gather(trace_task("task1", task1_spans), trace_task("task2", task2_spans)) + + # Run the concurrent test + asyncio.run(run_concurrent_traces()) + + # Verify both tasks completed successfully + assert len(task1_spans) == 4 + assert len(task2_spans) == 4 + + # Verify each task had proper span isolation + # Task 1 spans should be different from Task 2 spans + task1_outer = task1_spans[0][1] # outer span from task1 + task2_outer = task2_spans[0][1] # outer span from task2 + assert task1_outer != task2_outer + + task1_inner = task1_spans[1][1] # inner span from task1 + task2_inner = task2_spans[1][1] # inner span from task2 + assert task1_inner != task2_inner + + # Verify proper nesting within each task + # Task 1: outer -> inner -> outer -> None + assert task1_spans[0][2] == task1_outer # current_span during outer + assert task1_spans[1][2] == task1_inner # current_span during inner + assert task1_spans[2][2] == task1_outer # current_span after inner + assert task1_spans[3][2] is None # current_span after outer + + # Task 2: outer -> inner -> outer -> None + assert task2_spans[0][2] == task2_outer # current_span during outer + assert task2_spans[1][2] == task2_inner # current_span during inner + assert task2_spans[2][2] == task2_outer # current_span after inner + assert task2_spans[3][2] is None # current_span after outer From fdfd9eac5eda5357203cd027a57560b509b9d01a Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Wed, 13 Aug 2025 13:54:15 +0200 Subject: [PATCH 2/7] Improve session support --- .../connectors/langfuse/langfuse_connector.py | 43 ++++++++++++++++++- .../tracing/langfuse/tracer.py | 10 ++++- .../langfuse/tests/test_langfuse_connector.py | 6 +++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index 9cdb2d9c29..00ab6170b7 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -112,6 +112,20 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: connector = LangfuseConnector(span_handler=CustomSpanHandler()) ``` + + **Using Sessions to Group Related Traces:** + + ```python + # Create a connector with a session ID to group all related traces + tracer = LangfuseConnector( + name="Agent Workflow", + session_id="user_123_workflow_456" + ) + + # All traces created by this connector will use the same session_id + # Langfuse will automatically group them as one session in the UI + # This is useful for complex workflows with multiple agents or pipelines + ``` """ def __init__( @@ -125,6 +139,7 @@ def __init__( *, host: Optional[str] = None, langfuse_client_kwargs: Optional[Dict[str, Any]] = None, + session_id: Optional[str] = None, ) -> None: """ Initialize the LangfuseConnector component. @@ -148,6 +163,9 @@ def __init__( :param langfuse_client_kwargs: Optional custom configuration for the Langfuse client. This is a dictionary containing any additional configuration options for the Langfuse client. See the Langfuse documentation for more details on available configuration options. + :param session_id: Optional session ID to group multiple traces together. If provided, all traces created + by this connector will be grouped under the same session in Langfuse, making it easier to correlate + related operations (e.g., all agents in a workflow). """ self.name = name self.public = public @@ -156,6 +174,7 @@ def __init__( self.span_handler = span_handler self.host = host self.langfuse_client_kwargs = langfuse_client_kwargs + self.session_id = session_id resolved_langfuse_client_kwargs = { "secret_key": secret_key.resolve_value() if secret_key else None, "public_key": public_key.resolve_value() if public_key else None, @@ -168,6 +187,7 @@ def __init__( name=name, public=public, span_handler=span_handler, + session_id=session_id, ) tracing.enable_tracing(self.tracer) @@ -178,19 +198,38 @@ def run(self, invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, :param invocation_context: A dictionary with additional context for the invocation. This parameter is useful when users want to mark this particular invocation with additional information, e.g. - a run id from their own execution framework, user id, etc. These key-value pairs are then visible + a run id from their own execution framework, user_id, etc. These key-value pairs are then visible in the Langfuse traces. :returns: A dictionary with the following keys: - `name`: The name of the tracing component. - `trace_url`: The URL to the tracing data. - `trace_id`: The ID of the trace. """ + # Note: session_id is handled at the trace level, not in invocation context + # The session_id parameter is used when creating traces, not when running the connector + logger.debug( "Langfuse tracer invoked with the following context: '{invocation_context}'", invocation_context=invocation_context, ) return {"name": self.name, "trace_url": self.tracer.get_trace_url(), "trace_id": self.tracer.get_trace_id()} + def get_session_id(self) -> Optional[str]: + """ + Get the session ID used by this connector. + + :return: The session ID if set, None otherwise. + """ + return self.session_id + + def get_tracer_session_id(self) -> Optional[str]: + """ + Get the session ID from the underlying tracer. + + :return: The session ID if set, None otherwise. + """ + return getattr(self.tracer, "_session_id", None) + def to_dict(self) -> Dict[str, Any]: """ Serialize this component to a dictionary. @@ -215,6 +254,7 @@ def to_dict(self) -> Dict[str, Any]: span_handler=span_handler, host=self.host, langfuse_client_kwargs=langfuse_client_kwargs, + session_id=self.session_id, ) @classmethod @@ -230,4 +270,5 @@ def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector": init_params["span_handler"] = ( deserialize_class_instance(init_params["span_handler"]) if init_params["span_handler"] else None ) + # session_id is a simple string, no deserialization needed return default_from_dict(cls, data) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index a4706cb01e..40b36f4e18 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -172,6 +172,7 @@ class SpanContext: parent_span: Optional[Span] trace_name: str = "Haystack" public: bool = False + session_id: Optional[str] = None def __post_init__(self) -> None: """ @@ -273,13 +274,15 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: tracing_ctx = tracing_context_var.get({}) if not context.parent_span: # Create a new trace when there's no parent span + # Use session_id from context if provided, otherwise fall back to external context + session_id = context.session_id or tracing_ctx.get("session_id") return LangfuseSpan( self.tracer.trace( name=context.trace_name, public=context.public, id=tracing_ctx.get("trace_id"), user_id=tracing_ctx.get("user_id"), - session_id=tracing_ctx.get("session_id"), + session_id=session_id, tags=tracing_ctx.get("tags"), version=tracing_ctx.get("version"), ) @@ -346,6 +349,7 @@ def __init__( name: str = "Haystack", public: bool = False, span_handler: Optional[SpanHandler] = None, + session_id: Optional[str] = None, ) -> None: """ Initialize a LangfuseTracer instance. @@ -357,6 +361,8 @@ def __init__( be publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private and only accessible to the Langfuse account owner. :param span_handler: Custom handler for processing spans. If None, uses DefaultSpanHandler. + :param session_id: Optional session ID to group multiple traces together. If provided, all traces + created by this tracer will use this session_id, allowing Langfuse to group them as one session. """ if not proxy_tracer.is_content_tracing_enabled: logger.warning( @@ -369,6 +375,7 @@ def __init__( self._context: List[LangfuseSpan] = [] self._name = name self._public = public + self._session_id = session_id self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true" self._span_handler = span_handler or DefaultSpanHandler() self._span_handler.init_tracer(tracer) @@ -392,6 +399,7 @@ def trace( parent_span=parent_span or self.current_span(), trace_name=self._name, public=self._public, + session_id=self._session_id, ) # Create span using the handler diff --git a/integrations/langfuse/tests/test_langfuse_connector.py b/integrations/langfuse/tests/test_langfuse_connector.py index 7827ec0e71..2888bac60b 100644 --- a/integrations/langfuse/tests/test_langfuse_connector.py +++ b/integrations/langfuse/tests/test_langfuse_connector.py @@ -69,6 +69,7 @@ def test_to_dict(self, monkeypatch): "span_handler": None, "host": None, "langfuse_client_kwargs": None, + "session_id": None, }, } @@ -111,6 +112,7 @@ def test_to_dict_with_params(self, monkeypatch): }, "host": "https://example.com", "langfuse_client_kwargs": {"timeout": 30.0}, + "session_id": None, }, } @@ -136,6 +138,7 @@ def test_from_dict(self, monkeypatch): "span_handler": None, "host": None, "langfuse_client_kwargs": None, + "session_id": None, }, } langfuse_connector = LangfuseConnector.from_dict(data) @@ -146,6 +149,7 @@ def test_from_dict(self, monkeypatch): assert langfuse_connector.span_handler is None assert langfuse_connector.host is None assert langfuse_connector.langfuse_client_kwargs is None + assert langfuse_connector.session_id is None def test_from_dict_with_params(self, monkeypatch): monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") @@ -175,6 +179,7 @@ def test_from_dict_with_params(self, monkeypatch): }, "host": "https://example.com", "langfuse_client_kwargs": {"timeout": 30.0}, + "session_id": None, }, } @@ -186,6 +191,7 @@ def test_from_dict_with_params(self, monkeypatch): assert isinstance(langfuse_connector.span_handler, CustomSpanHandler) assert langfuse_connector.host == "https://example.com" assert langfuse_connector.langfuse_client_kwargs == {"timeout": 30.0} + assert langfuse_connector.session_id is None def test_pipeline_serialization(self, monkeypatch): # Set test env vars From d2daf07e41d3492c9688917dd49bb303271cfa4f Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Fri, 15 Aug 2025 15:36:20 +0200 Subject: [PATCH 3/7] Move session id to run --- integrations/langfuse/README.md | 32 +++++++++++++++++ integrations/langfuse/example/basic_rag.py | 6 +++- integrations/langfuse/example/chat.py | 1 + .../connectors/langfuse/langfuse_connector.py | 35 ++++++------------- .../langfuse/tests/test_langfuse_connector.py | 34 ++++++++++++++---- integrations/langfuse/tests/test_tracer.py | 14 +++++--- 6 files changed, 87 insertions(+), 35 deletions(-) diff --git a/integrations/langfuse/README.md b/integrations/langfuse/README.md index 5d5beeafe0..52aa0608f9 100644 --- a/integrations/langfuse/README.md +++ b/integrations/langfuse/README.md @@ -91,12 +91,44 @@ response = pipe.run( print(response["llm"]["replies"][0]) print(response["tracer"]["trace_url"]) print(response["tracer"]["trace_id"]) + +# Using session_id to group related traces +response = pipe.run( + data={"prompt_builder": {"template_variables": {"location": "Paris"}, "template": messages}}, + tracer={"session_id": "user_123_conversation_456"} +) +print(response["llm"]["replies"][0]) +print(response["tracer"]["trace_url"]) +print(response["tracer"]["trace_id"]) ``` In this example, we add the `LangfuseConnector` to the pipeline with the name "tracer". Each run of the pipeline produces one trace viewable on the Langfuse website with a specific URL. The trace captures the entire execution context, including the prompts, completions, and metadata. +### Using Session IDs + +You can group related traces together by passing a `session_id` when running the pipeline. This is useful for: +- **Multi-turn conversations**: Group all messages in a chat session +- **Workflow tracking**: Group all operations in a user workflow +- **Debugging**: Correlate related traces across different pipeline runs + +```python +# Group traces by user session +response = pipe.run( + data={"prompt_builder": {"template_variables": {"location": "Tokyo"}, "template": messages}}, + tracer={"session_id": "user_123_session_456"} +) + +# Group traces by workflow +response = pipe.run( + data={"prompt_builder": {"template_variables": {"location": "London"}, "template": messages}}, + tracer={"session_id": "workflow_789_step_2"} +) +``` + +All traces with the same `session_id` will be grouped together in the Langfuse UI, making it easier to analyze related operations. + ## Trace Visualization Langfuse provides a user-friendly interface to visualize and analyze the traces generated by your Haystack pipeline. diff --git a/integrations/langfuse/example/basic_rag.py b/integrations/langfuse/example/basic_rag.py index b1d5e620fc..9b3e845557 100644 --- a/integrations/langfuse/example/basic_rag.py +++ b/integrations/langfuse/example/basic_rag.py @@ -60,7 +60,11 @@ def get_pipeline(document_store: InMemoryDocumentStore): pipeline = get_pipeline(document_store) question = "What does Rhodes Statue look like?" - response = pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"question": question}}) + response = pipeline.run({ + "text_embedder": {"text": question}, + "prompt_builder": {"question": question}, + "tracer": {"session_id": "rag_session_001"} + }) print(response["llm"]["replies"][0]) print(response["tracer"]["trace_url"]) diff --git a/integrations/langfuse/example/chat.py b/integrations/langfuse/example/chat.py index 2308ed1f45..37a0c295e4 100644 --- a/integrations/langfuse/example/chat.py +++ b/integrations/langfuse/example/chat.py @@ -57,6 +57,7 @@ }, "tracer": { "invocation_context": {"some_key": "some_value"}, + "session_id": "user_123_chat_session", }, } ) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index 00ab6170b7..685f24c527 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -139,7 +139,6 @@ def __init__( *, host: Optional[str] = None, langfuse_client_kwargs: Optional[Dict[str, Any]] = None, - session_id: Optional[str] = None, ) -> None: """ Initialize the LangfuseConnector component. @@ -174,7 +173,6 @@ def __init__( self.span_handler = span_handler self.host = host self.langfuse_client_kwargs = langfuse_client_kwargs - self.session_id = session_id resolved_langfuse_client_kwargs = { "secret_key": secret_key.resolve_value() if secret_key else None, "public_key": public_key.resolve_value() if public_key else None, @@ -187,12 +185,13 @@ def __init__( name=name, public=public, span_handler=span_handler, - session_id=session_id, ) tracing.enable_tracing(self.tracer) @component.output_types(name=str, trace_url=str, trace_id=str) - def run(self, invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, str]: + def run( + self, invocation_context: Optional[Dict[str, Any]] = None, session_id: Optional[str] = None + ) -> Dict[str, str]: """ Runs the LangfuseConnector component. @@ -200,36 +199,25 @@ def run(self, invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, is useful when users want to mark this particular invocation with additional information, e.g. a run id from their own execution framework, user_id, etc. These key-value pairs are then visible in the Langfuse traces. + :param session_id: Optional session ID to group this specific run with other related traces. + If provided, this session_id will be used for the current pipeline run, allowing Langfuse + to group related operations together. :returns: A dictionary with the following keys: - `name`: The name of the tracing component. - `trace_url`: The URL to the tracing data. - `trace_id`: The ID of the trace. """ - # Note: session_id is handled at the trace level, not in invocation context - # The session_id parameter is used when creating traces, not when running the connector + # Set session_id for this run if provided + if session_id is not None: + self.tracer._session_id = session_id logger.debug( - "Langfuse tracer invoked with the following context: '{invocation_context}'", + "Langfuse tracer invoked with the following context: '{invocation_context}' and session_id: '{session_id}'", invocation_context=invocation_context, + session_id=session_id, ) return {"name": self.name, "trace_url": self.tracer.get_trace_url(), "trace_id": self.tracer.get_trace_id()} - def get_session_id(self) -> Optional[str]: - """ - Get the session ID used by this connector. - - :return: The session ID if set, None otherwise. - """ - return self.session_id - - def get_tracer_session_id(self) -> Optional[str]: - """ - Get the session ID from the underlying tracer. - - :return: The session ID if set, None otherwise. - """ - return getattr(self.tracer, "_session_id", None) - def to_dict(self) -> Dict[str, Any]: """ Serialize this component to a dictionary. @@ -254,7 +242,6 @@ def to_dict(self) -> Dict[str, Any]: span_handler=span_handler, host=self.host, langfuse_client_kwargs=langfuse_client_kwargs, - session_id=self.session_id, ) @classmethod diff --git a/integrations/langfuse/tests/test_langfuse_connector.py b/integrations/langfuse/tests/test_langfuse_connector.py index 2888bac60b..2d65e9966e 100644 --- a/integrations/langfuse/tests/test_langfuse_connector.py +++ b/integrations/langfuse/tests/test_langfuse_connector.py @@ -69,7 +69,6 @@ def test_to_dict(self, monkeypatch): "span_handler": None, "host": None, "langfuse_client_kwargs": None, - "session_id": None, }, } @@ -112,7 +111,6 @@ def test_to_dict_with_params(self, monkeypatch): }, "host": "https://example.com", "langfuse_client_kwargs": {"timeout": 30.0}, - "session_id": None, }, } @@ -138,7 +136,6 @@ def test_from_dict(self, monkeypatch): "span_handler": None, "host": None, "langfuse_client_kwargs": None, - "session_id": None, }, } langfuse_connector = LangfuseConnector.from_dict(data) @@ -149,7 +146,6 @@ def test_from_dict(self, monkeypatch): assert langfuse_connector.span_handler is None assert langfuse_connector.host is None assert langfuse_connector.langfuse_client_kwargs is None - assert langfuse_connector.session_id is None def test_from_dict_with_params(self, monkeypatch): monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") @@ -179,7 +175,6 @@ def test_from_dict_with_params(self, monkeypatch): }, "host": "https://example.com", "langfuse_client_kwargs": {"timeout": 30.0}, - "session_id": None, }, } @@ -191,7 +186,6 @@ def test_from_dict_with_params(self, monkeypatch): assert isinstance(langfuse_connector.span_handler, CustomSpanHandler) assert langfuse_connector.host == "https://example.com" assert langfuse_connector.langfuse_client_kwargs == {"timeout": 30.0} - assert langfuse_connector.session_id is None def test_pipeline_serialization(self, monkeypatch): # Set test env vars @@ -231,3 +225,31 @@ def test_pipeline_serialization(self, monkeypatch): # Verify pipeline is the same assert new_pipe == pipe + + def test_run_with_session_id(self, monkeypatch): + """Test that session_id can be passed as a runtime parameter""" + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public") + + langfuse_connector = LangfuseConnector(name="Session Test") + + # Mock the tracer methods to avoid real API calls + mock_tracer = Mock() + mock_tracer.get_trace_url.return_value = "https://example.com/trace" + mock_tracer.get_trace_id.return_value = "12345" + langfuse_connector.tracer = mock_tracer + + # Test with session_id + response = langfuse_connector.run(session_id="user_123_session_456") + assert response["name"] == "Session Test" + assert response["trace_url"] == "https://example.com/trace" + assert response["trace_id"] == "12345" + + # Verify session_id was set on the tracer + assert langfuse_connector.tracer._session_id == "user_123_session_456" + + # Test without session_id + response = langfuse_connector.run() + assert response["name"] == "Session Test" + assert response["trace_url"] == "https://example.com/trace" + assert response["trace_id"] == "12345" diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 71e4e844c2..4637427b64 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -2,6 +2,11 @@ # # SPDX-License-Identifier: Apache-2.0 +import os + +# Enable content tracing for tests - must be set before importing haystack modules +os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true" + import asyncio import datetime import json @@ -444,11 +449,12 @@ def run(self, input_data: str): result = self.internal_pipeline.run({"parser": {"data": input_data}}) return {"result": result["parser"]["result"]} - tracer = LangfuseConnector("test") + # Create a mock tracer instead of a real LangfuseConnector to avoid API calls + tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False) main_pipeline = Pipeline() main_pipeline.add_component("nested_component", ComponentWithNestedPipeline()) - main_pipeline.add_component("tracer", tracer) + # Don't add the tracer to the pipeline since we're testing the tracer directly # Test 1: First run will fail and should clean up context try: @@ -457,13 +463,13 @@ def run(self, input_data: str): pass # Expected to fail # Critical assertion: context should be empty after failed operation - assert tracer.tracer.current_span() is None + assert tracer.current_span() is None # Test 2: Second run should work normally with clean context main_pipeline.run({"nested_component": {"input_data": '{"key": "valid"}'}}) # Critical assertion: context should be empty after successful operation - assert tracer.tracer.current_span() is None + assert tracer.current_span() is None def test_async_concurrency_span_isolation(self): """ From 8866d1a05d5e8dc81c726bf6a9148b8e29fd5a54 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Sun, 17 Aug 2025 22:48:48 +0200 Subject: [PATCH 4/7] Update docs --- .../connectors/langfuse/langfuse_connector.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index 685f24c527..9fc1013fe7 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -116,15 +116,20 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: **Using Sessions to Group Related Traces:** ```python - # Create a connector with a session ID to group all related traces - tracer = LangfuseConnector( - name="Agent Workflow", - session_id="user_123_workflow_456" + # Create a connector + tracer = LangfuseConnector(name="Agent Workflow") + + # Pass session_id at runtime to group related traces + response = tracer.run(session_id="user_123_workflow_456") + + # Or when used in a pipeline + response = pipeline.run( + data={"your_data": "value"}, + tracer={"session_id": "user_123_workflow_456"} ) - # All traces created by this connector will use the same session_id - # Langfuse will automatically group them as one session in the UI - # This is useful for complex workflows with multiple agents or pipelines + # All traces with the same session_id will be grouped together + # in the Langfuse UI, making it easier to correlate related operations ``` """ @@ -162,9 +167,6 @@ def __init__( :param langfuse_client_kwargs: Optional custom configuration for the Langfuse client. This is a dictionary containing any additional configuration options for the Langfuse client. See the Langfuse documentation for more details on available configuration options. - :param session_id: Optional session ID to group multiple traces together. If provided, all traces created - by this connector will be grouped under the same session in Langfuse, making it easier to correlate - related operations (e.g., all agents in a workflow). """ self.name = name self.public = public From 1c5ead0bbd5ba0e4f9f7b4d9e4c6fcf824530707 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 18 Aug 2025 16:24:16 +0200 Subject: [PATCH 5/7] Restore hierarchical traces --- .../tracing/langfuse/tracer.py | 95 ++++++++++++++++--- integrations/langfuse/tests/test_tracer.py | 80 ++++++++++++++++ 2 files changed, 160 insertions(+), 15 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 40b36f4e18..9510118f7f 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -9,6 +9,7 @@ from contextvars import ContextVar from dataclasses import dataclass from datetime import datetime +from threading import RLock from typing import Any, Dict, Iterator, List, Optional, Union from haystack import default_from_dict, default_to_dict, logging @@ -69,6 +70,16 @@ # Manages parent-child relationships and prevents cross-request span interleaving span_stack_var: ContextVar[Optional[List["LangfuseSpan"]]] = ContextVar("span_stack", default=None) +# Root trace client for the current logical request/session. This is used as a +# stable anchor to attach child spans across async/task boundaries where the +# span stack might not be directly available. +root_trace_client_var: ContextVar[Optional[StatefulTraceClient]] = ContextVar("root_trace_client", default=None) + +# Process-wide registry to anchor a single root trace per session_id, +# independent of ContextVar/task boundaries. +_root_traces_by_session: Dict[str, StatefulTraceClient] = {} +_root_registry_lock = RLock() + class LangfuseSpan(Span): """ @@ -173,6 +184,9 @@ class SpanContext: trace_name: str = "Haystack" public: bool = False session_id: Optional[str] = None + # Additional execution context information (observability only) + execution_context: Optional[str] = None + hierarchy_level: Optional[int] = None def __post_init__(self) -> None: """ @@ -272,25 +286,54 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: # Get external tracing context for root trace creation (correlation metadata) tracing_ctx = tracing_context_var.get({}) - if not context.parent_span: - # Create a new trace when there's no parent span - # Use session_id from context if provided, otherwise fall back to external context + # Determine effective parent using current span stack if parent_span is not given + current_stack = span_stack_var.get() + effective_parent = context.parent_span + if effective_parent is None and current_stack: + effective_parent = current_stack[-1] + + if not effective_parent: + # If there's no effective parent but we have an existing root trace in the context, + # create a child span under that trace. This preserves hierarchy across async/task boundaries + # where the span stack might not be propagated but the context is. + root_trace_client = root_trace_client_var.get() or tracing_ctx.get("root_trace_client") + if root_trace_client is not None: + if context.component_type in _ALL_SUPPORTED_GENERATORS: + return LangfuseSpan(root_trace_client.generation(name=context.name)) + return LangfuseSpan(root_trace_client.span(name=context.name)) + + # Otherwise, create a brand new trace and store it in the context for nested tasks to use session_id = context.session_id or tracing_ctx.get("session_id") - return LangfuseSpan( - self.tracer.trace( - name=context.trace_name, - public=context.public, - id=tracing_ctx.get("trace_id"), - user_id=tracing_ctx.get("user_id"), - session_id=session_id, - tags=tracing_ctx.get("tags"), - version=tracing_ctx.get("version"), - ) + + # Try to reuse a process-wide root for this session if available + if session_id: + with _root_registry_lock: + existing = _root_traces_by_session.get(session_id) + if existing is not None: + if context.component_type in _ALL_SUPPORTED_GENERATORS: + return LangfuseSpan(existing.generation(name=context.name)) + return LangfuseSpan(existing.span(name=context.name)) + + new_trace = self.tracer.trace( + name=context.trace_name, + public=context.public, + id=tracing_ctx.get("trace_id"), + user_id=tracing_ctx.get("user_id"), + session_id=session_id, + tags=tracing_ctx.get("tags"), + version=tracing_ctx.get("version"), ) + # Store the created root trace client in the context var for reuse + root_trace_client_var.set(new_trace) + # Also store in process-wide registry keyed by session to survive task/thread boundaries + if session_id: + with _root_registry_lock: + _root_traces_by_session[session_id] = new_trace + return LangfuseSpan(new_trace) elif context.component_type in _ALL_SUPPORTED_GENERATORS: - return LangfuseSpan(context.parent_span.raw_span().generation(name=context.name)) + return LangfuseSpan(effective_parent.raw_span().generation(name=context.name)) else: - return LangfuseSpan(context.parent_span.raw_span().span(name=context.name)) + return LangfuseSpan(effective_parent.raw_span().span(name=context.name)) def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: # If the span is at the pipeline level, we add input and output keys to the span @@ -389,6 +432,9 @@ def trace( component_type = tags.get(_COMPONENT_TYPE_KEY) # Create a new span context + current_stack = span_stack_var.get() + hierarchy_level = len(current_stack or []) + execution_context = self._determine_execution_context(operation_name, component_type) span_context = SpanContext( name=span_name, operation_name=operation_name, @@ -400,6 +446,8 @@ def trace( trace_name=self._name, public=self._public, session_id=self._session_id, + execution_context=execution_context, + hierarchy_level=hierarchy_level, ) # Create span using the handler @@ -439,6 +487,23 @@ def trace( if self.enforce_flush: self.flush() + # Note: we intentionally do NOT clear root_trace_client_var here. It's a ContextVar scoped + # to the current async Task; when a new top-level trace is created, we overwrite it. + # This allows child operations started after a parent span completed (but still in the same + # logical request/task) to continue attaching to the same root trace. + + def _determine_execution_context(self, operation_name: str, component_type: Optional[str]) -> str: + """Classify execution context for observability only.""" + try: + normalized_op = (operation_name or "").lower() + if "haystack.pipeline.run" in normalized_op or _PIPELINE_RUN_KEY in normalized_op: + return "pipeline" + if component_type and "agent" in component_type.lower(): + return "agent" + return "component" + except Exception: + return "component" + def flush(self) -> None: self._tracer.flush() diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index 4637427b64..c6d61df0c6 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -26,6 +26,8 @@ LangfuseSpan, LangfuseTracer, SpanContext, + span_stack_var, + tracing_context_var, ) @@ -147,6 +149,50 @@ def test_post_init(self): class TestDefaultSpanHandler: + def test_create_span_uses_stack_parent_when_parent_missing(self): + handler = DefaultSpanHandler() + handler.init_tracer(MockTracer()) + + # Prepare a current parent on the span stack + parent_span = LangfuseSpan(MockSpan(name="parent")) + token = span_stack_var.set([parent_span]) + try: + context = SpanContext( + name="child_component", + operation_name="haystack.component.run", + component_type="SomeComponent", + tags={}, + parent_span=None, + ) + child = handler.create_span(context) + # Should create a child span, not a new trace (which would have name == trace_name) + assert isinstance(child, LangfuseSpan) + assert child.raw_span()._name == "child_component" + finally: + span_stack_var.reset(token) + + def test_create_generation_span_uses_stack_parent_when_parent_missing(self): + handler = DefaultSpanHandler() + handler.init_tracer(MockTracer()) + + # Prepare a current parent on the span stack + parent_span = LangfuseSpan(MockSpan(name="parent")) + token = span_stack_var.set([parent_span]) + try: + context = SpanContext( + name="llm_call", + operation_name="haystack.component.run", + component_type="OpenAIChatGenerator", + tags={}, + parent_span=None, + ) + gen = handler.create_span(context) + assert isinstance(gen, LangfuseSpan) + # generation spans also use provided name + assert gen.raw_span()._name == "llm_call" + finally: + span_stack_var.reset(token) + def test_handle_generator(self): mock_span = Mock() mock_span.raw_span.return_value = mock_span @@ -542,3 +588,37 @@ async def run_concurrent_traces(): assert task2_spans[1][2] == task2_inner # current_span during inner assert task2_spans[2][2] == task2_outer # current_span after inner assert task2_spans[3][2] is None # current_span after outer + + def test_root_trace_reuse_via_context_when_stack_absent(self): + """When a root trace exists in tracing_context_var, subsequent spans without parent should attach to it.""" + tracer = LangfuseTracer(tracer=MockTracer(), name="Root Trace", public=False) + + # Start a root span (creates root trace and stores it in context) + with tracer.trace("haystack.pipeline.run") as root_span: + assert tracer.current_span() == root_span + + # Simulate a boundary where the span stack is absent (new task/callback without propagated context) + # but tracing_context_var is still available within the same logical request + ctx = tracing_context_var.get({}) + assert "root_trace_client" not in ctx # root should be cleared after stack empty + + # Recreate the root trace explicitly to simulate preservation across boundaries + # by setting the root_trace_client in context + mock_root = MockSpan(name="Root Trace") + new_ctx = {**ctx, "root_trace_client": mock_root} + token = tracing_context_var.set(new_ctx) + try: + # Now create a child without an explicit parent and without a span stack + handler = DefaultSpanHandler() + handler.init_tracer(MockTracer()) + context = SpanContext( + name="child_after_boundary", + operation_name="haystack.component.run", + component_type="SomeComponent", + tags={}, + parent_span=None, + ) + child = handler.create_span(context) + assert child.raw_span()._name == "child_after_boundary" + finally: + tracing_context_var.reset(token) From 2c5e4fc9d763d7dc31579d901cf81675a1ca2ec5 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 19 Aug 2025 14:06:40 +0200 Subject: [PATCH 6/7] Better solution for hier tracing --- integrations/langfuse/README.md | 18 ++++++--- .../connectors/langfuse/langfuse_connector.py | 12 +++--- .../tracing/langfuse/tracer.py | 40 +++++-------------- integrations/langfuse/tests/conftest.py | 31 ++++++++++++++ 4 files changed, 58 insertions(+), 43 deletions(-) create mode 100644 integrations/langfuse/tests/conftest.py diff --git a/integrations/langfuse/README.md b/integrations/langfuse/README.md index 52aa0608f9..8bcdb66569 100644 --- a/integrations/langfuse/README.md +++ b/integrations/langfuse/README.md @@ -94,8 +94,10 @@ print(response["tracer"]["trace_id"]) # Using session_id to group related traces response = pipe.run( - data={"prompt_builder": {"template_variables": {"location": "Paris"}, "template": messages}}, - tracer={"session_id": "user_123_conversation_456"} + data={ + "prompt_builder": {"template_variables": {"location": "Paris"}, "template": messages}, + "tracer": {"session_id": "user_123_conversation_456"}, + } ) print(response["llm"]["replies"][0]) print(response["tracer"]["trace_url"]) @@ -116,14 +118,18 @@ You can group related traces together by passing a `session_id` when running the ```python # Group traces by user session response = pipe.run( - data={"prompt_builder": {"template_variables": {"location": "Tokyo"}, "template": messages}}, - tracer={"session_id": "user_123_session_456"} + data={ + "prompt_builder": {"template_variables": {"location": "Tokyo"}, "template": messages}, + "tracer": {"session_id": "user_123_session_456"}, + } ) # Group traces by workflow response = pipe.run( - data={"prompt_builder": {"template_variables": {"location": "London"}, "template": messages}}, - tracer={"session_id": "workflow_789_step_2"} + data={ + "prompt_builder": {"template_variables": {"location": "London"}, "template": messages}, + "tracer": {"session_id": "workflow_789_step_2"}, + } ) ``` diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py index 9fc1013fe7..4e16e21c83 100644 --- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py +++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py @@ -116,16 +116,16 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: **Using Sessions to Group Related Traces:** ```python - # Create a connector + # Create a connector and add it to the pipeline first tracer = LangfuseConnector(name="Agent Workflow") + pipeline.add_component("tracer", tracer) # Pass session_id at runtime to group related traces - response = tracer.run(session_id="user_123_workflow_456") - - # Or when used in a pipeline response = pipeline.run( - data={"your_data": "value"}, - tracer={"session_id": "user_123_workflow_456"} + data={ + "your_data": "value", + "tracer": {"session_id": "user_123_workflow_456"}, + } ) # All traces with the same session_id will be grouped together diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 9510118f7f..2d42e57ffb 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -75,11 +75,6 @@ # span stack might not be directly available. root_trace_client_var: ContextVar[Optional[StatefulTraceClient]] = ContextVar("root_trace_client", default=None) -# Process-wide registry to anchor a single root trace per session_id, -# independent of ContextVar/task boundaries. -_root_traces_by_session: Dict[str, StatefulTraceClient] = {} -_root_registry_lock = RLock() - class LangfuseSpan(Span): """ @@ -184,9 +179,6 @@ class SpanContext: trace_name: str = "Haystack" public: bool = False session_id: Optional[str] = None - # Additional execution context information (observability only) - execution_context: Optional[str] = None - hierarchy_level: Optional[int] = None def __post_init__(self) -> None: """ @@ -222,6 +214,9 @@ class SpanHandler(ABC): def __init__(self) -> None: self.tracer: Optional[langfuse.Langfuse] = None + # Instance-scoped registry to anchor a single root trace per session_id + self._root_traces_by_session: Dict[str, StatefulTraceClient] = {} + self._root_registry_lock = RLock() def init_tracer(self, tracer: langfuse.Langfuse) -> None: """ @@ -305,10 +300,10 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: # Otherwise, create a brand new trace and store it in the context for nested tasks to use session_id = context.session_id or tracing_ctx.get("session_id") - # Try to reuse a process-wide root for this session if available + # Try to reuse an existing root for this session if available on this handler instance if session_id: - with _root_registry_lock: - existing = _root_traces_by_session.get(session_id) + with self._root_registry_lock: + existing = self._root_traces_by_session.get(session_id) if existing is not None: if context.component_type in _ALL_SUPPORTED_GENERATORS: return LangfuseSpan(existing.generation(name=context.name)) @@ -325,10 +320,10 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: ) # Store the created root trace client in the context var for reuse root_trace_client_var.set(new_trace) - # Also store in process-wide registry keyed by session to survive task/thread boundaries + # Also store for this session on this handler to enable reuse across top-level spans if session_id: - with _root_registry_lock: - _root_traces_by_session[session_id] = new_trace + with self._root_registry_lock: + self._root_traces_by_session[session_id] = new_trace return LangfuseSpan(new_trace) elif context.component_type in _ALL_SUPPORTED_GENERATORS: return LangfuseSpan(effective_parent.raw_span().generation(name=context.name)) @@ -432,9 +427,6 @@ def trace( component_type = tags.get(_COMPONENT_TYPE_KEY) # Create a new span context - current_stack = span_stack_var.get() - hierarchy_level = len(current_stack or []) - execution_context = self._determine_execution_context(operation_name, component_type) span_context = SpanContext( name=span_name, operation_name=operation_name, @@ -446,8 +438,6 @@ def trace( trace_name=self._name, public=self._public, session_id=self._session_id, - execution_context=execution_context, - hierarchy_level=hierarchy_level, ) # Create span using the handler @@ -492,18 +482,6 @@ def trace( # This allows child operations started after a parent span completed (but still in the same # logical request/task) to continue attaching to the same root trace. - def _determine_execution_context(self, operation_name: str, component_type: Optional[str]) -> str: - """Classify execution context for observability only.""" - try: - normalized_op = (operation_name or "").lower() - if "haystack.pipeline.run" in normalized_op or _PIPELINE_RUN_KEY in normalized_op: - return "pipeline" - if component_type and "agent" in component_type.lower(): - return "agent" - return "component" - except Exception: - return "component" - def flush(self) -> None: self._tracer.flush() diff --git a/integrations/langfuse/tests/conftest.py b/integrations/langfuse/tests/conftest.py new file mode 100644 index 0000000000..f2fa7ed03f --- /dev/null +++ b/integrations/langfuse/tests/conftest.py @@ -0,0 +1,31 @@ +import os +import pytest + +# Enable content tracing for tests - must be set before importing haystack modules +os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true" + +from haystack_integrations.tracing.langfuse.tracer import ( + tracing_context_var, + span_stack_var, + root_trace_client_var, +) + + +@pytest.fixture(autouse=True) +def reset_tracing_globals(): + """Ensure tracing-related ContextVars are clean for every test. + + This prevents cross-test contamination where roots/spans leak via ContextVars + or per-task stacks, causing order-dependent flakiness. + """ + token_ctx = tracing_context_var.set({}) + token_stack = span_stack_var.set(None) + token_root = root_trace_client_var.set(None) + try: + yield + finally: + tracing_context_var.reset(token_ctx) + span_stack_var.reset(token_stack) + root_trace_client_var.reset(token_root) + + From f963c0fabed50086ce2e32b266bc75b372ff3fa2 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 19 Aug 2025 16:31:06 +0200 Subject: [PATCH 7/7] Add comments --- .../tracing/langfuse/tracer.py | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 2d42e57ffb..f2672d24ee 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -70,9 +70,10 @@ # Manages parent-child relationships and prevents cross-request span interleaving span_stack_var: ContextVar[Optional[List["LangfuseSpan"]]] = ContextVar("span_stack", default=None) -# Root trace client for the current logical request/session. This is used as a -# stable anchor to attach child spans across async/task boundaries where the -# span stack might not be directly available. +# Root trace client for the current logical request/session. Acts as a stable +# anchor to attach child spans across async/task boundaries where the span stack +# might not be directly available. Not cleared at span end on purpose; overwritten +# when a new top-level trace is created in the same logical task. root_trace_client_var: ContextVar[Optional[StatefulTraceClient]] = ContextVar("root_trace_client", default=None) @@ -268,7 +269,16 @@ def to_dict(self) -> Dict[str, Any]: class DefaultSpanHandler(SpanHandler): - """DefaultSpanHandler provides the default Langfuse tracing behavior for Haystack.""" + """DefaultSpanHandler provides the default Langfuse tracing behavior for Haystack. + + Attachment precedence when creating a top-level span (no effective parent): + 1) Use implicit root from root_trace_client_var if set (same logical task) + 2) Reuse per-session root from this handler instance via session_id + 3) Create a brand new root trace + + Rationale: ensures single hierarchical trace even when ContextVars don't + propagate (threads/executors), by falling back to a session-scoped registry. + """ def create_span(self, context: SpanContext) -> LangfuseSpan: if self.tracer is None: @@ -279,7 +289,7 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: ) raise RuntimeError(message) - # Get external tracing context for root trace creation (correlation metadata) + # External correlation metadata used when creating a new root tracing_ctx = tracing_context_var.get({}) # Determine effective parent using current span stack if parent_span is not given current_stack = span_stack_var.get() @@ -288,16 +298,17 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: effective_parent = current_stack[-1] if not effective_parent: - # If there's no effective parent but we have an existing root trace in the context, - # create a child span under that trace. This preserves hierarchy across async/task boundaries - # where the span stack might not be propagated but the context is. + # If there's no effective parent but we have an implicit root in the current task + # context, create a child under that. This preserves hierarchy across async/task + # boundaries where the span stack isn't propagated but the task-local root is. root_trace_client = root_trace_client_var.get() or tracing_ctx.get("root_trace_client") if root_trace_client is not None: if context.component_type in _ALL_SUPPORTED_GENERATORS: return LangfuseSpan(root_trace_client.generation(name=context.name)) return LangfuseSpan(root_trace_client.span(name=context.name)) - # Otherwise, create a brand new trace and store it in the context for nested tasks to use + # Otherwise, try to reuse a per-session root maintained on this handler instance, + # or create a brand new root and cache it for future top-level spans in the same session. session_id = context.session_id or tracing_ctx.get("session_id") # Try to reuse an existing root for this session if available on this handler instance