From 7fd99bd8c8dca6684535c74d4f0fd47e78843eee Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 5 Jul 2023 16:11:14 +0300 Subject: [PATCH 1/4] fix: worker hangs --- taskiq/cli/worker/run.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 466886fc..c879ab83 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -79,22 +79,6 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213 :raises ValueError: if broker is not an AsyncBroker instance. :raises ValueError: if receiver is not a Receiver type. """ - if uvloop is not None: - logger.debug("UVLOOP found. Installing policy.") - uvloop.install() - # This option signals that current - # broker is running as a worker. - # We must set this field before importing tasks, - # so broker will remember all tasks it's related to. - AsyncBroker.is_worker_process = True - broker = import_object(args.broker) - import_tasks(args.modules, args.tasks_pattern, args.fs_discover) - if not isinstance(broker, AsyncBroker): - raise ValueError("Unknown broker type. Please use AsyncBroker instance.") - - receiver_type = get_receiver_type(args) - receiver_args = dict(args.receiver_arg) - # Here how we manage interruptions. # We have to remember shutting_down state, # because KeyboardInterrupt can be send multiple @@ -122,6 +106,22 @@ def interrupt_handler(signum: int, _frame: Any) -> None: signal.signal(signal.SIGINT, interrupt_handler) signal.signal(signal.SIGTERM, interrupt_handler) + if uvloop is not None: + logger.debug("UVLOOP found. Installing policy.") + uvloop.install() + # This option signals that current + # broker is running as a worker. + # We must set this field before importing tasks, + # so broker will remember all tasks it's related to. 
+ AsyncBroker.is_worker_process = True + broker = import_object(args.broker) + import_tasks(args.modules, args.tasks_pattern, args.fs_discover) + if not isinstance(broker, AsyncBroker): + raise ValueError("Unknown broker type. Please use AsyncBroker instance.") + + receiver_type = get_receiver_type(args) + receiver_args = dict(args.receiver_arg) + loop = asyncio.get_event_loop() try: From 323138b334e012d4a44afe2433000cf2b3b8120b Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 5 Jul 2023 17:11:08 +0300 Subject: [PATCH 2/4] fix: wait a bit for handler, check current process in main handler --- taskiq/cli/worker/process_manager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index 557a2857..57a9aea9 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -1,7 +1,7 @@ import logging import signal from dataclasses import dataclass -from multiprocessing import Process, Queue +from multiprocessing import Process, Queue, current_process from time import sleep from typing import Any, Callable, List, Optional @@ -82,6 +82,7 @@ def handle( new_process.start() logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}") workers[self.worker_num] = new_process + sleep(0.1) @dataclass @@ -118,6 +119,9 @@ def get_signal_handler( """ def _signal_handler(signum: int, _frame: Any) -> None: + if current_process().name.startswith("worker"): + raise KeyboardInterrupt + logger.debug(f"Got signal {signum}.") action_queue.put(ShutdownAction()) logger.warn("Workers are scheduled for shutdown.") From dc9acb6437317137adee1f7540dca314465d17a1 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 5 Jul 2023 17:54:02 +0300 Subject: [PATCH 3/4] fix: notify parent process worker is ready --- taskiq/cli/worker/process_manager.py | 28 ++++++++++++++++++++++------ taskiq/cli/worker/run.py | 7 ++++++- 2 files changed, 28 insertions(+), 7 
deletions(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index 57a9aea9..f6b46a2e 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -1,7 +1,9 @@ import logging import signal +from contextlib import suppress from dataclasses import dataclass -from multiprocessing import Process, Queue, current_process +from multiprocessing import Event, Process, Queue, current_process +from multiprocessing.synchronize import Event as EventType from time import sleep from typing import Any, Callable, List, Optional @@ -54,7 +56,7 @@ def handle( self, workers: List[Process], args: WorkerArgs, - worker_func: Callable[[WorkerArgs], None], + worker_func: Callable[[WorkerArgs, EventType], None], ) -> None: """ This action reloads a single process. @@ -73,16 +75,17 @@ def handle( logger.debug(f"Process {worker.name} is already terminated.") # Waiting worker shutdown. worker.join() + event: EventType = Event() new_process = Process( target=worker_func, - kwargs={"args": args}, + kwargs={"args": args, "event": event}, name=f"worker-{self.worker_num}", daemon=True, ) new_process.start() logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}") workers[self.worker_num] = new_process - sleep(0.1) + _wait_for_worker_startup(new_process, event) @dataclass @@ -90,6 +93,12 @@ class ShutdownAction(ProcessActionBase): """This action shuts down process manager loop.""" +def _wait_for_worker_startup(process: Process, event: EventType) -> None: + while process.is_alive(): + with suppress(TimeoutError): + event.wait(0.1) + + def schedule_workers_reload( action_queue: "Queue[ProcessActionBase]", ) -> None: @@ -141,7 +150,7 @@ class ProcessManager: def __init__( self, args: WorkerArgs, - worker_function: Callable[[WorkerArgs], None], + worker_function: Callable[[WorkerArgs, EventType], None], observer: Optional[Observer] = None, ) -> None: self.worker_function = worker_function @@ -166,10 
+175,12 @@ def __init__( def prepare_workers(self) -> None: """Spawn multiple processes.""" + events: List[EventType] = [] for process in range(self.args.workers): + event = Event() work_proc = Process( target=self.worker_function, - kwargs={"args": self.args}, + kwargs={"args": self.args, "event": event}, name=f"worker-{process}", daemon=True, ) @@ -180,6 +191,11 @@ def prepare_workers(self) -> None: work_proc.pid, ) self.workers.append(work_proc) + events.append(event) + + # Wait for workers startup + for worker, event in zip(self.workers, events): + _wait_for_worker_startup(worker, event) def start(self) -> None: # noqa: C901, WPS213 """ diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index c879ab83..93582686 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -2,6 +2,7 @@ import logging import signal from concurrent.futures import ThreadPoolExecutor +from multiprocessing.synchronize import Event from typing import Any, Type from taskiq.abc.broker import AsyncBroker @@ -65,7 +66,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]: return receiver_type -def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213 +def start_listen(args: WorkerArgs, event: Event) -> None: # noqa: WPS210, WPS213 """ This function starts actual listening process. @@ -76,6 +77,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213 field. :param args: CLI arguments. + :param event: Event for notification. :raises ValueError: if broker is not an AsyncBroker instance. :raises ValueError: if receiver is not a Receiver type. """ @@ -106,6 +108,9 @@ def interrupt_handler(signum: int, _frame: Any) -> None: signal.signal(signal.SIGINT, interrupt_handler) signal.signal(signal.SIGTERM, interrupt_handler) + # Notify parent process, worker is ready + event.set() + if uvloop is not None: logger.debug("UVLOOP found. 
Installing policy.") uvloop.install() From da31c33fef27684c6bcd42ce4e6ba29a1db7bcd4 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 5 Jul 2023 18:08:58 +0300 Subject: [PATCH 4/4] fix: break loop --- taskiq/cli/worker/process_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index f6b46a2e..dab6327f 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -97,6 +97,7 @@ def _wait_for_worker_startup(process: Process, event: EventType) -> None: while process.is_alive(): with suppress(TimeoutError): event.wait(0.1) + return def schedule_workers_reload( @@ -151,7 +152,7 @@ def __init__( self, args: WorkerArgs, worker_function: Callable[[WorkerArgs, EventType], None], - observer: Optional[Observer] = None, + observer: Optional[Observer] = None, # type: ignore[valid-type] ) -> None: self.worker_function = worker_function self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)