From edee89d6b8ab2664a7f0cfaaed13a9c8ef727fab Mon Sep 17 00:00:00 2001 From: igorek Date: Thu, 10 Jul 2025 23:35:49 +0700 Subject: [PATCH 1/4] feat(integrations): Add tracing to DramatiqIntegration Adds tracing support to DramatiqIntegration #3454 --- sentry_sdk/integrations/dramatiq.py | 59 +++++++++++++++++++---------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index a756b4c669..b978044ee2 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -1,8 +1,11 @@ import json +import contextvars import sentry_sdk +from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration from sentry_sdk.integrations._wsgi_common import request_body_within_bounds +from sentry_sdk.tracing import TransactionSource from sentry_sdk.utils import ( AnnotatedValue, capture_internal_exceptions, @@ -18,6 +21,7 @@ if TYPE_CHECKING: from typing import Any, Callable, Dict, Optional, Union + from sentry_sdk.tracing import Transaction from sentry_sdk._types import Event, Hint @@ -85,20 +89,27 @@ class SentryMiddleware(Middleware): # type: ignore[misc] DramatiqIntegration. """ + # type: contextvars.ContextVar[Transaction] + _transaction = contextvars.ContextVar("_transaction", default=None) + def before_process_message(self, broker, message): # type: (Broker, Message) -> None integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) if integration is None: return - message._scope_manager = sentry_sdk.new_scope() - message._scope_manager.__enter__() - scope = sentry_sdk.get_current_scope() - scope.set_transaction_name(message.actor_name) scope.set_extra("dramatiq_message_id", message.message_id) scope.add_event_processor(_make_message_event_processor(message, integration)) + transaction = sentry_sdk.start_transaction( + name=message.actor_name, + op=OP.QUEUE_PROCESS, + source=TransactionSource.TASK, + ) + transaction.__enter__() + self._transaction.set(transaction) + def after_process_message(self, broker, message, *, result=None, exception=None): # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) @@ -108,23 +119,29 @@ def after_process_message(self, broker, message, *, result=None, exception=None) actor = broker.get_actor(message.actor_name) throws = message.options.get("throws") or actor.options.get("throws") - try: - if ( - exception is not None - and not (throws and isinstance(exception, throws)) - and not isinstance(exception, Retry) - ): - event, hint = event_from_exception( - exception, - client_options=sentry_sdk.get_client().options, - mechanism={ - "type": DramatiqIntegration.identifier, - "handled": False, - }, - ) - sentry_sdk.capture_event(event, hint=hint) - finally: - message._scope_manager.__exit__(None, None, None) + transaction = self._transaction.get() + + is_event_capture_required = ( + exception is not None + and not (throws and isinstance(exception, throws)) + and not isinstance(exception, Retry) + ) + if not is_event_capture_required: + # normal transaction finish + transaction.__exit__(None, None, None) + return + + event, hint = event_from_exception( + exception, + client_options=sentry_sdk.get_client().options, + mechanism={ + "type": DramatiqIntegration.identifier, + "handled": False, + }, + ) + sentry_sdk.capture_event(event, hint=hint) + # transaction error + transaction.__exit__(type(exception), exception, None) def _make_message_event_processor(message, integration): From 5562a9c4bb2eb34197e2930ac84b80be64106460 Mon Sep 17 00:00:00 2001 From: igorek Date: Wed, 16 Jul 2025 02:12:39 +0700 Subject: [PATCH 2/4] feat(integrations): Add trace propagation to DramatiqIntegration - add trace propagation - set dramatiq_task_id as tag instead of extra - new tests - fix mypy issues Issue: #3454 --- sentry_sdk/consts.py | 1 + sentry_sdk/integrations/dramatiq.py | 65 ++++++++++++------ tests/integrations/dramatiq/test_dramatiq.py | 71 +++++++++++++++++--- 3 files changed, 105 insertions(+), 32 deletions(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 01f72e2887..34f6d2e100 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -695,6 +695,7 @@ class OP: QUEUE_TASK_HUEY = "queue.task.huey" QUEUE_SUBMIT_RAY = "queue.submit.ray" QUEUE_TASK_RAY = "queue.task.ray" + QUEUE_TASK_DRAMATIQ = "queue.task.dramatiq" SUBPROCESS = "subprocess" SUBPROCESS_WAIT = "subprocess.wait" SUBPROCESS_COMMUNICATE = "subprocess.communicate" diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index b978044ee2..c62dd44c7f 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -2,20 +2,28 @@ import contextvars import sentry_sdk -from sentry_sdk.consts import OP -from sentry_sdk.integrations import Integration +from sentry_sdk.consts import OP, SPANSTATUS +from sentry_sdk.api import continue_trace, get_baggage, get_traceparent +from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.integrations._wsgi_common import request_body_within_bounds -from sentry_sdk.tracing import TransactionSource +from sentry_sdk.tracing import ( + BAGGAGE_HEADER_NAME, + SENTRY_TRACE_HEADER_NAME, + TransactionSource, +) from sentry_sdk.utils import ( AnnotatedValue, capture_internal_exceptions, event_from_exception, ) -from dramatiq.broker import Broker # type: ignore -from dramatiq.message import Message # type: ignore -from dramatiq.middleware import Middleware, default_middleware # type: ignore -from dramatiq.errors import Retry # type: ignore +try: + from dramatiq.broker import Broker + from dramatiq.message import Message, R + from dramatiq.middleware import Middleware, default_middleware + from dramatiq.errors import Retry +except ImportError: + raise DidNotEnable("Dramatiq is not installed") from typing import TYPE_CHECKING @@ -77,10 +85,10 @@ def sentry_patched_broker__init__(self, *args, **kw): kw["middleware"] = middleware original_broker__init__(self, *args, **kw) - Broker.__init__ = sentry_patched_broker__init__ + Broker.__init__ = sentry_patched_broker__init__ # type: ignore[method-assign] -class SentryMiddleware(Middleware): # type: ignore[misc] +class SentryMiddleware(Middleware): """ A Dramatiq middleware that automatically captures and sends exceptions to Sentry. @@ -89,20 +97,38 @@ class SentryMiddleware(Middleware): # type: ignore[misc] DramatiqIntegration. """ - # type: contextvars.ContextVar[Transaction] - _transaction = contextvars.ContextVar("_transaction", default=None) + _transaction = contextvars.ContextVar( + "_transaction" + ) # type: contextvars.ContextVar[Transaction] + + def before_enqueue(self, broker, message, delay): + # type: (Broker, Message[R], int) -> None + message.options["sentry_headers"] = { + BAGGAGE_HEADER_NAME: get_baggage(), + SENTRY_TRACE_HEADER_NAME: get_traceparent(), + } def before_process_message(self, broker, message): - # type: (Broker, Message) -> None + # type: (Broker, Message[R]) -> None integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) if integration is None: return scope = sentry_sdk.get_current_scope() - scope.set_extra("dramatiq_message_id", message.message_id) + scope.set_tag("dramatiq_message_id", message.message_id) + scope.clear_breadcrumbs() scope.add_event_processor(_make_message_event_processor(message, integration)) - transaction = sentry_sdk.start_transaction( + transaction = continue_trace( + message.options.get("sentry_headers") or {}, + name=message.actor_name, + op=OP.QUEUE_TASK_DRAMATIQ, + source=TransactionSource.TASK, + # origin=DramatiqIntegration.origin, + ) + transaction.set_status(SPANSTATUS.OK) + sentry_sdk.start_transaction( + transaction, name=message.actor_name, op=OP.QUEUE_PROCESS, source=TransactionSource.TASK, @@ -111,10 +137,7 @@ def before_process_message(self, broker, message): self._transaction.set(transaction) def after_process_message(self, broker, message, *, result=None, exception=None): - # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None - integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) - if integration is None: - return + # type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None actor = broker.get_actor(message.actor_name) throws = message.options.get("throws") or actor.options.get("throws") @@ -132,7 +155,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None) return event, hint = event_from_exception( - exception, + exception, # type: ignore[arg-type] client_options=sentry_sdk.get_client().options, mechanism={ "type": DramatiqIntegration.identifier, @@ -145,7 +168,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None) def _make_message_event_processor(message, integration): - # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] + # type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] def inner(event, hint): # type: (Event, Hint) -> Optional[Event] @@ -159,7 +182,7 @@ def inner(event, hint): class DramatiqMessageExtractor: def __init__(self, message): - # type: (Message) -> None + # type: (Message[R]) -> None self.message_data = dict(message.asdict()) def content_length(self): diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py index d7917cbd00..fdbf64a18e 100644 --- a/tests/integrations/dramatiq/test_dramatiq.py +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -5,12 +5,21 @@ from dramatiq.brokers.stub import StubBroker import sentry_sdk +from sentry_sdk.tracing import TransactionSource +from sentry_sdk import start_transaction +from sentry_sdk.consts import SPANSTATUS from sentry_sdk.integrations.dramatiq import DramatiqIntegration +# from sentry_sdk.integrations.logging import LoggingIntegration -@pytest.fixture -def broker(sentry_init): - sentry_init(integrations=[DramatiqIntegration()]) + +@pytest.fixture(scope="function") +def broker(request, sentry_init): + sentry_init( + integrations=[DramatiqIntegration()], + traces_sample_rate=getattr(request, "param", None), + # disabled_integrations=[LoggingIntegration()], + ) broker = StubBroker() broker.emit_after("process_boot") dramatiq.set_broker(broker) @@ -44,22 +53,61 @@ def dummy_actor(x, y): assert exception["type"] == "ZeroDivisionError" -def test_that_actor_name_is_set_as_transaction(broker, worker, capture_events): +@pytest.mark.parametrize( + "broker,expected_span_status", + [ + (1.0, SPANSTATUS.INTERNAL_ERROR), + (1.0, SPANSTATUS.OK), + ], + ids=["error", "success"], + indirect=["broker"], +) +def test_task_transaction(broker, worker, capture_events, expected_span_status): events = capture_events() + task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR @dramatiq.actor(max_retries=0) def dummy_actor(x, y): return x / y - dummy_actor.send(1, 0) + dummy_actor.send(1, int(not task_fails)) broker.join(dummy_actor.queue_name) worker.join() + if task_fails: + error_event = events.pop(0) + exception = error_event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + # todo: failed assert. Logging instead of dramatiq + # assert exception["mechanism"]["type"] == DramatiqIntegration.identifier + (event,) = events + assert event["type"] == "transaction" assert event["transaction"] == "dummy_actor" + assert event["transaction_info"] == {"source": TransactionSource.TASK} + assert event["contexts"]["trace"]["status"] == expected_span_status -def test_that_dramatiq_message_id_is_set_as_extra(broker, worker, capture_events): +@pytest.mark.parametrize("broker", [1.0], indirect=True) +def test_dramatiq_propagate_trace(broker, worker, capture_events): + events = capture_events() + + @dramatiq.actor(max_retries=0) + def propagated_trace_task(): + pass + + with start_transaction() as outer_transaction: + propagated_trace_task.send() + broker.join(propagated_trace_task.queue_name) + worker.join() + + assert ( + events[0]["transaction"] == "propagated_trace_task" + ) # the "inner" transaction + assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id + + +def test_that_dramatiq_message_id_is_set_as_tag(broker, worker, capture_events): events = capture_events() @dramatiq.actor(max_retries=0) @@ -72,13 +120,14 @@ def dummy_actor(x, y): worker.join() event_message, event_error = events - assert "dramatiq_message_id" in event_message["extra"] - assert "dramatiq_message_id" in event_error["extra"] + + assert "dramatiq_message_id" in event_message["tags"] + assert "dramatiq_message_id" in event_error["tags"] assert ( - event_message["extra"]["dramatiq_message_id"] - == event_error["extra"]["dramatiq_message_id"] + event_message["tags"]["dramatiq_message_id"] + == event_error["tags"]["dramatiq_message_id"] ) - msg_ids = [e["extra"]["dramatiq_message_id"] for e in events] + msg_ids = [e["tags"]["dramatiq_message_id"] for e in events] assert all(uuid.UUID(msg_id) and isinstance(msg_id, str) for msg_id in msg_ids) From 7156ac4a8e0ff239a87b26e36f3b2651b99a555b Mon Sep 17 00:00:00 2001 From: igorek Date: Mon, 15 Sep 2025 18:56:52 +0700 Subject: [PATCH 3/4] feat(integration): Fix dramatiq py3.6 and linters issues (#3454) - import ContextVar from utils for python3.6 support - do not import type "R" from dramatiq package because it was introduced only in newer version of dramatiq - fix mypy settings to ignore missing imports - fix tox.ini to include aiocontextvars for py3.6 --- pyproject.toml | 4 ++++ scripts/populate_tox/config.py | 3 +++ sentry_sdk/integrations/dramatiq.py | 30 +++++++++++++++++++---------- tox.ini | 1 + 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 44eded7641..b38e5a0ddb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -195,6 +195,10 @@ ignore_missing_imports = true module = "agents.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "dramatiq.*" +ignore_missing_imports = true + # # Tool: Flake8 # diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index bc20d531b3..3746364c55 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -100,6 +100,9 @@ }, "dramatiq": { "package": "dramatiq", + "deps": { + "py3.6": ["aiocontextvars"], + }, }, "falcon": { "package": "falcon", diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index c62dd44c7f..73089d9ced 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -1,5 +1,4 @@ import json -import contextvars import sentry_sdk from sentry_sdk.consts import OP, SPANSTATUS @@ -15,13 +14,19 @@ AnnotatedValue, capture_internal_exceptions, event_from_exception, + ContextVar, + HAS_REAL_CONTEXTVARS, + CONTEXTVARS_ERROR_MESSAGE, ) +from typing import TypeVar + +R = TypeVar("R") try: from dramatiq.broker import Broker - from dramatiq.message import Message, R from dramatiq.middleware import Middleware, default_middleware from dramatiq.errors import Retry + from dramatiq.message import Message except ImportError: raise DidNotEnable("Dramatiq is not installed") @@ -29,7 +34,6 @@ if TYPE_CHECKING: from typing import Any, Callable, Dict, Optional, Union - from sentry_sdk.tracing import Transaction from sentry_sdk._types import Event, Hint @@ -50,6 +54,12 @@ class DramatiqIntegration(Integration): @staticmethod def setup_once(): # type: () -> None + if not HAS_REAL_CONTEXTVARS: + raise DidNotEnable( + "The dramatiq integration for Sentry requires Python 3.7+ " + " or aiocontextvars package." + CONTEXTVARS_ERROR_MESSAGE + ) + _patch_dramatiq_broker() @@ -85,10 +95,10 @@ def sentry_patched_broker__init__(self, *args, **kw): kw["middleware"] = middleware original_broker__init__(self, *args, **kw) - Broker.__init__ = sentry_patched_broker__init__ # type: ignore[method-assign] + Broker.__init__ = sentry_patched_broker__init__ -class SentryMiddleware(Middleware): +class SentryMiddleware(Middleware): # type: ignore[misc] """ A Dramatiq middleware that automatically captures and sends exceptions to Sentry. @@ -97,9 +107,7 @@ class SentryMiddleware(Middleware): DramatiqIntegration. """ - _transaction = contextvars.ContextVar( - "_transaction" - ) # type: contextvars.ContextVar[Transaction] + _transaction = ContextVar("_transaction") def before_enqueue(self, broker, message, delay): # type: (Broker, Message[R], int) -> None @@ -130,7 +138,7 @@ def before_process_message(self, broker, message): sentry_sdk.start_transaction( transaction, name=message.actor_name, - op=OP.QUEUE_PROCESS, + op=OP.QUEUE_TASK_DRAMATIQ, source=TransactionSource.TASK, ) transaction.__enter__() @@ -142,7 +150,9 @@ def after_process_message(self, broker, message, *, result=None, exception=None) actor = broker.get_actor(message.actor_name) throws = message.options.get("throws") or actor.options.get("throws") - transaction = self._transaction.get() + transaction = self._transaction.get(None) + if not transaction: + return None is_event_capture_required = ( exception is not None diff --git a/tox.ini b/tox.ini index 39ef4785b3..e359e11ad6 100644 --- a/tox.ini +++ b/tox.ini @@ -647,6 +647,7 @@ deps = dramatiq-v1.12.3: dramatiq==1.12.3 dramatiq-v1.15.0: dramatiq==1.15.0 dramatiq-v1.18.0: dramatiq==1.18.0 + {py3.6}-dramatiq: aiocontextvars huey-v2.1.3: huey==2.1.3 huey-v2.2.0: huey==2.2.0 From 39e3a506a3c0300d26ac5bbbd19c4ec913386810 Mon Sep 17 00:00:00 2001 From: igorek Date: Tue, 16 Sep 2025 14:29:56 +0700 Subject: [PATCH 4/4] feat(integrations): Start new trace on retrying - Start new trace in case of retrying task. Such decision makes life easier to investigate failed transactions in complicated traces. - also fix logging issue in tests Issue: getsentry#3454 --- sentry_sdk/integrations/dramatiq.py | 7 ++++++- tests/integrations/dramatiq/test_dramatiq.py | 7 +++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index 73089d9ced..6ad96337f3 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -127,8 +127,13 @@ def before_process_message(self, broker, message): scope.clear_breadcrumbs() scope.add_event_processor(_make_message_event_processor(message, integration)) + sentry_headers = message.options.get("sentry_headers") or {} + if "retries" in message.options: + # start new trace in case of retrying + sentry_headers = {} + transaction = continue_trace( - message.options.get("sentry_headers") or {}, + sentry_headers, name=message.actor_name, op=OP.QUEUE_TASK_DRAMATIQ, source=TransactionSource.TASK, diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py index fdbf64a18e..a4f2a3d678 100644 --- a/tests/integrations/dramatiq/test_dramatiq.py +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -9,8 +9,9 @@ from sentry_sdk import start_transaction from sentry_sdk.consts import SPANSTATUS from sentry_sdk.integrations.dramatiq import DramatiqIntegration +from sentry_sdk.integrations.logging import ignore_logger -# from sentry_sdk.integrations.logging import LoggingIntegration +ignore_logger("dramatiq.worker.WorkerThread") @pytest.fixture(scope="function") @@ -18,7 +19,6 @@ def broker(request, sentry_init): sentry_init( integrations=[DramatiqIntegration()], traces_sample_rate=getattr(request, "param", None), - # disabled_integrations=[LoggingIntegration()], ) broker = StubBroker() broker.emit_after("process_boot") @@ -78,8 +78,7 @@ def dummy_actor(x, y): error_event = events.pop(0) exception = error_event["exception"]["values"][0] assert exception["type"] == "ZeroDivisionError" - # todo: failed assert. Logging instead of dramatiq - # assert exception["mechanism"]["type"] == DramatiqIntegration.identifier + assert exception["mechanism"]["type"] == DramatiqIntegration.identifier (event,) = events assert event["type"] == "transaction"