|
10 | 10 | from scrapy.settings import Settings
|
11 | 11 | from scrapy.utils.python import to_unicode
|
12 | 12 | from twisted.internet.defer import Deferred
|
| 13 | +from twisted.python import failure |
13 | 14 | from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
|
14 | 15 |
|
15 | 16 |
|
@@ -115,24 +116,28 @@ class _ThreadedLoopAdapter:
|
115 | 116 | _stop_events: Dict[int, asyncio.Event] = {}
|
116 | 117 |
|
117 | 118 | @classmethod
|
118 |
| - async def _handle_coro(cls, coro, future) -> None: |
| 119 | + async def _handle_coro(cls, coro: Awaitable, dfd: Deferred) -> None: |
| 120 | + from twisted.internet import reactor |
| 121 | + |
119 | 122 | try:
|
120 |
| - future.set_result(await coro) |
| 123 | + result = await coro |
121 | 124 | except Exception as exc:
|
122 |
| - future.set_exception(exc) |
| 125 | + reactor.callFromThread(dfd.errback, failure.Failure(exc)) |
| 126 | + else: |
| 127 | + reactor.callFromThread(dfd.callback, result) |
123 | 128 |
|
124 | 129 | @classmethod
|
125 | 130 | async def _process_queue(cls) -> None:
|
126 | 131 | while any(not ev.is_set() for ev in cls._stop_events.values()):
|
127 |
| - coro, future = await cls._coro_queue.get() |
128 |
| - asyncio.create_task(cls._handle_coro(coro, future)) |
| 132 | + coro, dfd = await cls._coro_queue.get() |
| 133 | + asyncio.create_task(cls._handle_coro(coro, dfd)) |
129 | 134 | cls._coro_queue.task_done()
|
130 | 135 |
|
131 | 136 | @classmethod
|
132 | 137 | def _deferred_from_coro(cls, coro) -> Deferred:
|
133 |
| - future: asyncio.Future = asyncio.Future() |
134 |
| - asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop) |
135 |
| - return scrapy.utils.defer.deferred_from_coro(future) |
| 138 | + dfd: Deferred = Deferred() |
| 139 | + asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, dfd)), cls._loop) |
| 140 | + return dfd |
136 | 141 |
|
137 | 142 | @classmethod
|
138 | 143 | def start(cls, caller_id: int) -> None:
|
|
0 commit comments