From 4ed0cf5b1d04c1a8783f20b89af14f8feba5c0ef Mon Sep 17 00:00:00 2001 From: sapkota-aayush Date: Thu, 17 Jul 2025 22:14:54 -0400 Subject: [PATCH] fix(sync-server): thread-safe shutdown and error reporting in SyncMapServicer; remove temporary race condition test files Signed-off-by: sapkota-aayush --- pynumaflow/mapper/_servicer/_sync_servicer.py | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/pynumaflow/mapper/_servicer/_sync_servicer.py b/pynumaflow/mapper/_servicer/_sync_servicer.py index 40b6c7ca..df818af0 100644 --- a/pynumaflow/mapper/_servicer/_sync_servicer.py +++ b/pynumaflow/mapper/_servicer/_sync_servicer.py @@ -25,6 +25,22 @@ def __init__(self, handler: MapSyncCallable, multiproc: bool = False): self.multiproc = multiproc # create a thread pool for executing UDF code self.executor = ThreadPoolExecutor(max_workers=NUM_THREADS_DEFAULT) + # Thread-safe event to track shutdown state and prevent race conditions + self._shutdown_event = threading.Event() + self._shutdown_lock = threading.Lock() # NEW: lock for shutdown/error handling + + def _handle_error(self, context, error): + """ + Ensures only one thread triggers shutdown and error reporting. + """ + with self._shutdown_lock: + if not self._shutdown_event.is_set(): + self._shutdown_event.set() + exit_on_error( + context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(error)}", parent=self.multiproc + ) + else: + _LOGGER.info("Shutdown already initiated by another thread, exiting quietly") def MapFn( self, @@ -56,10 +72,7 @@ def MapFn( for res in result_queue.read_iterator(): # if error handler accordingly if isinstance(res, BaseException): - # Terminate the current server process due to exception - exit_on_error( - context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(res)}", parent=self.multiproc - ) + self._handle_error(context, res) return # return the result yield res @@ -70,10 +83,7 @@ def MapFn( except BaseException as err: _LOGGER.critical("UDFError, re-raising the error", exc_info=True) - # Terminate the current server process due to exception - exit_on_error( - context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(err)}", parent=self.multiproc - ) + self._handle_error(context, err) return def _process_requests( @@ -86,6 +96,10 @@ def _process_requests( # read through all incoming requests and submit to the # threadpool for invocation for request in request_iterator: + # Check if shutdown has been initiated before submitting new tasks + if self._shutdown_event.is_set(): + _LOGGER.info("Shutdown initiated, stopping request processing") + break _ = self.executor.submit(self._invoke_map, context, request, result_queue) # wait for all tasks to finish after all requests exhausted self.executor.shutdown(wait=True) @@ -101,6 +115,11 @@ def _invoke_map( result_queue: SyncIterator, ): try: + # Check if shutdown has been initiated before processing + if self._shutdown_event.is_set(): + _LOGGER.info("Shutdown initiated, skipping map invocation") + return + d = Datum( keys=list(request.request.keys), value=request.request.value, @@ -123,7 +142,11 @@ def _invoke_map( except BaseException as e: _LOGGER.critical("MapFn handler error", exc_info=True) - result_queue.put(e) + # Only put the exception in the queue if shutdown hasn't been initiated + if not self._shutdown_event.is_set(): + result_queue.put(e) + else: + _LOGGER.info("Shutdown already initiated, not queuing additional error") return def IsReady(