Skip to content

Commit 6f4ea54

Browse files
committed
Eliminate copy-paste from Kubernetes runner.
1 parent 262c589 commit 6f4ea54

File tree

2 files changed

+17
-30
lines changed

2 files changed

+17
-30
lines changed

lib/galaxy/jobs/runners/__init__.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -906,17 +906,7 @@ def check_watched_items(self):
906906
def check_watched_item(self, job_state):
907907
raise NotImplementedError()
908908

909-
def finish_job(self, job_state: AsynchronousJobState):
910-
"""
911-
Get the output/error for a finished job, pass to `job_wrapper.finish`
912-
and cleanup all the job's temporary files.
913-
"""
914-
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
915-
external_job_id = job_state.job_id
916-
917-
# To ensure that files below are readable, ownership must be reclaimed first
918-
job_state.job_wrapper.reclaim_ownership()
919-
909+
def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState):
920910
# wait for the files to appear
921911
which_try = 0
922912
collect_output_success = True
@@ -930,11 +920,25 @@ def finish_job(self, job_state: AsynchronousJobState):
930920
if which_try == self.app.config.retry_job_output_collection:
931921
stdout = ""
932922
stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
933-
log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
923+
log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
934924
collect_output_success = False
935925
else:
936926
time.sleep(1)
937927
which_try += 1
928+
return collect_output_success, stdout, stderr
929+
930+
def finish_job(self, job_state: AsynchronousJobState):
931+
"""
932+
Get the output/error for a finished job, pass to `job_wrapper.finish`
933+
and cleanup all the job's temporary files.
934+
"""
935+
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
936+
external_job_id = job_state.job_id
937+
938+
# To ensure that files below are readable, ownership must be reclaimed first
939+
job_state.job_wrapper.reclaim_ownership()
940+
941+
collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)
938942

939943
if not collect_output_success:
940944
job_state.fail_message = stderr

lib/galaxy/jobs/runners/kubernetes.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import math
88
import os
99
import re
10-
import time
1110
from dataclasses import dataclass
1211
from datetime import datetime
1312

@@ -49,7 +48,6 @@
4948
Service,
5049
service_object_dict,
5150
)
52-
from galaxy.util import unicodify
5351
from galaxy.util.bytesize import ByteSize
5452

5553
log = logging.getLogger(__name__)
@@ -1094,22 +1092,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed",
10941092
# To ensure that files below are readable, ownership must be reclaimed first
10951093
job_state.job_wrapper.reclaim_ownership()
10961094

1097-
# wait for the files to appear
1098-
which_try = 0
1099-
while which_try < self.app.config.retry_job_output_collection + 1:
1100-
try:
1101-
with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
1102-
job_stdout = self._job_io_for_db(stdout_file)
1103-
job_stderr = self._job_io_for_db(stderr_file)
1104-
break
1105-
except Exception as e:
1106-
if which_try == self.app.config.retry_job_output_collection:
1107-
job_stdout = ""
1108-
job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
1109-
log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
1110-
else:
1111-
time.sleep(1)
1112-
which_try += 1
1095+
_, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)
11131096

11141097
# get stderr and stdout to database
11151098
outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")

0 commit comments

Comments
 (0)