Skip to content

Commit 131b637

Browse files
fix(sync-server): thread-safe shutdown and error reporting in SyncMapServicer; remove temporary race condition test files
Signed-off-by: sapkota-aayush <[email protected]>
1 parent 42f9fbd commit 131b637

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

pynumaflow/mapper/_servicer/_sync_servicer.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ def __init__(self, handler: MapSyncCallable, multiproc: bool = False):
2525
self.multiproc = multiproc
2626
# create a thread pool for executing UDF code
2727
self.executor = ThreadPoolExecutor(max_workers=NUM_THREADS_DEFAULT)
28+
# Thread-safe event to track shutdown state and prevent race conditions
29+
self._shutdown_event = threading.Event()
30+
self._shutdown_lock = threading.Lock() # NEW: lock for shutdown/error handling
31+
32+
def _handle_error(self, context, error):
33+
"""
34+
Ensures only one thread triggers shutdown and error reporting.
35+
"""
36+
with self._shutdown_lock:
37+
if not self._shutdown_event.is_set():
38+
self._shutdown_event.set()
39+
exit_on_error(
40+
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(error)}", parent=self.multiproc
41+
)
42+
else:
43+
_LOGGER.info("Shutdown already initiated by another thread, exiting quietly")
2844

2945
def MapFn(
3046
self,
@@ -56,10 +72,7 @@ def MapFn(
5672
for res in result_queue.read_iterator():
5773
# if error handler accordingly
5874
if isinstance(res, BaseException):
59-
# Terminate the current server process due to exception
60-
exit_on_error(
61-
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(res)}", parent=self.multiproc
62-
)
75+
self._handle_error(context, res)
6376
return
6477
# return the result
6578
yield res
@@ -70,10 +83,7 @@ def MapFn(
7083

7184
except BaseException as err:
7285
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
73-
# Terminate the current server process due to exception
74-
exit_on_error(
75-
context, f"{ERR_UDF_EXCEPTION_STRING}: {repr(err)}", parent=self.multiproc
76-
)
86+
self._handle_error(context, err)
7787
return
7888

7989
def _process_requests(
@@ -86,6 +96,10 @@ def _process_requests(
8696
# read through all incoming requests and submit to the
8797
# threadpool for invocation
8898
for request in request_iterator:
99+
# Check if shutdown has been initiated before submitting new tasks
100+
if self._shutdown_event.is_set():
101+
_LOGGER.info("Shutdown initiated, stopping request processing")
102+
break
89103
_ = self.executor.submit(self._invoke_map, context, request, result_queue)
90104
# wait for all tasks to finish after all requests exhausted
91105
self.executor.shutdown(wait=True)
@@ -101,6 +115,11 @@ def _invoke_map(
101115
result_queue: SyncIterator,
102116
):
103117
try:
118+
# Check if shutdown has been initiated before processing
119+
if self._shutdown_event.is_set():
120+
_LOGGER.info("Shutdown initiated, skipping map invocation")
121+
return
122+
104123
d = Datum(
105124
keys=list(request.request.keys),
106125
value=request.request.value,
@@ -123,7 +142,11 @@ def _invoke_map(
123142

124143
except BaseException as e:
125144
_LOGGER.critical("MapFn handler error", exc_info=True)
126-
result_queue.put(e)
145+
# Only put the exception in the queue if shutdown hasn't been initiated
146+
if not self._shutdown_event.is_set():
147+
result_queue.put(e)
148+
else:
149+
_LOGGER.info("Shutdown already initiated, not queuing additional error")
127150
return
128151

129152
def IsReady(

0 commit comments

Comments
 (0)