Skip to content

fix: LangfuseTracer is not thread-safe, causing mixed traces in async environments #2188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions integrations/langfuse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,50 @@ response = pipe.run(
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])

# Using session_id to group related traces
response = pipe.run(
data={
"prompt_builder": {"template_variables": {"location": "Paris"}, "template": messages},
"tracer": {"session_id": "user_123_conversation_456"},
}
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])
```

In this example, we add the `LangfuseConnector` to the pipeline with the name "tracer".
Each run of the pipeline produces one trace viewable on the Langfuse website with a specific URL.
The trace captures the entire execution context, including the prompts, completions, and metadata.

### Using Session IDs

You can group related traces together by passing a `session_id` when running the pipeline. This is useful for:
- **Multi-turn conversations**: Group all messages in a chat session
- **Workflow tracking**: Group all operations in a user workflow
- **Debugging**: Correlate related traces across different pipeline runs

```python
# Group traces by user session
response = pipe.run(
data={
"prompt_builder": {"template_variables": {"location": "Tokyo"}, "template": messages},
"tracer": {"session_id": "user_123_session_456"},
}
)

# Group traces by workflow
response = pipe.run(
data={
"prompt_builder": {"template_variables": {"location": "London"}, "template": messages},
"tracer": {"session_id": "workflow_789_step_2"},
}
)
```

All traces with the same `session_id` will be grouped together in the Langfuse UI, making it easier to analyze related operations.

## Trace Visualization

Langfuse provides a user-friendly interface to visualize and analyze the traces generated by your Haystack pipeline.
Expand Down
6 changes: 5 additions & 1 deletion integrations/langfuse/example/basic_rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ def get_pipeline(document_store: InMemoryDocumentStore):

pipeline = get_pipeline(document_store)
question = "What does Rhodes Statue look like?"
response = pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"question": question}})
response = pipeline.run({
"text_embedder": {"text": question},
"prompt_builder": {"question": question},
"tracer": {"session_id": "rag_session_001"}
})

print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
1 change: 1 addition & 0 deletions integrations/langfuse/example/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
},
"tracer": {
"invocation_context": {"some_key": "some_value"},
"session_id": "user_123_chat_session",
},
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,25 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:

connector = LangfuseConnector(span_handler=CustomSpanHandler())
```

**Using Sessions to Group Related Traces:**

```python
# Create a connector and add it to the pipeline first
tracer = LangfuseConnector(name="Agent Workflow")
pipeline.add_component("tracer", tracer)

# Pass session_id at runtime to group related traces
response = pipeline.run(
data={
"your_data": "value",
"tracer": {"session_id": "user_123_workflow_456"},
}
)

# All traces with the same session_id will be grouped together
# in the Langfuse UI, making it easier to correlate related operations
```
"""

def __init__(
Expand Down Expand Up @@ -172,22 +191,32 @@ def __init__(
tracing.enable_tracing(self.tracer)

@component.output_types(name=str, trace_url=str, trace_id=str)
def run(self, invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
def run(
self, invocation_context: Optional[Dict[str, Any]] = None, session_id: Optional[str] = None
) -> Dict[str, str]:
"""
Runs the LangfuseConnector component.

:param invocation_context: A dictionary with additional context for the invocation. This parameter
is useful when users want to mark this particular invocation with additional information, e.g.
a run id from their own execution framework, user id, etc. These key-value pairs are then visible
a run id from their own execution framework, user_id, etc. These key-value pairs are then visible
in the Langfuse traces.
:param session_id: Optional session ID to group this specific run with other related traces.
If provided, this session_id will be used for the current pipeline run, allowing Langfuse
to group related operations together.
:returns: A dictionary with the following keys:
- `name`: The name of the tracing component.
- `trace_url`: The URL to the tracing data.
- `trace_id`: The ID of the trace.
"""
# Set session_id for this run if provided
if session_id is not None:
self.tracer._session_id = session_id

logger.debug(
"Langfuse tracer invoked with the following context: '{invocation_context}'",
"Langfuse tracer invoked with the following context: '{invocation_context}' and session_id: '{session_id}'",
invocation_context=invocation_context,
session_id=session_id,
)
return {"name": self.name, "trace_url": self.tracer.get_trace_url(), "trace_id": self.tracer.get_trace_id()}

Expand Down Expand Up @@ -230,4 +259,5 @@ def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector":
init_params["span_handler"] = (
deserialize_class_instance(init_params["span_handler"]) if init_params["span_handler"] else None
)
# session_id is a simple string, no deserialization needed
return default_from_dict(cls, data)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import datetime
from threading import RLock
from typing import Any, Dict, Iterator, List, Optional, Union

from haystack import default_from_dict, default_to_dict, logging
Expand Down Expand Up @@ -61,10 +62,20 @@
_COMPONENT_OUTPUT_KEY = "haystack.component.output"
_COMPONENT_INPUT_KEY = "haystack.component.input"

# Context var used to keep track of tracing related info.
# This is mainly useful for parent spans.
# External session metadata for trace correlation (Haystack system)
# Stores trace_id, user_id, session_id, tags, version for root trace creation
tracing_context_var: ContextVar[Dict[Any, Any]] = ContextVar("tracing_context")

# Internal span execution hierarchy for our tracer
# Manages parent-child relationships and prevents cross-request span interleaving
span_stack_var: ContextVar[Optional[List["LangfuseSpan"]]] = ContextVar("span_stack", default=None)

# Root trace client for the current logical request/session. Acts as a stable
# anchor to attach child spans across async/task boundaries where the span stack
# might not be directly available. Not cleared at span end on purpose; overwritten
# when a new top-level trace is created in the same logical task.
root_trace_client_var: ContextVar[Optional[StatefulTraceClient]] = ContextVar("root_trace_client", default=None)


class LangfuseSpan(Span):
"""
Expand Down Expand Up @@ -168,6 +179,7 @@ class SpanContext:
parent_span: Optional[Span]
trace_name: str = "Haystack"
public: bool = False
session_id: Optional[str] = None

def __post_init__(self) -> None:
"""
Expand Down Expand Up @@ -203,6 +215,9 @@ class SpanHandler(ABC):

def __init__(self) -> None:
self.tracer: Optional[langfuse.Langfuse] = None
# Instance-scoped registry to anchor a single root trace per session_id
self._root_traces_by_session: Dict[str, StatefulTraceClient] = {}
self._root_registry_lock = RLock()

def init_tracer(self, tracer: langfuse.Langfuse) -> None:
"""
Expand Down Expand Up @@ -254,7 +269,16 @@ def to_dict(self) -> Dict[str, Any]:


class DefaultSpanHandler(SpanHandler):
"""DefaultSpanHandler provides the default Langfuse tracing behavior for Haystack."""
"""DefaultSpanHandler provides the default Langfuse tracing behavior for Haystack.

Attachment precedence when creating a top-level span (no effective parent):
1) Use implicit root from root_trace_client_var if set (same logical task)
2) Reuse per-session root from this handler instance via session_id
3) Create a brand new root trace

Rationale: ensures single hierarchical trace even when ContextVars don't
propagate (threads/executors), by falling back to a session-scoped registry.
"""

def create_span(self, context: SpanContext) -> LangfuseSpan:
if self.tracer is None:
Expand All @@ -265,24 +289,57 @@ def create_span(self, context: SpanContext) -> LangfuseSpan:
)
raise RuntimeError(message)

# External correlation metadata used when creating a new root
tracing_ctx = tracing_context_var.get({})
if not context.parent_span:
# Create a new trace when there's no parent span
return LangfuseSpan(
self.tracer.trace(
name=context.trace_name,
public=context.public,
id=tracing_ctx.get("trace_id"),
user_id=tracing_ctx.get("user_id"),
session_id=tracing_ctx.get("session_id"),
tags=tracing_ctx.get("tags"),
version=tracing_ctx.get("version"),
)
# Determine effective parent using current span stack if parent_span is not given
current_stack = span_stack_var.get()
effective_parent = context.parent_span
if effective_parent is None and current_stack:
effective_parent = current_stack[-1]

if not effective_parent:
# If there's no effective parent but we have an implicit root in the current task
# context, create a child under that. This preserves hierarchy across async/task
# boundaries where the span stack isn't propagated but the task-local root is.
root_trace_client = root_trace_client_var.get() or tracing_ctx.get("root_trace_client")
if root_trace_client is not None:
if context.component_type in _ALL_SUPPORTED_GENERATORS:
return LangfuseSpan(root_trace_client.generation(name=context.name))
return LangfuseSpan(root_trace_client.span(name=context.name))

# Otherwise, try to reuse a per-session root maintained on this handler instance,
# or create a brand new root and cache it for future top-level spans in the same session.
session_id = context.session_id or tracing_ctx.get("session_id")

# Try to reuse an existing root for this session if available on this handler instance
if session_id:
with self._root_registry_lock:
existing = self._root_traces_by_session.get(session_id)
if existing is not None:
if context.component_type in _ALL_SUPPORTED_GENERATORS:
return LangfuseSpan(existing.generation(name=context.name))
return LangfuseSpan(existing.span(name=context.name))

new_trace = self.tracer.trace(
name=context.trace_name,
public=context.public,
id=tracing_ctx.get("trace_id"),
user_id=tracing_ctx.get("user_id"),
session_id=session_id,
tags=tracing_ctx.get("tags"),
version=tracing_ctx.get("version"),
)
# Store the created root trace client in the context var for reuse
root_trace_client_var.set(new_trace)
# Also store for this session on this handler to enable reuse across top-level spans
if session_id:
with self._root_registry_lock:
self._root_traces_by_session[session_id] = new_trace
return LangfuseSpan(new_trace)
elif context.component_type in _ALL_SUPPORTED_GENERATORS:
return LangfuseSpan(context.parent_span.raw_span().generation(name=context.name))
return LangfuseSpan(effective_parent.raw_span().generation(name=context.name))
else:
return LangfuseSpan(context.parent_span.raw_span().span(name=context.name))
return LangfuseSpan(effective_parent.raw_span().span(name=context.name))

def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
# If the span is at the pipeline level, we add input and output keys to the span
Expand Down Expand Up @@ -341,6 +398,7 @@ def __init__(
name: str = "Haystack",
public: bool = False,
span_handler: Optional[SpanHandler] = None,
session_id: Optional[str] = None,
) -> None:
"""
Initialize a LangfuseTracer instance.
Expand All @@ -352,6 +410,8 @@ def __init__(
be publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private
and only accessible to the Langfuse account owner.
:param span_handler: Custom handler for processing spans. If None, uses DefaultSpanHandler.
:param session_id: Optional session ID to group multiple traces together. If provided, all traces
created by this tracer will use this session_id, allowing Langfuse to group them as one session.
"""
if not proxy_tracer.is_content_tracing_enabled:
logger.warning(
Expand All @@ -360,9 +420,11 @@ def __init__(
"before importing Haystack."
)
self._tracer = tracer
# Keep _context as deprecated shim to avoid AttributeError if anyone uses it
self._context: List[LangfuseSpan] = []
self._name = name
self._public = public
self._session_id = session_id
self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"
self._span_handler = span_handler or DefaultSpanHandler()
self._span_handler.init_tracer(tracer)
Expand All @@ -386,12 +448,18 @@ def trace(
parent_span=parent_span or self.current_span(),
trace_name=self._name,
public=self._public,
session_id=self._session_id,
)

# Create span using the handler
span = self._span_handler.create_span(span_context)

self._context.append(span)
# Build new span hierarchy: copy existing stack, add new span, save for restoration
prev_stack = span_stack_var.get()
new_stack = (prev_stack or []).copy()
new_stack.append(span)
token = span_stack_var.set(new_stack)

span.set_tags(tags)

try:
Expand All @@ -414,14 +482,17 @@ def trace(
cleanup_error=cleanup_error,
)
finally:
# CRITICAL: Always pop context to prevent corruption
# This is especially important for nested pipeline scenarios
if self._context and self._context[-1] == span:
self._context.pop()
# Restore previous span stack using saved token - ensures proper cleanup
span_stack_var.reset(token)

if self.enforce_flush:
self.flush()

# Note: we intentionally do NOT clear root_trace_client_var here. It's a ContextVar scoped
# to the current async Task; when a new top-level trace is created, we overwrite it.
# This allows child operations started after a parent span completed (but still in the same
# logical request/task) to continue attaching to the same root trace.

def flush(self) -> None:
self._tracer.flush()

Expand All @@ -431,7 +502,9 @@ def current_span(self) -> Optional[Span]:

:return: The current span if available, else None.
"""
return self._context[-1] if self._context else None
# Get top of span stack (most recent span) from context-local storage
stack = span_stack_var.get()
return stack[-1] if stack else None

def get_trace_url(self) -> str:
"""
Expand Down
31 changes: 31 additions & 0 deletions integrations/langfuse/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import pytest

# Enable content tracing for tests - must be set before importing haystack modules
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack_integrations.tracing.langfuse.tracer import (
tracing_context_var,
span_stack_var,
root_trace_client_var,
)


@pytest.fixture(autouse=True)
def reset_tracing_globals():
    """Reset all tracing-related ContextVars to a clean state around every test.

    Without this, roots/spans can leak between tests through ContextVars or
    per-task span stacks, producing order-dependent flakiness. The fixture
    snapshots each var's token on entry and restores it on exit, even when
    the test raises.
    """
    # Set each var to its "empty" value and remember the reset tokens.
    saved_tokens = (
        tracing_context_var.set({}),
        span_stack_var.set(None),
        root_trace_client_var.set(None),
    )
    try:
        yield
    finally:
        # Restore in the same order the vars were set; reset() with the saved
        # token puts each ContextVar back exactly as it was before the test.
        ctx_token, stack_token, root_token = saved_tokens
        tracing_context_var.reset(ctx_token)
        span_stack_var.reset(stack_token)
        root_trace_client_var.reset(root_token)


Loading
Loading