Skip to content

Commit 656d5a1

Browse files
committed
Don't create workflow outputs to recover input parameter outputs
Just eval the code that determines the outputs of the step again. Avoids modifying the workflow as it executes, which is bad.
1 parent 3118cd2 commit 656d5a1

File tree

2 files changed

+31
-23
lines changed

2 files changed

+31
-23
lines changed

lib/galaxy/workflow/modules.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1620,21 +1620,13 @@ def get_all_outputs(self, data_only=False):
16201620
]
16211621

16221622
def execute(
1623-
self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False
1623+
self,
1624+
trans,
1625+
progress: "WorkflowProgress",
1626+
invocation_step: "WorkflowInvocationStep",
1627+
use_cached_job: bool = False,
16241628
) -> Optional[bool]:
1625-
step = invocation_step.workflow_step
1626-
if step.id in progress.inputs_by_step_id:
1627-
input_value = progress.inputs_by_step_id[step.id]
1628-
else:
1629-
input_value = step.state.inputs["input"]
1630-
if input_value is NO_REPLACEMENT:
1631-
default_value = step.get_input_default_value(NO_REPLACEMENT)
1632-
# TODO: look at parameter type and infer if value should be a dictionary
1633-
# instead. Guessing only field parameter types in CWL branch would have
1634-
# default as dictionary like this.
1635-
if not isinstance(default_value, dict):
1636-
default_value = {"value": default_value}
1637-
input_value = default_value.get("value", NO_REPLACEMENT)
1629+
input_value = self.get_input_value(progress, invocation_step)
16381630
input_param = self.get_runtime_inputs(self)["input"]
16391631
# TODO: raise DelayedWorkflowEvaluation if replacement not ready ? Need test
16401632
try:
@@ -1654,13 +1646,37 @@ def execute(
16541646
except ValueError as e:
16551647
raise FailWorkflowEvaluation(
16561648
why=InvocationFailureWorkflowParameterInvalid(
1657-
reason=FailureReason.workflow_parameter_invalid, workflow_step_id=step.id, details=str(e)
1649+
reason=FailureReason.workflow_parameter_invalid,
1650+
workflow_step_id=invocation_step.workflow_step_id,
1651+
details=str(e),
16581652
)
16591653
)
16601654
step_outputs = dict(output=input_value)
16611655
progress.set_outputs_for_input(invocation_step, step_outputs)
16621656
return None
16631657

1658+
def get_input_value(self, progress: "WorkflowProgress", invocation_step: "WorkflowInvocationStep"):
1659+
step = invocation_step.workflow_step
1660+
if step.id in progress.inputs_by_step_id:
1661+
input_value = progress.inputs_by_step_id[step.id]
1662+
else:
1663+
assert step.state
1664+
input_value = step.state.inputs["input"]
1665+
if input_value is NO_REPLACEMENT:
1666+
default_value = step.get_input_default_value(NO_REPLACEMENT)
1667+
# TODO: look at parameter type and infer if value should be a dictionary
1668+
# instead. Guessing only field parameter types in CWL branch would have
1669+
# default as dictionary like this.
1670+
if not isinstance(default_value, dict):
1671+
default_value = {"value": default_value}
1672+
input_value = default_value.get("value", NO_REPLACEMENT)
1673+
return input_value
1674+
1675+
def recover_mapping(self, invocation_step: "WorkflowInvocationStep", progress: "WorkflowProgress"):
1676+
input_value = self.get_input_value(progress, invocation_step)
1677+
step_outputs = dict(output=input_value)
1678+
progress.set_outputs_for_input(invocation_step, step_outputs, already_persisted=True)
1679+
16641680
def step_state_to_tool_state(self, state):
16651681
state = safe_loads(state)
16661682
default_set, default_value = False, None

lib/galaxy/workflow/run.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -611,17 +611,9 @@ def set_step_outputs(
611611
outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
612612
self.outputs[step.id] = outputs
613613
if not already_persisted:
614-
workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
615614
for output_name, output_object in outputs.items():
616615
if hasattr(output_object, "history_content_type"):
617616
invocation_step.add_output(output_name, output_object)
618-
else:
619-
# Add this non-data, non workflow-output output to the workflow outputs.
620-
# This is required for recovering the output in the next scheduling iteration,
621-
# and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
622-
if not workflow_outputs_by_name.get(output_name) and output_object is not NO_REPLACEMENT:
623-
workflow_output = model.WorkflowOutput(step, output_name=output_name)
624-
step.workflow_outputs.append(workflow_output)
625617
for workflow_output in step.workflow_outputs:
626618
assert workflow_output.output_name
627619
output_name = workflow_output.output_name

0 commit comments

Comments
 (0)