Skip to content
35 changes: 35 additions & 0 deletions packages/sample-app/sample_app/multiple_span_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import task
from openai import OpenAI
import os

# OpenAI client used by the task below; reads the API key from the environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# Traceloop's stock span processor, obtained separately (instead of letting
# init() create it implicitly) so it can be combined with custom processors.
traceloop_processor = Traceloop.get_default_span_processor(disable_batch=True)

# A second processor that mirrors every finished span to stdout for local inspection.
console_processor = SimpleSpanProcessor(ConsoleSpanExporter())

# Initialize the SDK with both processors so spans flow through each of them.
Traceloop.init(processor=[traceloop_processor, console_processor])


@task(name="joke_creation", version=1)
def create_joke():
    """Ask the model for a joke, print it, and return the response text."""
    prompt = {"role": "user", "content": "Tell me a joke about opentelemetry multiple span processors"}
    response = client.chat.completions.create(model="gpt-3.5-turbo", messages=[prompt])
    joke = response.choices[0].message.content
    print(joke)
    return joke


def main():
    """Entry point for the sample: run the traced joke-creation task once."""
    create_joke()


# Allow running this sample directly: python multiple_span_processors.py
if __name__ == "__main__":
    main()
60 changes: 60 additions & 0 deletions packages/traceloop-sdk/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,63 @@ def exporter_with_no_metrics():
if _trace_wrapper_instance:
TracerWrapper.instance = _trace_wrapper_instance
os.environ["TRACELOOP_METRICS_ENABLED"] = "true"


@pytest.fixture
def exporters_with_multiple_span_processors():
    """Set up Traceloop with three span processors, each backed by its own
    in-memory exporter, and yield the exporters (plus the processor list)
    so tests can verify that every processor saw and tagged the spans.

    Restores the previous TracerWrapper singleton (if any) on teardown.
    """
    # Clear the TracerWrapper singleton so Traceloop.init() below builds a
    # fresh tracer; remember the old instance (if any) so it can be restored.
    # Bug fix: `_trace_wrapper_instance` was previously only bound inside the
    # hasattr() branch, so the teardown raised NameError whenever no
    # singleton existed at setup time.
    _trace_wrapper_instance = None
    if hasattr(TracerWrapper, "instance"):
        _trace_wrapper_instance = TracerWrapper.instance
        del TracerWrapper.instance

    class CustomSpanProcessor(SimpleSpanProcessor):
        # Tags every span at start time so tests can verify this processor ran.
        def on_start(self, span, parent_context=None):
            span.set_attribute("custom_processor", "enabled")
            span.set_attribute("processor_type", "custom")

    class MetricsSpanProcessor(SimpleSpanProcessor):
        # Counts spans as they start and records the running count on each span.
        def __init__(self, exporter):
            super().__init__(exporter)
            self.span_count = 0

        def on_start(self, span, parent_context=None):
            self.span_count += 1
            span.set_attribute("metrics_processor", "enabled")
            span.set_attribute("span_count", self.span_count)

    # One exporter per processor so each processor's output can be inspected
    # independently.
    default_exporter = InMemorySpanExporter()
    custom_exporter = InMemorySpanExporter()
    metrics_exporter = InMemorySpanExporter()

    # The stock Traceloop processor, pointed at an in-memory exporter.
    default_processor = Traceloop.get_default_span_processor(
        disable_batch=True,
        exporter=default_exporter
    )

    # Create custom processors
    custom_processor = CustomSpanProcessor(custom_exporter)
    metrics_processor = MetricsSpanProcessor(metrics_exporter)

    # Initialize with multiple processors
    processors = [default_processor, custom_processor, metrics_processor]

    Traceloop.init(
        app_name="test_multiple_processors",
        api_endpoint="http://localhost:4318",  # Use local endpoint to avoid API key requirement
        processor=processors,
        disable_batch=True,
    )

    # Return all exporters so we can verify each processor worked
    yield {
        "default": default_exporter,
        "custom": custom_exporter,
        "metrics": metrics_exporter,
        "processor": processors
    }

    # Restore the previous singleton, if one existed before this fixture ran.
    if _trace_wrapper_instance:
        TracerWrapper.instance = _trace_wrapper_instance
75 changes: 75 additions & 0 deletions packages/traceloop-sdk/tests/test_sdk_initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,78 @@ def run_workflow():
spans = exporter_with_no_metrics.get_finished_spans()
workflow_span = spans[0]
assert workflow_span


def test_multiple_span_processors(exporters_with_multiple_span_processors):
    """Test that multiple span processors work correctly together."""
    from traceloop.sdk.decorators import workflow, task

    @task(name="test_task")
    def inner_task():
        return "task_result"

    @workflow(name="test_workflow")
    def inner_workflow():
        return inner_task()

    # Generate spans by running the workflow, and sanity-check its result.
    assert inner_workflow() == "task_result"

    fixtures = exporters_with_multiple_span_processors
    default_spans = fixtures["default"].get_finished_spans()
    custom_spans = fixtures["custom"].get_finished_spans()
    metrics_spans = fixtures["metrics"].get_finished_spans()

    # Every processor should have seen both spans (workflow + task).
    assert len(default_spans) == 2, "Default processor should have received spans"
    assert len(custom_spans) == 2, "Custom processor should have received spans"
    assert len(metrics_spans) == 2, "Metrics processor should have received spans"

    # The default (Traceloop) processor's spans expose an attributes mapping.
    assert hasattr(default_spans[0], 'attributes')

    # The custom processor tagged its spans at start time.
    first_custom = custom_spans[0]
    assert first_custom.attributes.get("custom_processor") == "enabled"
    assert first_custom.attributes.get("processor_type") == "custom"

    # The metrics processor numbered spans in start order: the workflow span
    # starts first (span_count=1), the nested task second (span_count=2).
    workflow_spans = [s for s in metrics_spans if "workflow" in s.name]
    task_spans = [s for s in metrics_spans if "task" in s.name]
    assert len(workflow_spans) == 1
    assert len(task_spans) == 1

    wf_span, t_span = workflow_spans[0], task_spans[0]

    assert wf_span.attributes.get("metrics_processor") == "enabled"
    assert wf_span.attributes.get("span_count") == 1

    assert t_span.attributes.get("metrics_processor") == "enabled"
    assert t_span.attributes.get("span_count") == 2


def test_get_default_span_processor():
    """Test that get_default_span_processor returns a valid processor."""
    from traceloop.sdk import Traceloop
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor, BatchSpanProcessor

    # disable_batch=True yields a synchronous SimpleSpanProcessor;
    # disable_batch=False yields a BatchSpanProcessor. Either way the
    # processor must carry the Traceloop marker attribute.
    cases = [
        (True, SimpleSpanProcessor),
        (False, BatchSpanProcessor),
    ]
    for disable_batch, expected_cls in cases:
        processor = Traceloop.get_default_span_processor(disable_batch=disable_batch)
        assert isinstance(processor, expected_cls)
        assert hasattr(processor, "_traceloop_processor")
        assert getattr(processor, "_traceloop_processor") is True
4 changes: 2 additions & 2 deletions packages/traceloop-sdk/tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def joke_workflow():
assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}"
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke"
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry"
assert open_ai_span.attributes.get("traceloop.prompt.version") == 5
assert open_ai_span.attributes.get("traceloop.prompt.version") == "5"

workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow")
task_span = next(span for span in spans if span.name == "something_creator.task")
Expand Down Expand Up @@ -95,7 +95,7 @@ async def joke_workflow():
assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}"
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke"
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry"
assert open_ai_span.attributes.get("traceloop.prompt.version") == 5
assert open_ai_span.attributes.get("traceloop.prompt.version") == "5"

workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow")
task_span = next(span for span in spans if span.name == "something_creator.task")
Expand Down
47 changes: 45 additions & 2 deletions packages/traceloop-sdk/traceloop/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from pathlib import Path

from typing import Callable, Optional, Set
from typing import Callable, List, Optional, Set, Union
from colorama import Fore
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
from opentelemetry.sdk.trace.sampling import Sampler
Expand Down Expand Up @@ -59,7 +59,7 @@ def init(
metrics_headers: Dict[str, str] = None,
logging_exporter: LogExporter = None,
logging_headers: Dict[str, str] = None,
processor: Optional[SpanProcessor] = None,
processor: Optional[Union[SpanProcessor, List[SpanProcessor]]] = None,
propagator: TextMapPropagator = None,
sampler: Optional[Sampler] = None,
traceloop_sync_enabled: bool = False,
Expand Down Expand Up @@ -207,6 +207,49 @@ def set_association_properties(properties: dict) -> None:
def set_prompt(template: str, variables: dict, version: int):
set_external_prompt_tracing_context(template, variables, version)

@staticmethod
def get_default_span_processor(
    disable_batch: bool = False,
    api_endpoint: Optional[str] = None,
    api_key: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    exporter: Optional[SpanExporter] = None
) -> SpanProcessor:
    """
    Creates and returns the default Traceloop span processor.

    This function allows users to get the default Traceloop span processor
    to combine it with their custom processors when using the processor parameter.

    Args:
        disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor
        api_endpoint: The endpoint URL for the exporter
            (falls back to the TRACELOOP_BASE_URL environment variable)
        api_key: API key used to build the Authorization header when no
            explicit headers are given (falls back to TRACELOOP_API_KEY)
        headers: Headers for the exporter; when provided, api_key is ignored
        exporter: Custom exporter to use (creates default if None)

    Returns:
        SpanProcessor: The default Traceloop span processor

    Example:
        # Get the default processor and combine with custom one
        default_processor = Traceloop.get_default_span_processor()
        custom_processor = MyCustomSpanProcessor()

        Traceloop.init(
            processor=[default_processor, custom_processor]
        )
    """
    from traceloop.sdk.tracing.tracing import get_default_span_processor

    if headers is None:
        if api_key is None:
            api_key = os.getenv("TRACELOOP_API_KEY")
        # Only attach an Authorization header when a key was actually
        # resolved; previously a missing key produced "Bearer None".
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    if api_endpoint is None:
        api_endpoint = os.getenv("TRACELOOP_BASE_URL")
    return get_default_span_processor(disable_batch, api_endpoint, headers, exporter)

@staticmethod
def get():
"""
Expand Down
Loading
Loading