From 818b74f568c5692cefd48918d58da3b089ae48ef Mon Sep 17 00:00:00 2001 From: Mike Krieger Date: Mon, 11 Jun 2012 12:55:41 -0700 Subject: [PATCH] Add RetryJob error which can be used to force a disconnection/reconnection from gearman server & subsequent job retry --- gearman/errors.py | 3 +++ gearman/worker.py | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/gearman/errors.py b/gearman/errors.py index 5671033..c5fc7ce 100644 --- a/gearman/errors.py +++ b/gearman/errors.py @@ -24,3 +24,6 @@ class InvalidWorkerState(GearmanError): class InvalidAdminClientState(GearmanError): pass + +class RetryJob(Exception): + pass diff --git a/gearman/worker.py b/gearman/worker.py index e221914..4d860a9 100644 --- a/gearman/worker.py +++ b/gearman/worker.py @@ -5,7 +5,7 @@ from gearman import compat from gearman.connection_manager import GearmanConnectionManager from gearman.worker_handler import GearmanWorkerCommandHandler -from gearman.errors import ConnectionError +from gearman.errors import ConnectionError, RetryJob gearman_logger = logging.getLogger(__name__) @@ -108,6 +108,13 @@ def shutdown(self): ############################################################### ## Methods to override when dealing with connection polling ## ############################################################## + def force_reconnect(self, current_job): + current_handler = self._get_handler_for_job(current_job) + current_connection = self.handler_to_connection_map[current_handler] + current_connection.close() + self.establish_connection(current_connection) + + def establish_worker_connections(self): """Return a shuffled list of connections that are alive, and try to reconnect to dead connections if necessary.""" self.randomized_connections = list(self.connection_list) @@ -216,6 +223,9 @@ def on_job_execute(self, current_job): try: function_callback = self.worker_abilities[current_job.task] job_result = function_callback(self, current_job) + except RetryJob: + self.force_reconnect(current_job) + return except Exception: return self.on_job_exception(current_job, sys.exc_info()) @@ -248,10 +258,10 @@ def set_job_lock(self, command_handler, lock): self.command_handler_holding_job_lock = None return True - + def has_job_lock(self): return bool(self.command_handler_holding_job_lock is not None) - + def check_job_lock(self, command_handler): """Check to see if we hold the job lock""" - return bool(self.command_handler_holding_job_lock == command_handler) + return bool(self.command_handler_holding_job_lock == command_handler) \ No newline at end of file