Skip to content

Commit 2a76d82

Browse files
committed
refactor(ProcessExecution): remove io as a field on the class and supply it as a parm to all_output_data instead
The main reason is that the io object isn't serialisable, making it harder to keep track of. Instead, we pass it in as a param where it is needed. BREAKING CHANGE: removes `io` field from `ProcessExecution` and instead adds it as a parameter to `ProcessExecution.all_output_data`.
1 parent ae60c4e commit 2a76d82

File tree

6 files changed

+40
-49
lines changed

6 files changed

+40
-49
lines changed

nextflow/command.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ def get_execution(execution_path, log_path, nextflow_command, execution=None, lo
348348
if not log: return None, 0
349349
log = log[log_start:]
350350
execution = make_or_update_execution(log, execution_path, nextflow_command, execution, io)
351-
process_executions, changed = get_initial_process_executions(log, execution, io)
351+
process_executions, changed = get_initial_process_executions(log, execution)
352352
no_path = [k for k, v in process_executions.items() if not v.path]
353353
process_ids_to_paths = get_process_ids_to_paths(no_path, execution_path, io)
354354
for process_id, path in process_ids_to_paths.items():
@@ -392,7 +392,7 @@ def make_or_update_execution(log, execution_path, nextflow_command, execution, i
392392
return execution
393393

394394

395-
def get_initial_process_executions(log, execution, io):
395+
def get_initial_process_executions(log, execution):
396396
"""Parses a section of a log file and looks for new process executions not
397397
currently in the list, or uncompleted ones which can now be completed. Some
398398
attributes are not yet filled in.
@@ -401,7 +401,6 @@ def get_initial_process_executions(log, execution, io):
401401
402402
:param str log: a section of the log file.
403403
:param nextflow.models.Execution execution: the containing execution.
404-
:param io: an optional custom io object to handle file operations.
405404
:rtype: ``tuple``"""
406405

407406
lines = log.splitlines()
@@ -410,7 +409,7 @@ def get_initial_process_executions(log, execution, io):
410409
for line in lines:
411410
if "Submitted process" in line or "Cached process" in line:
412411
is_cached = "Cached process" in line
413-
proc_ex = create_process_execution_from_line(line, is_cached, io)
412+
proc_ex = create_process_execution_from_line(line, is_cached)
414413
if not proc_ex: continue
415414
proc_ex.execution = execution
416415
process_executions[proc_ex.identifier] = proc_ex
@@ -422,13 +421,12 @@ def get_initial_process_executions(log, execution, io):
422421
return process_executions, just_updated
423422

424423

425-
def create_process_execution_from_line(line, cached=False, io=None):
424+
def create_process_execution_from_line(line, cached=False):
426425
"""Creates a process execution from a line of the log file in which its
427426
submission (or previous caching) is reported.
428427
429428
:param str line: a line from the log file.
430429
:param bool cached: whether the process is cached.
431-
:param io: an optional custom io object to handle file operations.
432430
:rtype: ``nextflow.models.ProcessExecution``"""
433431

434432
if cached:
@@ -442,7 +440,7 @@ def create_process_execution_from_line(line, cached=False, io=None):
442440
path="", stdout="", stderr="", bash="", started=None, finished=None,
443441
return_code="0" if cached else "",
444442
status="COMPLETED" if cached else "-",
445-
cached=cached, io=io
443+
cached=cached
446444
)
447445

448446

nextflow/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ class ProcessExecution:
7878
finished: datetime | None
7979
status: str
8080
cached: bool
81-
io: Any
8281

8382

8483
def __repr__(self):
@@ -126,17 +125,18 @@ def input_data(self, include_path=True):
126125
return [os.path.basename(f) for f in inputs]
127126

128127

129-
def all_output_data(self, include_path=True):
128+
def all_output_data(self, include_path=True, io=None):
130129
"""A list of all output data produced by the process execution,
131130
including unpublished staging files.
132131
133132
:param bool include_path: if ``False``, only filenames returned.
133+
:param io: an optional custom io object to handle file operations.
134134
:type: ``list``"""
135135

136136
outputs = []
137137
if not self.path: return []
138138
inputs = self.input_data(include_path=False)
139-
listdir = self.io.listdir if self.io else os.listdir
139+
listdir = io.listdir if io else os.listdir
140140
for f in listdir(self.full_path):
141141
full_path = Path(f"{self.full_path}/{f}")
142142
if not f.startswith(".command") and f != ".exitcode":

tests/integration/base.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def check_running_execution(self, execution, last_stdout, output_path=None):
3333
return execution.stdout
3434

3535

36-
def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True):
36+
def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True, io=None):
3737
# Files created
3838
if not output_path: self.assertIn(".nextflow", os.listdir(self.get_path("rundirectory")))
3939
if log_path:
@@ -105,10 +105,10 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
105105
self.assertEqual(proc_ex.input_data(), [self.get_path("files/data.txt")])
106106
self.assertEqual(proc_ex.input_data(include_path=False), ["data.txt"])
107107
self.assertEqual(
108-
set(proc_ex.all_output_data(include_path=False)),
108+
set(proc_ex.all_output_data(include_path=False, io=io)),
109109
{"abc.dat", "xyz.dat", "log.txt"}
110110
)
111-
self.assertIn(proc_ex.identifier, proc_ex.all_output_data()[0])
111+
self.assertIn(proc_ex.identifier, proc_ex.all_output_data(io=io)[0])
112112

113113
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (abc.dat)")
114114
self.check_process_execution(proc_ex, execution, False, check_time=not timezone)
@@ -121,9 +121,9 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
121121
proc_ex.input_data()[0]
122122
)
123123
self.assertEqual(
124-
set(proc_ex.all_output_data(include_path=False)), {"duplicated_abc.dat"}
124+
set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_abc.dat"}
125125
)
126-
with open(proc_ex.all_output_data(include_path=True)[0]) as f:
126+
with open(proc_ex.all_output_data(include_path=True, io=io)[0]) as f:
127127
self.assertEqual(len(f.read().splitlines()), line_count)
128128

129129
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (xyz.dat)")
@@ -137,7 +137,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
137137
proc_ex.input_data()[0]
138138
)
139139
self.assertEqual(
140-
set(proc_ex.all_output_data(include_path=False)), {"duplicated_xyz.dat"}
140+
set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_xyz.dat"}
141141
)
142142

143143
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_abc.dat)")
@@ -151,7 +151,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
151151
proc_ex.input_data()[0]
152152
)
153153
self.assertEqual(
154-
set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_abc.dat"}
154+
set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_abc.dat"}
155155
)
156156

157157
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_xyz.dat)")
@@ -165,7 +165,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
165165
proc_ex.input_data()[0]
166166
)
167167
self.assertEqual(
168-
set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_xyz.dat"}
168+
set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_xyz.dat"}
169169
)
170170

171171
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_abc.dat)")
@@ -175,7 +175,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
175175
self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND")
176176
self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_abc.dat", "suffix.txt"})
177177
self.assertEqual(
178-
set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_abc.dat"}
178+
set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_abc.dat"}
179179
)
180180

181181
proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_xyz.dat)")
@@ -185,7 +185,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
185185
self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND")
186186
self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_xyz.dat", "suffix.txt"})
187187
self.assertEqual(
188-
set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_xyz.dat"}
188+
set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_xyz.dat"}
189189
)
190190

191191
proc_ex = self.get_process_execution(execution, "JOIN:COMBINE_FILES")
@@ -198,7 +198,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
198198
{"suffix_lowered_duplicated_abc.dat", "suffix_lowered_duplicated_xyz.dat"}
199199
)
200200
self.assertEqual(
201-
set(proc_ex.all_output_data(include_path=False)), {"combined.txt"}
201+
set(proc_ex.all_output_data(include_path=False, io=io)), {"combined.txt"}
202202
)
203203

204204

tests/integration/test_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ def glob(self, path):
291291
)
292292

293293
# Execution is fine
294-
self.check_execution(execution)
294+
self.check_execution(execution, io=io)
295295

296296
# The custom io functions were used
297297
with open(f"{self.rundirectory}/log.txt", "r") as f:

tests/unit/test_command.py

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ def test_can_get_first_execution(self, mock_update, mock_paths, mock_init, mock_
413413
self.assertEqual(size, 3)
414414
mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io)
415415
mock_make.assert_called_with("LOG", "/ex", "nf run", None, io)
416-
mock_init.assert_called_with("LOG", mock_execution, io)
416+
mock_init.assert_called_with("LOG", mock_execution)
417417
mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io)
418418
self.assertEqual([c[0] for c in mock_update.call_args_list], [
419419
(process_executions["aa/bb"], "/ex", "UTC", io),
@@ -454,7 +454,7 @@ def test_can_get_subsequent_execution(self, mock_update, mock_paths, mock_init,
454454
self.assertEqual(size, 3)
455455
mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io)
456456
mock_make.assert_called_with("LOG", "/ex", "nf run", mock_execution, io)
457-
mock_init.assert_called_with("LOG", mock_execution, io)
457+
mock_init.assert_called_with("LOG", mock_execution)
458458
mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io)
459459
self.assertEqual([c[0] for c in mock_update.call_args_list], [
460460
(process_executions["aa/bb"], "/ex", "UTC", io),
@@ -555,14 +555,13 @@ def test_can_create_first_pass(self, mock_update, mock_create):
555555
mock_create.side_effect = [p1, p2, None]
556556
mock_update.return_value = "cc/dd"
557557
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process"
558-
io = Mock()
559-
process_executions, updated = get_initial_process_executions(log, execution, io)
558+
process_executions, updated = get_initial_process_executions(log, execution)
560559
self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2})
561560
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
562561
self.assertEqual([c[0] for c in mock_create.call_args_list], [
563-
("Submitted process a/bb", False, io),
564-
("[cd/789012] Submitted process", False, io),
565-
("Submitted process", False, io),
562+
("Submitted process a/bb", False),
563+
("[cd/789012] Submitted process", False),
564+
("Submitted process", False),
566565
])
567566
mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed")
568567

@@ -575,14 +574,13 @@ def test_can_create_first_pass_cached(self, mock_update, mock_create):
575574
mock_create.side_effect = [p1, p2, None]
576575
mock_update.return_value = "cc/dd"
577576
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Cached process\nTask completed\nSubmitted process"
578-
io = Mock()
579-
process_executions, updated = get_initial_process_executions(log, execution, io)
577+
process_executions, updated = get_initial_process_executions(log, execution)
580578
self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2})
581579
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
582580
self.assertEqual([c[0] for c in mock_create.call_args_list], [
583-
("Submitted process a/bb", False, io),
584-
("[cd/789012] Cached process", True, io),
585-
("Submitted process", False, io),
581+
("Submitted process a/bb", False),
582+
("[cd/789012] Cached process", True),
583+
("Submitted process", False),
586584
])
587585
mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed")
588586

@@ -596,14 +594,13 @@ def test_can_update_existing(self, mock_update, mock_create):
596594
mock_create.side_effect = [p1, p2, None]
597595
mock_update.return_value = "cc/dd"
598596
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process"
599-
io = Mock()
600-
process_executions, updated = get_initial_process_executions(log, execution, io)
597+
process_executions, updated = get_initial_process_executions(log, execution)
601598
self.assertEqual(process_executions, {"aa/bb": p1, "cc/dd": p3, "xx/yy": p2})
602599
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
603600
self.assertEqual([c[0] for c in mock_create.call_args_list], [
604-
("Submitted process a/bb", False, io),
605-
("[cd/789012] Submitted process", False, io),
606-
("Submitted process", False, io),
601+
("Submitted process a/bb", False),
602+
("[cd/789012] Submitted process", False),
603+
("Submitted process", False),
607604
])
608605
mock_update.assert_called_with({"aa/bb": p1, "cc/dd": p3, "xx/yy": p2}, "Task completed")
609606

@@ -614,8 +611,7 @@ class CreateProcessExecutionFromLineTests(TestCase):
614611
@patch("nextflow.command.parse_submitted_line")
615612
def test_can_create_process_execution(self, mock_parse):
616613
mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC", "NOW")
617-
io = Mock()
618-
proc_ex = create_process_execution_from_line("line1", io=io)
614+
proc_ex = create_process_execution_from_line("line1")
619615
self.assertEqual(proc_ex.identifier, "aa/bb")
620616
self.assertEqual(proc_ex.name, "PROC (123)")
621617
self.assertEqual(proc_ex.process, "PROC")
@@ -629,14 +625,12 @@ def test_can_create_process_execution(self, mock_parse):
629625
self.assertEqual(proc_ex.status, "-")
630626
self.assertEqual(proc_ex.path, "")
631627
self.assertFalse(proc_ex.cached)
632-
self.assertIs(proc_ex.io, io)
633-
628+
634629

635630
@patch("nextflow.command.parse_cached_line")
636631
def test_can_create_cached_process_execution(self, mock_parse):
637632
mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC")
638-
io = Mock()
639-
proc_ex = create_process_execution_from_line("line1", cached=True, io=io)
633+
proc_ex = create_process_execution_from_line("line1", cached=True)
640634
self.assertEqual(proc_ex.identifier, "aa/bb")
641635
self.assertEqual(proc_ex.name, "PROC (123)")
642636
self.assertEqual(proc_ex.process, "PROC")
@@ -650,7 +644,6 @@ def test_can_create_cached_process_execution(self, mock_parse):
650644
self.assertEqual(proc_ex.status, "COMPLETED")
651645
self.assertEqual(proc_ex.path, "")
652646
self.assertTrue(proc_ex.cached)
653-
self.assertIs(proc_ex.io, io)
654647

655648

656649
@patch("nextflow.command.parse_submitted_line")

tests/unit/test_process_executions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def make_process_execution(self, **kwargs):
1111
"identifier": "12/3456", "name": "FASTQC (1)", "submitted": datetime(2021, 7, 4),
1212
"process": "FASTQC", "path": "12/34567890", "stdout": "good", "stderr": "bad",
1313
"return_code": "0", "bash": "$", "started": datetime(2021, 7, 5), "cached": False,
14-
"finished": datetime(2021, 7, 6), "status": "COMPLETED", "io": None, **kwargs
14+
"finished": datetime(2021, 7, 6), "status": "COMPLETED", **kwargs
1515
}
1616
return ProcessExecution(**kwargs)
1717

@@ -24,7 +24,7 @@ def test_can_make_process_execution(self):
2424
identifier="12/3456", name="FASTQC (1)", submitted=datetime(2021, 7, 4),
2525
process="FASTQC", path="12/34567890", stdout="good", stderr="bad",
2626
return_code="0", bash="$", started=datetime(2021, 7, 5), cached=True,
27-
finished=datetime(2021, 7, 6), status="COMPLETED", io=None
27+
finished=datetime(2021, 7, 6), status="COMPLETED"
2828
)
2929
self.assertEqual(process_execution.identifier, "12/3456")
3030
self.assertEqual(process_execution.name, "FASTQC (1)")
@@ -236,7 +236,7 @@ def test_can_use_custom_io(self, mock_path, mock_input):
236236
io = Mock()
237237
io.listdir.return_value = ["file1", "file2", ".command.run", ".exitcode", "file3"]
238238
self.assertEqual(
239-
self.make_process_execution(io=io).all_output_data(),
239+
self.make_process_execution().all_output_data(io=io),
240240
[str(Path("/loc/file1")), str(Path("/loc/file3"))]
241241
)
242242
mock_input.assert_called_with(include_path=False)

0 commit comments

Comments
 (0)