Skip to content

Commit 7ebb964

Browse files
committed
Eliminate copy-paste from Kubernetes runner.
1 parent fcf1654 commit 7ebb964

File tree

2 files changed

+17
-30
lines changed

2 files changed

+17
-30
lines changed

lib/galaxy/jobs/runners/__init__.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -907,17 +907,7 @@ def check_watched_items(self):
907907
def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]:
908908
raise NotImplementedError()
909909

910-
def finish_job(self, job_state: AsynchronousJobState):
911-
"""
912-
Get the output/error for a finished job, pass to `job_wrapper.finish`
913-
and cleanup all the job's temporary files.
914-
"""
915-
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
916-
external_job_id = job_state.job_id
917-
918-
# To ensure that files below are readable, ownership must be reclaimed first
919-
job_state.job_wrapper.reclaim_ownership()
920-
910+
def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState):
921911
# wait for the files to appear
922912
which_try = 0
923913
collect_output_success = True
@@ -931,11 +921,25 @@ def finish_job(self, job_state: AsynchronousJobState):
931921
if which_try == self.app.config.retry_job_output_collection:
932922
stdout = ""
933923
stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
934-
log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
924+
log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
935925
collect_output_success = False
936926
else:
937927
time.sleep(1)
938928
which_try += 1
929+
return collect_output_success, stdout, stderr
930+
931+
def finish_job(self, job_state: AsynchronousJobState):
932+
"""
933+
Get the output/error for a finished job, pass to `job_wrapper.finish`
934+
and cleanup all the job's temporary files.
935+
"""
936+
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
937+
external_job_id = job_state.job_id
938+
939+
# To ensure that files below are readable, ownership must be reclaimed first
940+
job_state.job_wrapper.reclaim_ownership()
941+
942+
collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)
939943

940944
if not collect_output_success:
941945
job_state.fail_message = stderr

lib/galaxy/jobs/runners/kubernetes.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import math
88
import os
99
import re
10-
import time
1110
from dataclasses import dataclass
1211
from datetime import datetime
1312
from typing import Union
@@ -50,7 +49,6 @@
5049
Service,
5150
service_object_dict,
5251
)
53-
from galaxy.util import unicodify
5452
from galaxy.util.bytesize import ByteSize
5553

5654
log = logging.getLogger(__name__)
@@ -1095,22 +1093,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed",
10951093
# To ensure that files below are readable, ownership must be reclaimed first
10961094
job_state.job_wrapper.reclaim_ownership()
10971095

1098-
# wait for the files to appear
1099-
which_try = 0
1100-
while which_try < self.app.config.retry_job_output_collection + 1:
1101-
try:
1102-
with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
1103-
job_stdout = self._job_io_for_db(stdout_file)
1104-
job_stderr = self._job_io_for_db(stderr_file)
1105-
break
1106-
except Exception as e:
1107-
if which_try == self.app.config.retry_job_output_collection:
1108-
job_stdout = ""
1109-
job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
1110-
log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
1111-
else:
1112-
time.sleep(1)
1113-
which_try += 1
1096+
_, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)
11141097

11151098
# get stderr and stdout to database
11161099
outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")

0 commit comments

Comments
 (0)