|
1 | 1 | import asyncio
|
2 |
| -import concurrent |
3 | 2 | import logging
|
4 | 3 | import platform
|
5 | 4 | import threading
|
@@ -106,34 +105,66 @@ async def _get_header_value(
|
106 | 105 |
|
107 | 106 | if platform.system() == "Windows":
|
108 | 107 |
|
109 |
| - class _WindowsAdapter: |
110 |
| - """Utility class to redirect coroutines to an asyncio event loop running |
111 |
| - in a different thread. This allows to use a ProactorEventLoop, which is |
112 |
| - supported by Playwright on Windows. |
| 108 | + class _ThreadedLoopAdapter: |
| 109 | + """Utility class to start an asyncio event loop in a new thread and redirect coroutines. |
| 110 | + This allows to run Playwright in a different loop than the Scrapy crawler, allowing to |
| 111 | + use ProactorEventLoop which is supported by Playwright on Windows. |
113 | 112 | """
|
114 | 113 |
|
115 |
| - loop = None |
116 |
| - thread = None |
| 114 | + _loop: asyncio.AbstractEventLoop |
| 115 | + _thread: threading.Thread |
| 116 | + _coro_queue: asyncio.Queue = asyncio.Queue() |
| 117 | + _stop_event: asyncio.Event = asyncio.Event() |
117 | 118 |
|
118 | 119 | @classmethod
|
119 |
| - def get_event_loop(cls) -> asyncio.AbstractEventLoop: |
120 |
| - if cls.thread is None: |
121 |
| - if cls.loop is None: |
122 |
| - policy = asyncio.WindowsProactorEventLoopPolicy() # type: ignore |
123 |
| - cls.loop = policy.new_event_loop() |
124 |
| - asyncio.set_event_loop(cls.loop) |
125 |
| - if not cls.loop.is_running(): |
126 |
| - cls.thread = threading.Thread(target=cls.loop.run_forever, daemon=True) |
127 |
| - cls.thread.start() |
128 |
| - logger.info("Started loop on separate thread: %s", cls.loop) |
129 |
| - return cls.loop |
| 120 | + async def _handle_coro(cls, coro, future) -> None: |
| 121 | + try: |
| 122 | + future.set_result(await coro) |
| 123 | + except Exception as exc: |
| 124 | + future.set_exception(exc) |
130 | 125 |
|
131 | 126 | @classmethod
|
132 |
| - async def get_result(cls, coro) -> concurrent.futures.Future: |
133 |
| - return asyncio.run_coroutine_threadsafe(coro=coro, loop=cls.get_event_loop()).result() |
| 127 | + async def _process_queue(cls) -> None: |
| 128 | + while not cls._stop_event.is_set(): |
| 129 | + coro, future = await cls._coro_queue.get() |
| 130 | + asyncio.create_task(cls._handle_coro(coro, future)) |
| 131 | + cls._coro_queue.task_done() |
134 | 132 |
|
135 |
| - def _deferred_from_coro(coro) -> Deferred: |
136 |
| - return scrapy.utils.defer.deferred_from_coro(_WindowsAdapter.get_result(coro)) |
| 133 | + @classmethod |
| 134 | + def _deferred_from_coro(cls, coro) -> Deferred: |
| 135 | + future: asyncio.Future = asyncio.Future() |
| 136 | + asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop) |
| 137 | + return scrapy.utils.defer.deferred_from_coro(future) |
| 138 | + |
| 139 | + @classmethod |
| 140 | + def start(cls) -> None: |
| 141 | + policy = asyncio.WindowsProactorEventLoopPolicy() # type: ignore[attr-defined] |
| 142 | + cls._loop = policy.new_event_loop() |
| 143 | + asyncio.set_event_loop(cls._loop) |
| 144 | + |
| 145 | + cls._thread = threading.Thread(target=cls._loop.run_forever, daemon=True) |
| 146 | + cls._thread.start() |
| 147 | + logger.info("Started loop on separate thread: %s", cls._loop) |
| 148 | + |
| 149 | + asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop) |
| 150 | + |
| 151 | + @classmethod |
| 152 | + def stop(cls) -> None: |
| 153 | + cls._stop_event.set() |
| 154 | + asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop) |
| 155 | + cls._loop.call_soon_threadsafe(cls._loop.stop) |
| 156 | + cls._thread.join() |
137 | 157 |
|
| 158 | + _deferred_from_coro = _ThreadedLoopAdapter._deferred_from_coro |
138 | 159 | else:
|
| 160 | + |
| 161 | + class _ThreadedLoopAdapter: # type: ignore[no-redef] |
| 162 | + @classmethod |
| 163 | + def start(cls) -> None: |
| 164 | + pass |
| 165 | + |
| 166 | + @classmethod |
| 167 | + def stop(cls) -> None: |
| 168 | + pass |
| 169 | + |
139 | 170 | _deferred_from_coro = scrapy.utils.defer.deferred_from_coro
|
0 commit comments