Skip to content

Commit 86b58fc

Browse files
AlexTatemr-c
authored andcommitted
Actively running jobs respond to the kill switch by checking the switch's status in the monitor function. The monitor function, up to this point, has been for gathering memory usage statistics via a timer thread. A second timer thread now monitors the kill switch.
1 parent bc2ee0f commit 86b58fc

File tree

1 file changed

+54
-5
lines changed

1 file changed

+54
-5
lines changed

cwltool/job.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading
521521
memory_usage: MutableSequence[Optional[int]] = [None]
522522

523523
mem_tm: "Optional[Timer]" = None
524+
ks_tm: "Optional[Timer]" = None
524525

525526
def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
526527
nonlocal mem_tm
@@ -542,10 +543,27 @@ def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
542543
if mem_tm is not None:
543544
mem_tm.cancel()
544545

546+
def monitor_kill_switch() -> None:
547+
nonlocal ks_tm
548+
if kill_switch.is_set():
549+
_logger.error("[job %s] terminating by kill switch", self.name)
550+
if sproc.stdin: sproc.stdin.close()
551+
sproc.terminate()
552+
else:
553+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
554+
ks_tm.daemon = True
555+
ks_tm.start()
556+
557+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
558+
ks_tm.daemon = True
559+
ks_tm.start()
560+
545561
mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
546562
mem_tm.daemon = True
547563
mem_tm.start()
564+
548565
sproc.wait()
566+
ks_tm.cancel()
549567
mem_tm.cancel()
550568
if memory_usage[0] is not None:
551569
_logger.info(
@@ -859,20 +877,48 @@ def docker_monitor(
859877
process: "subprocess.Popen[str]",
860878
kill_switch: threading.Event,
861879
) -> None:
862-
"""Record memory usage of the running Docker container."""
880+
"""Record memory usage of the running Docker container. Terminate if kill_switch is activated."""
881+
882+
ks_tm: "Optional[Timer]" = None
883+
cid: Optional[str] = None
884+
885+
def monitor_kill_switch() -> None:
886+
nonlocal ks_tm
887+
if kill_switch.is_set():
888+
_logger.error("[job %s] terminating by kill switch", self.name)
889+
if process.stdin:
890+
process.stdin.close()
891+
if cid is not None:
892+
kill_proc = subprocess.Popen( # nosec
893+
[docker_exe, "kill", cid], shell=False # nosec
894+
)
895+
try:
896+
kill_proc.wait(timeout=10)
897+
except subprocess.TimeoutExpired:
898+
kill_proc.kill()
899+
process.terminate() # Always terminate, even if we tried with the cidfile
900+
else:
901+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
902+
ks_tm.daemon = True
903+
ks_tm.start()
904+
905+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
906+
ks_tm.daemon = True
907+
ks_tm.start()
908+
863909
# Todo: consider switching to `docker create` / `docker start`
864910
# instead of `docker run` as `docker create` outputs the container ID
865911
# to stdout, but the container is frozen, thus allowing us to start the
866912
# monitoring process without dealing with the cidfile or too-fast
867913
# container execution
868-
cid: Optional[str] = None
869914
while cid is None:
870915
time.sleep(1)
871916
# This is needed to avoid a race condition where the job
872917
# was so fast that it already finished when it arrives here
873918
if process.returncode is None:
874919
process.poll()
875920
if process.returncode is not None:
921+
ks_tm.cancel()
876922
if cleanup_cidfile:
877923
try:
878924
os.remove(cidfile)
@@ -904,6 +950,9 @@ def docker_monitor(
904950
except OSError as exc:
905951
_logger.warning("Ignored error with %s stats: %s", docker_exe, exc)
906952
return
953+
finally:
954+
ks_tm.cancel()
955+
907956
max_mem_percent: float = 0.0
908957
mem_percent: float = 0.0
909958
with open(stats_file_name) as stats:
@@ -938,7 +987,7 @@ def _job_popen(
938987
job_script_contents: Optional[str] = None,
939988
timelimit: Optional[int] = None,
940989
name: Optional[str] = None,
941-
monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
990+
monitor_function: Optional[Callable[["subprocess.Popen[str]", "threading.Event"], None]] = None,
942991
default_stdout: Optional[Union[IO[bytes], TextIO]] = None,
943992
default_stderr: Optional[Union[IO[bytes], TextIO]] = None,
944993
) -> int:
@@ -993,7 +1042,7 @@ def terminate(): # type: () -> None
9931042
tm.daemon = True
9941043
tm.start()
9951044
if monitor_function:
996-
monitor_function(sproc)
1045+
monitor_function(sproc, kill_switch)
9971046
rcode = sproc.wait()
9981047

9991048
if tm is not None:
@@ -1069,7 +1118,7 @@ def terminate(): # type: () -> None
10691118
tm.daemon = True
10701119
tm.start()
10711120
if monitor_function:
1072-
monitor_function(sproc)
1121+
monitor_function(sproc, kill_switch)
10731122

10741123
rcode = sproc.wait()
10751124

0 commit comments

Comments
 (0)