Skip to content

Commit 7c5dec0

Browse files
committed
ref(transport): Fix event loop handling in async transport
Async Transport now properly checks for the presence of the event loop in capture_envelop, and drops items in case the event loop is no longer running for some reason. GH-4582
1 parent 8809b08 commit 7c5dec0

File tree

1 file changed

+25
-14
lines changed

1 file changed

+25
-14
lines changed

sentry_sdk/transport.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
from sentry_sdk.consts import EndpointType
3131
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
32-
from sentry_sdk.worker import BackgroundWorker, Worker
32+
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
3333
from sentry_sdk.envelope import Envelope, Item, PayloadRef
3434

3535
from typing import TYPE_CHECKING
@@ -225,9 +225,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
225225
elif self._compression_algo == "br":
226226
self._compression_level = 4
227227

228-
def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
229-
# For now, we only support the threaded sync background worker.
230-
return BackgroundWorker(queue_size=options["transport_queue_size"])
228+
def _create_worker(self, options: dict[str, Any]) -> Worker:
229+
async_enabled = options.get("_experiments", {}).get("transport_async", False)
230+
worker_cls = AsyncWorker if async_enabled else BackgroundWorker
231+
return worker_cls(queue_size=options["transport_queue_size"])
231232

232233
def record_lost_event(
233234
self: Self,
@@ -645,18 +646,26 @@ async def send_envelope_wrapper() -> None:
645646

646647
def capture_envelope(self: Self, envelope: Envelope) -> None:
647648
# Synchronous entry point
648-
if asyncio.get_running_loop() is not None:
649+
try:
650+
asyncio.get_running_loop()
649651
# We are on the main thread running the event loop
650652
task = asyncio.create_task(self._capture_envelope(envelope))
651653
self.background_tasks.add(task)
652654
task.add_done_callback(self.background_tasks.discard)
653-
else:
655+
except RuntimeError:
654656
# We are in a background thread, not running an event loop,
655657
# have to launch the task on the loop in a threadsafe way.
656-
asyncio.run_coroutine_threadsafe(
657-
self._capture_envelope(envelope),
658-
self._loop,
659-
)
658+
if self._loop and self._loop.is_running():
659+
asyncio.run_coroutine_threadsafe(
660+
self._capture_envelope(envelope),
661+
self._loop,
662+
)
663+
else:
664+
# The event loop is no longer running
665+
logger.warning("Async Transport is not running in an event loop.")
666+
self.on_dropped_event("no_async_context")
667+
for item in envelope.items:
668+
self.record_lost_event("no_async_context", item=item)
660669

661670
async def flush_async(
662671
self: Self,
@@ -996,11 +1005,13 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
9961005
ref_transport = options["transport"]
9971006

9981007
use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
999-
1008+
use_async_transport = options.get("_experiments", {}).get("transport_async", False)
10001009
# By default, we use the http transport class
1001-
transport_cls: Type[Transport] = (
1002-
Http2Transport if use_http2_transport else HttpTransport
1003-
)
1010+
if use_async_transport and asyncio.get_running_loop() is not None:
1011+
transport_cls: Type[Transport] = AsyncHttpTransport
1012+
else:
1013+
use_http2 = use_http2_transport
1014+
transport_cls = Http2Transport if use_http2 else HttpTransport
10041015

10051016
if isinstance(ref_transport, Transport):
10061017
return ref_transport

0 commit comments

Comments
 (0)