From ffb34b0799e9948067e7310bbe04925d84e1d1b0 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 12:14:29 +0300 Subject: [PATCH 01/12] feat(sdk): support multiple span processors --- .../sample_app/multiple_span_proccesors.py | 36 ++++++ .../traceloop-sdk/traceloop/sdk/__init__.py | 47 +++++++- .../traceloop/sdk/tracing/tracing.py | 106 ++++++++++++++---- 3 files changed, 166 insertions(+), 23 deletions(-) create mode 100644 packages/sample-app/sample_app/multiple_span_proccesors.py diff --git a/packages/sample-app/sample_app/multiple_span_proccesors.py b/packages/sample-app/sample_app/multiple_span_proccesors.py new file mode 100644 index 0000000000..42d820bae9 --- /dev/null +++ b/packages/sample-app/sample_app/multiple_span_proccesors.py @@ -0,0 +1,36 @@ +from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from traceloop.sdk import Traceloop +from traceloop.sdk.tracing.context_manager import get_tracer +from traceloop.sdk.decorators import task +from openai import OpenAI +import os + +client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + + +traceloop_processor = Traceloop.get_default_span_processor(disable_batch=True) +console_processor = SimpleSpanProcessor(ConsoleSpanExporter()) + +Traceloop.init(processors=[traceloop_processor, console_processor]) + + + +@task(name="joke_creation", version=1) +def create_joke(): + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Tell me a joke about opentelemetry multiple span processors"}], + ) + + result = completion.choices[0].message.content + print(result) + return result + + +def main(): + create_joke() + +if __name__ == "__main__": + main() + diff --git a/packages/traceloop-sdk/traceloop/sdk/__init__.py b/packages/traceloop-sdk/traceloop/sdk/__init__.py index 099f08970b..95c3b3e2e7 100644 --- a/packages/traceloop-sdk/traceloop/sdk/__init__.py +++ b/packages/traceloop-sdk/traceloop/sdk/__init__.py @@ -2,7 +2,7 @@ import sys from pathlib import Path -from typing import Callable, Optional, Set +from typing import Callable, List, Optional, Set from colorama import Fore from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan from opentelemetry.sdk.trace.sampling import Sampler @@ -60,6 +60,7 @@ def init( logging_exporter: LogExporter = None, logging_headers: Dict[str, str] = None, processor: Optional[SpanProcessor] = None, + processors: Optional[List[SpanProcessor]] = None, propagator: TextMapPropagator = None, sampler: Optional[Sampler] = None, traceloop_sync_enabled: bool = False, @@ -144,6 +145,7 @@ def init( Traceloop.__tracer_wrapper = TracerWrapper( disable_batch=disable_batch, processor=processor, + processors=processors, propagator=propagator, exporter=exporter, sampler=sampler, @@ -207,6 +209,49 @@ def set_association_properties(properties: dict) -> None: def set_prompt(template: str, variables: dict, version: int): set_external_prompt_tracing_context(template, variables, version) + @staticmethod + def get_default_span_processor( + disable_batch: bool = False, + api_endpoint: Optional[str] = None, + api_key: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + exporter: Optional[SpanExporter] = None + ) -> SpanProcessor: + """ + Creates and returns the default Traceloop span processor. + + This function allows users to get the default Traceloop span processor + to combine it with their custom processors when using the processors parameter. + + Args: + disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor + api_endpoint: The endpoint URL for the exporter (uses current config if None) + headers: Headers for the exporter (uses current config if None) + exporter: Custom exporter to use (creates default if None) + + Returns: + SpanProcessor: The default Traceloop span processor + + Example: + # Get the default processor and combine with custom one + default_processor = Traceloop.get_default_span_processor() + custom_processor = MyCustomSpanProcessor() + + Traceloop.init( + processors=[default_processor, custom_processor] + ) + """ + from traceloop.sdk.tracing.tracing import get_default_span_processor + if not headers: + if not api_key: + api_key = os.getenv("TRACELOOP_API_KEY") + headers = { + "Authorization": f"Bearer {api_key}", + } + if not api_endpoint: + api_endpoint = os.getenv("TRACELOOP_BASE_URL") + return get_default_span_processor(disable_batch, api_endpoint, headers, exporter) + @staticmethod def get(): """ diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index be910e8734..d09685f796 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -32,7 +32,7 @@ from traceloop.sdk.tracing.content_allow_list import ContentAllowList from traceloop.sdk.utils import is_notebook from traceloop.sdk.utils.package_check import is_package_installed -from typing import Callable, Dict, Optional, Set +from typing import Callable, Dict, List, Optional, Set TRACER_NAME = "traceloop.tracer" @@ -67,7 +67,8 @@ class TracerWrapper(object): def __new__( cls, disable_batch=False, - processor: SpanProcessor = None, + processor: Optional[SpanProcessor] = None, + processors: Optional[List[SpanProcessor]] = None, propagator: TextMapPropagator = None, exporter: SpanExporter = None, sampler: Optional[Sampler] = None, @@ -85,10 +86,44 @@ def __new__( obj.__image_uploader = image_uploader obj.__resource = Resource(attributes=TracerWrapper.resource_attributes) obj.__tracer_provider = init_tracer_provider(resource=obj.__resource, sampler=sampler) - if processor: + + # Validate that only one processor parameter is provided + if processor is not None and processors is not None: + raise ValueError("Cannot specify both 'processor' and 'processors' parameters. Use only one.") + + # Handle multiple processors case + if processors is not None: + obj.__spans_processors = [] + obj.__traceloop_processor = None + obj.__spans_processor_original_on_start = [] + for proc in processors: + obj.__spans_processors.append(proc) + obj.__spans_processor_original_on_start.append(proc.on_start) + proc.on_start = obj._span_processor_on_start + + obj.__tracer_provider.add_span_processor(proc) + + # Find the Traceloop processor (if any) to apply special handling + # We'll identify it by checking if it was created by get_default_span_processor + # For now, we'll apply the special on_start to the first processor that looks like ours + for proc in processors: + if isinstance(proc, (SimpleSpanProcessor, BatchSpanProcessor)): + # This might be our default processor, apply special handling + proc.on_start = obj._span_processor_on_start + obj.__traceloop_processor = proc + break + + Telemetry().capture("tracer:init", {"processor": "multiple", "count": len(processors)}) + + # Handle single processor case (backward compatibility) + elif processor is not None: Telemetry().capture("tracer:init", {"processor": "custom"}) obj.__spans_processor: SpanProcessor = processor obj.__spans_processor_original_on_start = processor.on_start + obj.__spans_processor.on_start = obj._span_processor_on_start + obj.__tracer_provider.add_span_processor(obj.__spans_processor) + + # Handle default processor case else: if exporter: Telemetry().capture( @@ -107,22 +142,11 @@ def __new__( }, ) - obj.__spans_exporter: SpanExporter = ( - exporter - if exporter - else init_spans_exporter( - TracerWrapper.endpoint, TracerWrapper.headers - ) + obj.__spans_processor: SpanProcessor = get_default_span_processor( + disable_batch=disable_batch, + exporter=exporter ) - if disable_batch or is_notebook(): - obj.__spans_processor: SpanProcessor = SimpleSpanProcessor( - obj.__spans_exporter - ) - else: - obj.__spans_processor: SpanProcessor = BatchSpanProcessor( - obj.__spans_exporter - ) - obj.__spans_processor_original_on_start = None + if span_postprocess_callback: # Create a wrapper that calls both the custom and original methods original_on_end = obj.__spans_processor.on_end @@ -134,8 +158,8 @@ def wrapped_on_end(span): original_on_end(span) obj.__spans_processor.on_end = wrapped_on_end - obj.__spans_processor.on_start = obj._span_processor_on_start - obj.__tracer_provider.add_span_processor(obj.__spans_processor) + obj.__spans_processor.on_start = obj._span_processor_on_start + obj.__tracer_provider.add_span_processor(obj.__spans_processor) if propagator: set_global_textmap(propagator) @@ -224,7 +248,12 @@ def _span_processor_on_start(self, span, parent_context): # Call original on_start method if it exists in custom processor if self.__spans_processor_original_on_start: - self.__spans_processor_original_on_start(span, parent_context) + original = self.__spans_processor_original_on_start + if callable(original): + original(span, parent_context) + elif isinstance(original, list): + for func in original: + func(span, parent_context) @staticmethod def set_static_params( @@ -261,7 +290,11 @@ def set_disabled(cls, disabled: bool) -> None: cls.__disabled = disabled def flush(self): - self.__spans_processor.force_flush() + if hasattr(self, '_TracerWrapper__spans_processor'): + self.__spans_processor.force_flush() + elif hasattr(self, '_TracerWrapper__spans_processors'): + for processor in self.__spans_processors: + processor.force_flush() def get_tracer(self): return self.__tracer_provider.get_tracer(TRACER_NAME) @@ -334,6 +367,35 @@ def init_spans_exporter(api_endpoint: str, headers: Dict[str, str]) -> SpanExpor return GRPCExporter(endpoint=f"{api_endpoint}", headers=headers) +def get_default_span_processor( + disable_batch: bool = False, + api_endpoint: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + exporter: Optional[SpanExporter] = None +) -> SpanProcessor: + """ + Creates and returns the default Traceloop span processor. + + Args: + disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor + api_endpoint: The endpoint URL for the exporter (uses TracerWrapper.endpoint if None) + headers: Headers for the exporter (uses TracerWrapper.headers if None) + exporter: Custom exporter to use (creates default if None) + + Returns: + SpanProcessor: The default Traceloop span processor + """ + endpoint = api_endpoint or TracerWrapper.endpoint + request_headers = headers or TracerWrapper.headers + + spans_exporter = exporter or init_spans_exporter(endpoint, request_headers) + + if disable_batch or is_notebook(): + return SimpleSpanProcessor(spans_exporter) + else: + return BatchSpanProcessor(spans_exporter) + + def init_tracer_provider(resource: Resource, sampler: Optional[Sampler] = None) -> TracerProvider: provider: TracerProvider = None default_provider: TracerProvider = get_tracer_provider() From acddebc4564087b81231731def6cbc361202b1ff Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 12:24:12 +0300 Subject: [PATCH 02/12] tests --- packages/sample-app/sample_app/multiple_span_proccesors.py | 1 - packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sample-app/sample_app/multiple_span_proccesors.py b/packages/sample-app/sample_app/multiple_span_proccesors.py index 42d820bae9..f649c6de06 100644 --- a/packages/sample-app/sample_app/multiple_span_proccesors.py +++ b/packages/sample-app/sample_app/multiple_span_proccesors.py @@ -1,7 +1,6 @@ from opentelemetry.sdk.trace.export import ConsoleSpanExporter from opentelemetry.sdk.trace.export import SimpleSpanProcessor from traceloop.sdk import Traceloop -from traceloop.sdk.tracing.context_manager import get_tracer from traceloop.sdk.decorators import task from openai import OpenAI import os diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index d09685f796..d610f0a439 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -146,6 +146,7 @@ def __new__( disable_batch=disable_batch, exporter=exporter ) + obj.__spans_processor_original_on_start = None if span_postprocess_callback: # Create a wrapper that calls both the custom and original methods From 3664304c6ba65960c4cd9a06c659ee29a2d2452f Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 12:35:07 +0300 Subject: [PATCH 03/12] fixes + lint --- ...ccesors.py => multiple_span_processors.py} | 7 ++- .../traceloop-sdk/traceloop/sdk/__init__.py | 16 +++---- .../traceloop/sdk/tracing/tracing.py | 43 ++++++++----------- 3 files changed, 29 insertions(+), 37 deletions(-) rename packages/sample-app/sample_app/{multiple_span_proccesors.py => multiple_span_processors.py} (98%) diff --git a/packages/sample-app/sample_app/multiple_span_proccesors.py b/packages/sample-app/sample_app/multiple_span_processors.py similarity index 98% rename from packages/sample-app/sample_app/multiple_span_proccesors.py rename to packages/sample-app/sample_app/multiple_span_processors.py index f649c6de06..c0daa979a1 100644 --- a/packages/sample-app/sample_app/multiple_span_proccesors.py +++ b/packages/sample-app/sample_app/multiple_span_processors.py @@ -14,14 +14,13 @@ Traceloop.init(processors=[traceloop_processor, console_processor]) - @task(name="joke_creation", version=1) def create_joke(): completion = client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Tell me a joke about opentelemetry multiple span processors"}], ) - + result = completion.choices[0].message.content print(result) return result @@ -29,7 +28,7 @@ def create_joke(): def main(): create_joke() - + + if __name__ == "__main__": main() - diff --git a/packages/traceloop-sdk/traceloop/sdk/__init__.py b/packages/traceloop-sdk/traceloop/sdk/__init__.py index 95c3b3e2e7..992d4d4b27 100644 --- a/packages/traceloop-sdk/traceloop/sdk/__init__.py +++ b/packages/traceloop-sdk/traceloop/sdk/__init__.py @@ -219,36 +219,36 @@ def get_default_span_processor( ) -> SpanProcessor: """ Creates and returns the default Traceloop span processor. - + This function allows users to get the default Traceloop span processor to combine it with their custom processors when using the processors parameter. - + Args: disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor api_endpoint: The endpoint URL for the exporter (uses current config if None) headers: Headers for the exporter (uses current config if None) exporter: Custom exporter to use (creates default if None) - + Returns: SpanProcessor: The default Traceloop span processor - + Example: # Get the default processor and combine with custom one default_processor = Traceloop.get_default_span_processor() custom_processor = MyCustomSpanProcessor() - + Traceloop.init( processors=[default_processor, custom_processor] ) """ from traceloop.sdk.tracing.tracing import get_default_span_processor - if not headers: - if not api_key: + if headers is None: + if api_key is None: api_key = os.getenv("TRACELOOP_API_KEY") headers = { "Authorization": f"Bearer {api_key}", } - if not api_endpoint: + if api_endpoint is None: api_endpoint = os.getenv("TRACELOOP_BASE_URL") return get_default_span_processor(disable_batch, api_endpoint, headers, exporter) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index d610f0a439..a29cfd1909 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -86,35 +86,25 @@ def __new__( obj.__image_uploader = image_uploader obj.__resource = Resource(attributes=TracerWrapper.resource_attributes) obj.__tracer_provider = init_tracer_provider(resource=obj.__resource, sampler=sampler) - + # Validate that only one processor parameter is provided if processor is not None and processors is not None: raise ValueError("Cannot specify both 'processor' and 'processors' parameters. Use only one.") - + # Handle multiple processors case if processors is not None: obj.__spans_processors = [] - obj.__traceloop_processor = None obj.__spans_processor_original_on_start = [] for proc in processors: obj.__spans_processors.append(proc) obj.__spans_processor_original_on_start.append(proc.on_start) - proc.on_start = obj._span_processor_on_start - - obj.__tracer_provider.add_span_processor(proc) - - # Find the Traceloop processor (if any) to apply special handling - # We'll identify it by checking if it was created by get_default_span_processor - # For now, we'll apply the special on_start to the first processor that looks like ours - for proc in processors: - if isinstance(proc, (SimpleSpanProcessor, BatchSpanProcessor)): - # This might be our default processor, apply special handling + if hasattr(proc, "_traceloop_processor"): proc.on_start = obj._span_processor_on_start - obj.__traceloop_processor = proc - break - + + obj.__tracer_provider.add_span_processor(proc) + Telemetry().capture("tracer:init", {"processor": "multiple", "count": len(processors)}) - + # Handle single processor case (backward compatibility) elif processor is not None: Telemetry().capture("tracer:init", {"processor": "custom"}) @@ -122,7 +112,7 @@ def __new__( obj.__spans_processor_original_on_start = processor.on_start obj.__spans_processor.on_start = obj._span_processor_on_start obj.__tracer_provider.add_span_processor(obj.__spans_processor) - + # Handle default processor case else: if exporter: @@ -147,7 +137,7 @@ def __new__( exporter=exporter ) obj.__spans_processor_original_on_start = None - + if span_postprocess_callback: # Create a wrapper that calls both the custom and original methods original_on_end = obj.__spans_processor.on_end @@ -376,25 +366,28 @@ def get_default_span_processor( ) -> SpanProcessor: """ Creates and returns the default Traceloop span processor. - + Args: disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor api_endpoint: The endpoint URL for the exporter (uses TracerWrapper.endpoint if None) headers: Headers for the exporter (uses TracerWrapper.headers if None) exporter: Custom exporter to use (creates default if None) - + Returns: SpanProcessor: The default Traceloop span processor """ endpoint = api_endpoint or TracerWrapper.endpoint request_headers = headers or TracerWrapper.headers - + spans_exporter = exporter or init_spans_exporter(endpoint, request_headers) - + if disable_batch or is_notebook(): - return SimpleSpanProcessor(spans_exporter) + processor = SimpleSpanProcessor(spans_exporter) else: - return BatchSpanProcessor(spans_exporter) + processor = BatchSpanProcessor(spans_exporter) + + setattr(processor, "_traceloop_processor", True) + return processor def init_tracer_provider(resource: Resource, sampler: Optional[Sampler] = None) -> TracerProvider: From 28e2226b9e65c2f37d92eaa1e3c235f1e92ce221 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 14:29:57 +0300 Subject: [PATCH 04/12] fix tests --- .../sample_app/multiple_span_processors.py | 1 + packages/traceloop-sdk/tests/conftest.py | 60 ++++++++++ .../tests/test_sdk_initialization.py | 105 ++++++++++++++++++ .../traceloop/sdk/tracing/tracing.py | 31 +++--- 4 files changed, 182 insertions(+), 15 deletions(-) diff --git a/packages/sample-app/sample_app/multiple_span_processors.py b/packages/sample-app/sample_app/multiple_span_processors.py index c0daa979a1..589e1c74e8 100644 --- a/packages/sample-app/sample_app/multiple_span_processors.py +++ b/packages/sample-app/sample_app/multiple_span_processors.py @@ -9,6 +9,7 @@ traceloop_processor = Traceloop.get_default_span_processor(disable_batch=True) + console_processor = SimpleSpanProcessor(ConsoleSpanExporter()) Traceloop.init(processors=[traceloop_processor, console_processor]) diff --git a/packages/traceloop-sdk/tests/conftest.py b/packages/traceloop-sdk/tests/conftest.py index b339a85d1a..adc62335ab 100644 --- a/packages/traceloop-sdk/tests/conftest.py +++ b/packages/traceloop-sdk/tests/conftest.py @@ -155,3 +155,63 @@ def exporter_with_no_metrics(): if _trace_wrapper_instance: TracerWrapper.instance = _trace_wrapper_instance os.environ["TRACELOOP_METRICS_ENABLED"] = "true" + + +@pytest.fixture +def exporters_with_multiple_span_processors(): + # Clear singleton if existed + if hasattr(TracerWrapper, "instance"): + _trace_wrapper_instance = TracerWrapper.instance + del TracerWrapper.instance + + class CustomSpanProcessor(SimpleSpanProcessor): + def on_start(self, span, parent_context=None): + span.set_attribute("custom_processor", "enabled") + span.set_attribute("processor_type", "custom") + + class MetricsSpanProcessor(SimpleSpanProcessor): + def __init__(self, exporter): + super().__init__(exporter) + self.span_count = 0 + + def on_start(self, span, parent_context=None): + self.span_count += 1 + span.set_attribute("metrics_processor", "enabled") + span.set_attribute("span_count", self.span_count) + + # Create exporters for different processors + default_exporter = InMemorySpanExporter() + custom_exporter = InMemorySpanExporter() + metrics_exporter = InMemorySpanExporter() + + # Get the default Traceloop processor + default_processor = Traceloop.get_default_span_processor( + disable_batch=True, + exporter=default_exporter + ) + + # Create custom processors + custom_processor = CustomSpanProcessor(custom_exporter) + metrics_processor = MetricsSpanProcessor(metrics_exporter) + + # Initialize with multiple processors + processors = [default_processor, custom_processor, metrics_processor] + + Traceloop.init( + app_name="test_multiple_processors", + api_endpoint="http://localhost:4318", # Use local endpoint to avoid API key requirement + processors=processors, + disable_batch=True, + ) + + # Return all exporters so we can verify each processor worked + yield { + "default": default_exporter, + "custom": custom_exporter, + "metrics": metrics_exporter, + "processors": processors + } + + # Restore singleton if any + if _trace_wrapper_instance: + TracerWrapper.instance = _trace_wrapper_instance diff --git a/packages/traceloop-sdk/tests/test_sdk_initialization.py b/packages/traceloop-sdk/tests/test_sdk_initialization.py index f8bb26c542..3f1713fcbe 100644 --- a/packages/traceloop-sdk/tests/test_sdk_initialization.py +++ b/packages/traceloop-sdk/tests/test_sdk_initialization.py @@ -68,3 +68,108 @@ def run_workflow(): spans = exporter_with_no_metrics.get_finished_spans() workflow_span = spans[0] assert workflow_span + + +def test_multiple_span_processors(exporters_with_multiple_span_processors): + """Test that multiple span processors work correctly together.""" + from traceloop.sdk.decorators import workflow, task + + @task(name="test_task") + def test_task(): + return "task_result" + + @workflow(name="test_workflow") + def test_workflow(): + return test_task() + + # Run the workflow to generate spans + result = test_workflow() + assert result == "task_result" + + exporters = exporters_with_multiple_span_processors + + # Check that all processors received spans + default_spans = exporters["default"].get_finished_spans() + custom_spans = exporters["custom"].get_finished_spans() + metrics_spans = exporters["metrics"].get_finished_spans() + + # All processors should have received the spans + assert len(default_spans) == 2, "Default processor should have received spans" + assert len(custom_spans) == 2, "Custom processor should have received spans" + assert len(metrics_spans) == 2, "Metrics processor should have received spans" + + # Verify that the default processor (Traceloop) added its attributes + default_span = default_spans[0] + # The default processor should have Traceloop-specific attributes + assert hasattr(default_span, 'attributes') + + # Verify that custom processor added its attributes + custom_span = custom_spans[0] + assert custom_span.attributes.get("custom_processor") == "enabled" + assert custom_span.attributes.get("processor_type") == "custom" + + # Verify that metrics processor added its attributes + # Now that we fixed the double-call bug, the span_count should be correct + workflow_spans = [s for s in metrics_spans if "workflow" in s.name] + task_spans = [s for s in metrics_spans if "task" in s.name] + assert len(workflow_spans) == 1 + assert len(task_spans) == 1 + + # The workflow span should be processed first (span_count=1) + # The task span should be processed second (span_count=2) + workflow_span = workflow_spans[0] + task_span = task_spans[0] + + assert workflow_span.attributes.get("metrics_processor") == "enabled" + assert workflow_span.attributes.get("span_count") == 1 + + assert task_span.attributes.get("metrics_processor") == "enabled" + assert task_span.attributes.get("span_count") == 2 + + +def test_get_default_span_processor(): + """Test that get_default_span_processor returns a valid processor.""" + from traceloop.sdk import Traceloop + from opentelemetry.sdk.trace.export import SimpleSpanProcessor, BatchSpanProcessor + + # Test with batch disabled + processor = Traceloop.get_default_span_processor(disable_batch=True) + assert isinstance(processor, SimpleSpanProcessor) + assert hasattr(processor, "_traceloop_processor") + assert getattr(processor, "_traceloop_processor") is True + + # Test with batch enabled + processor = Traceloop.get_default_span_processor(disable_batch=False) + assert isinstance(processor, BatchSpanProcessor) + assert hasattr(processor, "_traceloop_processor") + assert getattr(processor, "_traceloop_processor") is True + + +def test_processors_parameter_validation(): + """Test that using both processor and processors parameters raises an error.""" + from traceloop.sdk import Traceloop + from traceloop.sdk.tracing.tracing import TracerWrapper + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + # Clear singleton if existed + if hasattr(TracerWrapper, "instance"): + _trace_wrapper_instance = TracerWrapper.instance + del TracerWrapper.instance + + try: + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + + # This should raise a ValueError + with pytest.raises(ValueError, match="Cannot specify both 'processor' and 'processors' parameters"): + Traceloop.init( + app_name="test_validation", + processor=processor, + processors=[processor], + disable_batch=True, + ) + finally: + # Restore singleton if any + if '_trace_wrapper_instance' in locals(): + TracerWrapper.instance = _trace_wrapper_instance diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index a29cfd1909..da541f9329 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -94,12 +94,16 @@ def __new__( # Handle multiple processors case if processors is not None: obj.__spans_processors = [] - obj.__spans_processor_original_on_start = [] for proc in processors: + original_on_start = proc.on_start + + def chained_on_start(span, parent_context=None, orig=original_on_start): + obj._span_processor_on_start(span, parent_context) + if orig: + orig(span, parent_context) + + proc.on_start = chained_on_start obj.__spans_processors.append(proc) - obj.__spans_processor_original_on_start.append(proc.on_start) - if hasattr(proc, "_traceloop_processor"): - proc.on_start = obj._span_processor_on_start obj.__tracer_provider.add_span_processor(proc) @@ -109,8 +113,14 @@ def __new__( elif processor is not None: Telemetry().capture("tracer:init", {"processor": "custom"}) obj.__spans_processor: SpanProcessor = processor - obj.__spans_processor_original_on_start = processor.on_start - obj.__spans_processor.on_start = obj._span_processor_on_start + original_on_start = obj.__spans_processor.on_start + + def chained_on_start(span, parent_context=None, orig=original_on_start): + obj._span_processor_on_start(span, parent_context) + if orig: + orig(span, parent_context) + + obj.__spans_processor.on_start = chained_on_start obj.__tracer_provider.add_span_processor(obj.__spans_processor) # Handle default processor case @@ -237,15 +247,6 @@ def _span_processor_on_start(self, span, parent_context): value, ) - # Call original on_start method if it exists in custom processor - if self.__spans_processor_original_on_start: - original = self.__spans_processor_original_on_start - if callable(original): - original(span, parent_context) - elif isinstance(original, list): - for func in original: - func(span, parent_context) - @staticmethod def set_static_params( resource_attributes: dict, From 3bb8969d2b0978731c4a99291effa1b98d8d40e0 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 15:07:08 +0300 Subject: [PATCH 05/12] refactor on_Start a bit --- .../traceloop/sdk/tracing/tracing.py | 68 ++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index da541f9329..6399ed6f91 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -21,7 +21,9 @@ SimpleSpanProcessor, BatchSpanProcessor, ) -from opentelemetry.trace import get_tracer_provider, ProxyTracerProvider +from opentelemetry.context import Context + +from opentelemetry.trace import get_tracer_provider, ProxyTracerProvider, Span from opentelemetry.context import get_value, attach, set_value from opentelemetry.instrumentation.threading import ThreadingInstrumentor @@ -103,6 +105,9 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): orig(span, parent_context) proc.on_start = chained_on_start + if hasattr(proc, "_traceloop_processor"): + proc.on_start = obj._span_processor_on_start + obj.__spans_processors.append(proc) obj.__tracer_provider.add_span_processor(proc) @@ -121,6 +126,8 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): orig(span, parent_context) obj.__spans_processor.on_start = chained_on_start + if hasattr(obj.__spans_processor, "_traceloop_processor"): + obj.__spans_processor.on_start = obj._span_processor_on_start obj.__tracer_provider.add_span_processor(obj.__spans_processor) # Handle default processor case @@ -146,7 +153,6 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): disable_batch=disable_batch, exporter=exporter ) - obj.__spans_processor_original_on_start = None if span_postprocess_callback: # Create a wrapper that calls both the custom and original methods @@ -358,6 +364,63 @@ def init_spans_exporter(api_endpoint: str, headers: Dict[str, str]) -> SpanExpor else: return GRPCExporter(endpoint=f"{api_endpoint}", headers=headers) +# Same as _span_processor_on_start but without the usage of self which comes from the sdk, good for standalone usage +def default_span_processor_on_start(span: Span, parent_context: Context | None = None): + workflow_name = get_value("workflow_name") + if workflow_name is not None: + span.set_attribute(SpanAttributes.TRACELOOP_WORKFLOW_NAME, str(workflow_name)) + + entity_path = get_value("entity_path") + if entity_path is not None: + span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_PATH, str(entity_path)) + + association_properties = get_value("association_properties") + if association_properties is not None and isinstance(association_properties, dict): + _set_association_properties_attributes(span, association_properties) + + if is_llm_span(span): + managed_prompt = get_value("managed_prompt") + if managed_prompt is not None: + span.set_attribute( + SpanAttributes.TRACELOOP_PROMPT_MANAGED, str(managed_prompt) + ) + + prompt_key = get_value("prompt_key") + if prompt_key is not None: + span.set_attribute(SpanAttributes.TRACELOOP_PROMPT_KEY, str(prompt_key)) + + prompt_version = get_value("prompt_version") + if prompt_version is not None: + span.set_attribute( + SpanAttributes.TRACELOOP_PROMPT_VERSION, str(prompt_version) + ) + + prompt_version_name = get_value("prompt_version_name") + if prompt_version_name is not None: + span.set_attribute( + SpanAttributes.TRACELOOP_PROMPT_VERSION_NAME, str(prompt_version_name) + ) + + prompt_version_hash = get_value("prompt_version_hash") + if prompt_version_hash is not None: + span.set_attribute( + SpanAttributes.TRACELOOP_PROMPT_VERSION_HASH, str(prompt_version_hash) + ) + + prompt_template = get_value("prompt_template") + if prompt_template is not None: + span.set_attribute( + SpanAttributes.TRACELOOP_PROMPT_TEMPLATE, str(prompt_template) + ) + + prompt_template_variables = get_value("prompt_template_variables") + if prompt_template_variables is not None and isinstance(prompt_template_variables, dict): + for key, value in prompt_template_variables.items(): + span.set_attribute( + f"{SpanAttributes.TRACELOOP_PROMPT_TEMPLATE_VARIABLES}.{key}", + value, + ) + def get_default_span_processor( disable_batch: bool = False, @@ -388,6 +451,7 @@ def get_default_span_processor( processor = BatchSpanProcessor(spans_exporter) setattr(processor, "_traceloop_processor", True) + processor.on_start = default_span_processor_on_start return processor From 8551147e8a38b2b624572ca18eb6168adf09091c Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 15:13:37 +0300 Subject: [PATCH 06/12] lint --- packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index 6399ed6f91..1e551ac178 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -364,8 +364,11 @@ def init_spans_exporter(api_endpoint: str, headers: Dict[str, str]) -> SpanExpor else: return GRPCExporter(endpoint=f"{api_endpoint}", headers=headers) -# Same as _span_processor_on_start but without the usage of self which comes from the sdk, good for standalone usage + def default_span_processor_on_start(span: Span, parent_context: Context | None = None): + """ + Same as _span_processor_on_start but without the usage of self which comes from the sdk, good for standalone usage. + """ workflow_name = get_value("workflow_name") if workflow_name is not None: span.set_attribute(SpanAttributes.TRACELOOP_WORKFLOW_NAME, str(workflow_name)) From ac436a3510cf0f405602dc5eaf50b70d6e3c2630 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 15:26:52 +0300 Subject: [PATCH 07/12] cleaner --- .../traceloop-sdk/traceloop/sdk/tracing/tracing.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index 1e551ac178..eb0624c3f4 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -98,16 +98,14 @@ def __new__( obj.__spans_processors = [] for proc in processors: original_on_start = proc.on_start + is_traceloop_processor = hasattr(proc, "_traceloop_processor") def chained_on_start(span, parent_context=None, orig=original_on_start): obj._span_processor_on_start(span, parent_context) - if orig: + if orig and not is_traceloop_processor: orig(span, parent_context) proc.on_start = chained_on_start - if hasattr(proc, "_traceloop_processor"): - proc.on_start = obj._span_processor_on_start - obj.__spans_processors.append(proc) obj.__tracer_provider.add_span_processor(proc) @@ -119,15 +117,15 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): Telemetry().capture("tracer:init", {"processor": "custom"}) obj.__spans_processor: SpanProcessor = processor original_on_start = obj.__spans_processor.on_start + is_traceloop_processor = hasattr(obj.__spans_processor, "_traceloop_processor") def chained_on_start(span, parent_context=None, orig=original_on_start): obj._span_processor_on_start(span, parent_context) - if orig: + if orig and not is_traceloop_processor: orig(span, parent_context) obj.__spans_processor.on_start = chained_on_start - if hasattr(obj.__spans_processor, "_traceloop_processor"): - obj.__spans_processor.on_start = obj._span_processor_on_start + obj.__tracer_provider.add_span_processor(obj.__spans_processor) # Handle default processor case From 2144fb47f6c4fb675b84a40a5a7c15a7e94f92fb Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 15:37:47 +0300 Subject: [PATCH 08/12] closure bug --- packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index eb0624c3f4..205d49f941 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -100,9 +100,9 @@ def __new__( original_on_start = proc.on_start is_traceloop_processor = hasattr(proc, "_traceloop_processor") - def chained_on_start(span, parent_context=None, orig=original_on_start): + def chained_on_start(span, parent_context=None, orig=original_on_start, is_traceloop=is_traceloop_processor): obj._span_processor_on_start(span, parent_context) - if orig and not is_traceloop_processor: + if orig and not is_traceloop: orig(span, parent_context) proc.on_start = chained_on_start @@ -119,9 +119,9 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): original_on_start = obj.__spans_processor.on_start is_traceloop_processor = hasattr(obj.__spans_processor, "_traceloop_processor") - def chained_on_start(span, parent_context=None, orig=original_on_start): + def chained_on_start(span, parent_context=None, orig=original_on_start, is_traceloop=is_traceloop_processor): obj._span_processor_on_start(span, parent_context) - if orig and not is_traceloop_processor: + if orig and not is_traceloop: orig(span, parent_context) obj.__spans_processor.on_start = chained_on_start From 6df824278c2d4892d81186a1a675ef21c10974d6 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Thu, 31 Jul 2025 15:52:03 +0300 Subject: [PATCH 09/12] simplier --- .../traceloop-sdk/traceloop/sdk/tracing/tracing.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index 205d49f941..3c390d6cb0 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -98,12 +98,11 @@ def __new__( obj.__spans_processors = [] for proc in processors: original_on_start = proc.on_start - is_traceloop_processor = hasattr(proc, "_traceloop_processor") - def chained_on_start(span, parent_context=None, orig=original_on_start, is_traceloop=is_traceloop_processor): - obj._span_processor_on_start(span, parent_context) - if orig and not is_traceloop: + def chained_on_start(span, parent_context=None, orig=original_on_start): + if orig: orig(span, parent_context) + obj._span_processor_on_start(span, parent_context) proc.on_start = chained_on_start obj.__spans_processors.append(proc) @@ -117,11 +116,10 @@ def chained_on_start(span, parent_context=None, orig=original_on_start, is_trace Telemetry().capture("tracer:init", {"processor": "custom"}) obj.__spans_processor: SpanProcessor = processor original_on_start = obj.__spans_processor.on_start - is_traceloop_processor = hasattr(obj.__spans_processor, "_traceloop_processor") - def chained_on_start(span, parent_context=None, orig=original_on_start, is_traceloop=is_traceloop_processor): + def chained_on_start(span, parent_context=None, orig=original_on_start): obj._span_processor_on_start(span, parent_context) - if orig and not is_traceloop: + if orig: orig(span, parent_context) obj.__spans_processor.on_start = chained_on_start From 360c57e21891fdedaa3ecd7594a12602a0b5ce33 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Sun, 3 Aug 2025 15:13:23 +0300 Subject: [PATCH 10/12] remove dups --- .../traceloop-sdk/tests/test_workflows.py | 4 +- .../traceloop/sdk/tracing/tracing.py | 55 +------------------ 2 files changed, 5 insertions(+), 54 deletions(-) diff --git a/packages/traceloop-sdk/tests/test_workflows.py b/packages/traceloop-sdk/tests/test_workflows.py index c8d45967ba..1255cda3db 100644 --- a/packages/traceloop-sdk/tests/test_workflows.py +++ b/packages/traceloop-sdk/tests/test_workflows.py @@ -49,7 +49,7 @@ def joke_workflow(): assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}" assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke" assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry" - assert open_ai_span.attributes.get("traceloop.prompt.version") == 5 + assert open_ai_span.attributes.get("traceloop.prompt.version") == "5" workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow") task_span = next(span for span in spans if span.name == "something_creator.task") @@ -95,7 +95,7 @@ async def joke_workflow(): assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}" assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke" assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry" - assert open_ai_span.attributes.get("traceloop.prompt.version") == 5 + assert open_ai_span.attributes.get("traceloop.prompt.version") == "5" workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow") task_span = next(span for span in spans if span.name == "something_creator.task") diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index 3c390d6cb0..5cab0d2e27 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -188,67 +188,18 @@ def exit_handler(self): self.flush() def _span_processor_on_start(self, span, parent_context): - workflow_name = get_value("workflow_name") - if workflow_name is not None: - span.set_attribute(SpanAttributes.TRACELOOP_WORKFLOW_NAME, workflow_name) - - entity_path = get_value("entity_path") - if entity_path is not None: - span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_PATH, entity_path) + default_span_processor_on_start(span, parent_context) + # TODO: this is here only because we need self to be able to access the content allow list + # we should refactor this to not need self association_properties = get_value("association_properties") if association_properties is not None: - _set_association_properties_attributes(span, association_properties) - if not self.enable_content_tracing: if self.__content_allow_list.is_allowed(association_properties): attach(set_value("override_enable_content_tracing", True)) else: attach(set_value("override_enable_content_tracing", False)) - if is_llm_span(span): - managed_prompt = get_value("managed_prompt") - if managed_prompt is not None: - span.set_attribute( - SpanAttributes.TRACELOOP_PROMPT_MANAGED, managed_prompt - ) - - prompt_key = get_value("prompt_key") - if prompt_key is not None: - span.set_attribute(SpanAttributes.TRACELOOP_PROMPT_KEY, prompt_key) - - prompt_version = get_value("prompt_version") - if prompt_version is not None: - span.set_attribute( - SpanAttributes.TRACELOOP_PROMPT_VERSION, prompt_version - ) - - prompt_version_name = get_value("prompt_version_name") - if prompt_version_name is not None: - span.set_attribute( - SpanAttributes.TRACELOOP_PROMPT_VERSION_NAME, prompt_version_name - ) - - prompt_version_hash = get_value("prompt_version_hash") - if prompt_version_hash is not None: - span.set_attribute( - SpanAttributes.TRACELOOP_PROMPT_VERSION_HASH, prompt_version_hash - ) - - prompt_template = get_value("prompt_template") - if prompt_template is not None: - span.set_attribute( - SpanAttributes.TRACELOOP_PROMPT_TEMPLATE, prompt_template - ) - - prompt_template_variables = get_value("prompt_template_variables") - if prompt_template_variables is not None: - for key, value in prompt_template_variables.items(): - span.set_attribute( - f"{SpanAttributes.TRACELOOP_PROMPT_TEMPLATE_VARIABLES}.{key}", - value, - ) - @staticmethod def set_static_params( resource_attributes: dict, From 5ba3d5a53d7dff9b070d8ceaa74e1d1cb6024d4c Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Sun, 3 Aug 2025 16:53:11 +0300 Subject: [PATCH 11/12] pythonic init --- .../traceloop-sdk/traceloop/sdk/__init__.py | 6 ++---- .../traceloop/sdk/tracing/tracing.py | 17 ++++++----------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/packages/traceloop-sdk/traceloop/sdk/__init__.py b/packages/traceloop-sdk/traceloop/sdk/__init__.py index 992d4d4b27..22e26181c2 100644 --- a/packages/traceloop-sdk/traceloop/sdk/__init__.py +++ b/packages/traceloop-sdk/traceloop/sdk/__init__.py @@ -2,7 +2,7 @@ import sys from pathlib import Path -from typing import Callable, List, Optional, Set +from typing import Callable, List, Optional, Set, Union from colorama import Fore from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan from opentelemetry.sdk.trace.sampling import Sampler @@ -59,8 +59,7 @@ def init( metrics_headers: Dict[str, str] = None, logging_exporter: LogExporter = None, logging_headers: Dict[str, str] = None, - processor: Optional[SpanProcessor] = None, - processors: Optional[List[SpanProcessor]] = None, + processor: Optional[Union[SpanProcessor, List[SpanProcessor]]] = None, propagator: TextMapPropagator = None, sampler: Optional[Sampler] = None, traceloop_sync_enabled: bool = False, @@ -145,7 +144,6 @@ def init( Traceloop.__tracer_wrapper = TracerWrapper( disable_batch=disable_batch, processor=processor, - processors=processors, propagator=propagator, exporter=exporter, sampler=sampler, diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index 5cab0d2e27..d27cca029e 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -34,7 +34,7 @@ from traceloop.sdk.tracing.content_allow_list import ContentAllowList from traceloop.sdk.utils import is_notebook from traceloop.sdk.utils.package_check import is_package_installed -from typing import Callable, Dict, List, Optional, Set +from typing import Callable, Dict, List, Optional, Set, Union TRACER_NAME = "traceloop.tracer" @@ -69,8 +69,7 @@ class TracerWrapper(object): def __new__( cls, disable_batch=False, - processor: Optional[SpanProcessor] = None, - processors: Optional[List[SpanProcessor]] = None, + processor: Optional[Union[SpanProcessor, List[SpanProcessor]]] = None, propagator: TextMapPropagator = None, exporter: SpanExporter = None, sampler: Optional[Sampler] = None, @@ -89,14 +88,10 @@ def __new__( obj.__resource = Resource(attributes=TracerWrapper.resource_attributes) obj.__tracer_provider = init_tracer_provider(resource=obj.__resource, sampler=sampler) - # Validate that only one processor parameter is provided - if processor is not None and processors is not None: - raise ValueError("Cannot specify both 'processor' and 'processors' parameters. Use only one.") - # Handle multiple processors case - if processors is not None: + if processor is not None and isinstance(processor, list): obj.__spans_processors = [] - for proc in processors: + for proc in processor: original_on_start = proc.on_start def chained_on_start(span, parent_context=None, orig=original_on_start): @@ -109,7 +104,7 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): obj.__tracer_provider.add_span_processor(proc) - Telemetry().capture("tracer:init", {"processor": "multiple", "count": len(processors)}) + Telemetry().capture("tracer:init", {"processor": "multiple", "count": len(processor)}) # Handle single processor case (backward compatibility) elif processor is not None: @@ -145,7 +140,7 @@ def chained_on_start(span, parent_context=None, orig=original_on_start): }, ) - obj.__spans_processor: SpanProcessor = get_default_span_processor( + obj.__spans_processor = get_default_span_processor( disable_batch=disable_batch, exporter=exporter ) From 4b5bd9f3d64161795938a3b97df7be52cce7cd05 Mon Sep 17 00:00:00 2001 From: doronkopit5 Date: Sun, 3 Aug 2025 17:00:12 +0300 Subject: [PATCH 12/12] fix tests --- .../sample_app/multiple_span_processors.py | 2 +- packages/traceloop-sdk/tests/conftest.py | 4 +-- .../tests/test_sdk_initialization.py | 30 ------------------- 3 files changed, 3 insertions(+), 33 deletions(-) diff --git a/packages/sample-app/sample_app/multiple_span_processors.py b/packages/sample-app/sample_app/multiple_span_processors.py index 589e1c74e8..25646d801e 100644 --- a/packages/sample-app/sample_app/multiple_span_processors.py +++ b/packages/sample-app/sample_app/multiple_span_processors.py @@ -12,7 +12,7 @@ console_processor = SimpleSpanProcessor(ConsoleSpanExporter()) -Traceloop.init(processors=[traceloop_processor, console_processor]) +Traceloop.init(processor=[traceloop_processor, console_processor]) @task(name="joke_creation", version=1) diff --git a/packages/traceloop-sdk/tests/conftest.py b/packages/traceloop-sdk/tests/conftest.py index adc62335ab..572c3e5e74 100644 --- a/packages/traceloop-sdk/tests/conftest.py +++ b/packages/traceloop-sdk/tests/conftest.py @@ -200,7 +200,7 @@ def on_start(self, span, parent_context=None): Traceloop.init( app_name="test_multiple_processors", api_endpoint="http://localhost:4318", # Use local endpoint to avoid API key requirement - processors=processors, + processor=processors, disable_batch=True, ) @@ -209,7 +209,7 @@ def on_start(self, span, parent_context=None): "default": default_exporter, "custom": custom_exporter, "metrics": metrics_exporter, - "processors": processors + "processor": processors } # Restore singleton if any diff --git a/packages/traceloop-sdk/tests/test_sdk_initialization.py b/packages/traceloop-sdk/tests/test_sdk_initialization.py index 3f1713fcbe..5c1dcd30e0 100644 --- a/packages/traceloop-sdk/tests/test_sdk_initialization.py +++ b/packages/traceloop-sdk/tests/test_sdk_initialization.py @@ -143,33 +143,3 @@ def test_get_default_span_processor(): assert isinstance(processor, BatchSpanProcessor) assert hasattr(processor, "_traceloop_processor") assert getattr(processor, "_traceloop_processor") is True - - -def test_processors_parameter_validation(): - """Test that using both processor and processors parameters raises an error.""" - from traceloop.sdk import Traceloop - from traceloop.sdk.tracing.tracing import TracerWrapper - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - - # Clear singleton if existed - if hasattr(TracerWrapper, "instance"): - _trace_wrapper_instance = TracerWrapper.instance - del TracerWrapper.instance - - try: - exporter = InMemorySpanExporter() - processor = SimpleSpanProcessor(exporter) - - # This should raise a ValueError - with pytest.raises(ValueError, match="Cannot specify both 'processor' and 'processors' parameters"): - Traceloop.init( - app_name="test_validation", - processor=processor, - processors=[processor], - disable_batch=True, - ) - finally: - # Restore singleton if any - if '_trace_wrapper_instance' in locals(): - TracerWrapper.instance = _trace_wrapper_instance