diff --git a/tidy3d/components/workflow.py b/tidy3d/components/workflow.py new file mode 100644 index 0000000000..f622c00b2b --- /dev/null +++ b/tidy3d/components/workflow.py @@ -0,0 +1,414 @@ +"""Workflow definition classes for multi-step simulations. + +This module provides the core abstractions for defining execution workflows: +- `Step`: A single unit of work with typed inputs and outputs +- `StepInput`/`StepOutput`: Schema classes for step I/O +- `Workflow`: A DAG of steps that can be executed together +- `HeatChargeWorkflow`: Pre-defined workflow for HeatCharge simulations +""" + +from __future__ import annotations + +from typing import Optional, Union + +import pydantic.v1 as pd + +from tidy3d.components.base import Tidy3dBaseModel +from tidy3d.components.tcad.mesher import VolumeMesher +from tidy3d.components.tcad.simulation.heat_charge import HeatChargeSimulation +from tidy3d.components.types.workflow import WorkflowType + +# ============================================================================= +# Step Input/Output Schema Classes +# ============================================================================= + + +class StepInput(Tidy3dBaseModel): + """Base class for step inputs. + + Defines what data a step requires from a previous step in the workflow. + Subclasses define the specific type of data (mesh, field data, etc.). + Type discrimination is handled automatically via Tidy3dBaseModel's `type` field. + """ + + source_step: str = pd.Field( + ..., + title="Source Step", + description="Name of the step that produces this input.", + ) + + required: bool = pd.Field( + True, + title="Required", + description="Whether this input is required for the step to run.", + ) + + +class MeshInput(StepInput): + """Input: mesh data from a meshing step.""" + + +class FieldDataInput(StepInput): + """Input: field data from a solver step.""" + + +class HeatSourceInput(StepInput): + """Input: heat source distribution for thermal solver.""" + + +class StepOutput(Tidy3dBaseModel): + """Base class for step outputs. + + Defines what data a step produces for subsequent steps (pipeline data). + This is separate from user-requested monitor data. + Type discrimination is handled automatically via Tidy3dBaseModel's `type` field. + """ + + +class MeshOutput(StepOutput): + """Output: mesh data for subsequent steps.""" + + +class SimulationDataOutput(StepOutput): + """Output: simulation results.""" + + # Note: We store the class name as a string to avoid circular imports + # and serialization issues with type objects + data_type_name: str = pd.Field( + ..., + title="Data Type Name", + description="Name of the expected data type (e.g., 'HeatChargeSimulationData').", + ) + + +class FieldDataOutput(StepOutput): + """Output: field data for subsequent steps.""" + + +class HeatSourceOutput(StepOutput): + """Output: heat source data for thermal solver.""" + + +# Union of all input/output types for validation +StepInputType = Union[MeshInput, FieldDataInput, HeatSourceInput] +StepOutputType = Union[MeshOutput, SimulationDataOutput, FieldDataOutput, HeatSourceOutput] + +# Mapping from input class to compatible output class(es) +INPUT_OUTPUT_COMPATIBILITY: dict[type[StepInput], tuple[type[StepOutput], ...]] = { + MeshInput: (MeshOutput,), + FieldDataInput: (FieldDataOutput, SimulationDataOutput), + HeatSourceInput: (HeatSourceOutput,), +} + + +# ============================================================================= +# Step Class +# ============================================================================= + + +class Step(Tidy3dBaseModel): + """A single step in a workflow. + + Each step represents a unit of work (simulation, meshing, conversion) + with explicit typed inputs and outputs. + """ + + name: str = pd.Field( + ..., + title="Name", + description="Unique name for this step within the workflow.", + ) + + simulation: WorkflowType = pd.Field( + ..., + title="Simulation", + description="The simulation or operation to execute in this step.", + discriminator="type", + ) + + allow_async: bool = pd.Field( + True, + title="Allow Async", + description=( + "Whether this step may run in parallel with other async-allowed steps. " + "Data dependencies automatically enforce ordering regardless of this flag." + ), + ) + + inputs: tuple[StepInputType, ...] = pd.Field( + (), + title="Inputs", + description="List of inputs required from previous steps.", + ) + + outputs: tuple[StepOutputType, ...] = pd.Field( + (), + title="Outputs", + description="List of outputs produced by this step for subsequent steps.", + ) + + @property + def input_types(self) -> set[type[StepInput]]: + """Return set of input types.""" + return {type(inp) for inp in self.inputs} + + @property + def output_types(self) -> set[type[StepOutput]]: + """Return set of output types.""" + return {type(out) for out in self.outputs} + + @property + def source_steps(self) -> set[str]: + """Return set of step names this step depends on.""" + return {inp.source_step for inp in self.inputs} + + +# ============================================================================= +# Workflow Class +# ============================================================================= + + +class Workflow(Tidy3dBaseModel): + """A complete workflow definition as a DAG of steps. + + The workflow is validated at construction time to ensure: + - No duplicate step names + - All input references point to earlier steps + - Input types match output types from source steps + + This class is serializable to JSON for persistence and debugging. + """ + + steps: tuple[Step, ...] = pd.Field( + ..., + title="Steps", + description="Ordered list of steps in the workflow.", + ) + + @pd.validator("steps") + def _validate_dag(cls, steps: tuple[Step, ...]) -> tuple[Step, ...]: + """Validate the workflow DAG structure.""" + if not steps: + raise ValueError("Workflow must have at least one step") + + step_names: set[str] = set() + outputs_by_step: dict[str, set[type[StepOutput]]] = {} + + for step in steps: + # Check for duplicate names + if step.name in step_names: + raise ValueError(f"Duplicate step name: '{step.name}'") + step_names.add(step.name) + + # Validate inputs reference existing earlier steps + for inp in step.inputs: + if inp.source_step not in step_names: + if inp.source_step in {s.name for s in steps}: + raise ValueError( + f"Step '{step.name}' depends on '{inp.source_step}' " + "which comes later in the workflow" + ) + else: + raise ValueError( + f"Step '{step.name}' references unknown step '{inp.source_step}'" + ) + + # Check that source step produces a compatible output type + source_outputs = outputs_by_step.get(inp.source_step, set()) + input_type = type(inp) + compatible_outputs = INPUT_OUTPUT_COMPATIBILITY.get(input_type, ()) + if not any(out_type in source_outputs for out_type in compatible_outputs): + source_output_names = {t.__name__ for t in source_outputs} or {"nothing"} + raise ValueError( + f"Step '{step.name}' expects {input_type.__name__} from " + f"'{inp.source_step}', but that step produces: {source_output_names}" + ) + + # Record this step's outputs + outputs_by_step[step.name] = step.output_types + + return steps + + @property + def num_steps(self) -> int: + """Return the number of steps in this workflow.""" + return len(self.steps) + + @property + def is_single_step(self) -> bool: + """Return True if this is a single-step workflow.""" + return self.num_steps == 1 + + @property + def step_names(self) -> list[str]: + """Return list of step names in order.""" + return [step.name for step in self.steps] + + def get_step(self, name: str) -> Optional[Step]: + """Get a step by name.""" + for step in self.steps: + if step.name == name: + return step + return None + + def get_step_index(self, name: str) -> Optional[int]: + """Get the index of a step by name.""" + for i, step in enumerate(self.steps): + if step.name == name: + return i + return None + + def execution_order(self) -> list[list[str]]: + """Return steps grouped by execution phase. + + Steps that can run in parallel (no dependencies between them) + are grouped together. This is for Phase 2+ parallel execution. + + Returns + ------- + list[list[str]] + List of phases, each containing step names that can run in parallel. + """ + # For now, return sequential execution (one step per phase) + # Phase 2 will implement proper parallel grouping based on dependencies + return [[step.name] for step in self.steps] + + +# ============================================================================= +# Predefined Workflow Classes +# ============================================================================= + + +class HeatChargeWorkflow(Workflow): + """Workflow for HeatCharge simulations: mesh → solve. + + This is the standard workflow for HeatChargeSimulation, consisting of: + 1. A meshing step using VolumeMesher + 2. A solve step using the HeatChargeSimulation + """ + + @pd.validator("steps") + def _validate_heat_charge_structure(cls, steps: tuple[Step, ...]) -> tuple[Step, ...]: + """Validate HeatCharge-specific workflow structure.""" + # Must have exactly 2 steps + if len(steps) != 2: + raise ValueError(f"HeatChargeWorkflow must have exactly 2 steps, got {len(steps)}") + + mesh_step, solve_step = steps + + # Validate step names + if mesh_step.name != "mesh": + raise ValueError(f"First step must be named 'mesh', got '{mesh_step.name}'") + if solve_step.name != "solve": + raise ValueError(f"Second step must be named 'solve', got '{solve_step.name}'") + + # Validate simulation types + if not isinstance(mesh_step.simulation, VolumeMesher): + raise ValueError( + f"First step must contain a VolumeMesher, got {type(mesh_step.simulation).__name__}" + ) + if not isinstance(solve_step.simulation, HeatChargeSimulation): + raise ValueError( + f"Second step must contain a HeatChargeSimulation, " + f"got {type(solve_step.simulation).__name__}" + ) + + # Validate the mesher references the same simulation as the solve step + if mesh_step.simulation.simulation is not solve_step.simulation: + raise ValueError( + "The VolumeMesher must reference the same HeatChargeSimulation as the solve step" + ) + + return steps + + @classmethod + def from_simulation(cls, simulation: HeatChargeSimulation) -> HeatChargeWorkflow: + """Construct workflow from a HeatChargeSimulation. + + Parameters + ---------- + simulation : HeatChargeSimulation + The simulation to create a workflow for. + + Returns + ------- + HeatChargeWorkflow + A workflow with mesh and solve steps. + """ + return cls( + steps=( + Step( + name="mesh", + simulation=VolumeMesher(simulation=simulation), + outputs=(MeshOutput(),), + ), + Step( + name="solve", + simulation=simulation, + inputs=(MeshInput(source_step="mesh"),), + outputs=(SimulationDataOutput(data_type_name="HeatChargeSimulationData"),), + ), + ) + ) + + +# ============================================================================= +# Workflow Registry +# ============================================================================= + +# Maps simulation types to their workflow classes +# This allows automatic workflow construction for known multi-step simulations +SIMULATION_TO_WORKFLOW: dict[type, type[Workflow]] = {} + + +def _register_workflows() -> None: + """Register known simulation types with their workflows. + + This is called lazily to avoid import issues. + """ + global SIMULATION_TO_WORKFLOW + + if SIMULATION_TO_WORKFLOW: + return # Already registered + + SIMULATION_TO_WORKFLOW = { + HeatChargeSimulation: HeatChargeWorkflow, + } + + +def get_workflow_for_simulation(simulation: WorkflowType) -> Optional[Workflow]: + """Get the appropriate workflow for a simulation, if it requires one. + + Parameters + ---------- + simulation : WorkflowType + The simulation to check. + + Returns + ------- + Optional[Workflow] + A workflow for the simulation, or None if it's a single-step simulation. + """ + _register_workflows() + + workflow_cls = SIMULATION_TO_WORKFLOW.get(type(simulation)) + if workflow_cls: + return workflow_cls.from_simulation(simulation) + return None + + +def is_multi_step_simulation(simulation: WorkflowType) -> bool: + """Check if a simulation requires a multi-step workflow. + + Parameters + ---------- + simulation : WorkflowType + The simulation to check. + + Returns + ------- + bool + True if the simulation requires multiple steps. + """ + _register_workflows() + return type(simulation) in SIMULATION_TO_WORKFLOW diff --git a/tidy3d/web/__init__.py b/tidy3d/web/__init__.py index dcdf44c9c3..73dffb39ca 100644 --- a/tidy3d/web/__init__.py +++ b/tidy3d/web/__init__.py @@ -36,6 +36,7 @@ # run, # NOTE: use autograd one now (see below) upload, ) +from .api.workflow import WebWorkflow from .cli import tidy3d_cli from .cli.app import configure_fn as configure @@ -43,6 +44,7 @@ "Batch", "BatchData", "Job", + "WebWorkflow", "abort", "account", "configure", diff --git a/tidy3d/web/api/container.py b/tidy3d/web/api/container.py index 93e80438c4..49e3b487fa 100644 --- a/tidy3d/web/api/container.py +++ b/tidy3d/web/api/container.py @@ -555,6 +555,22 @@ def set_task_name_if_none(cls, values: dict[str, Any]) -> dict[str, Any]: values["task_name"] = stub.get_default_task_name() return values + @pd.validator("simulation", always=True) + def _validate_single_step_only(cls, simulation: WorkflowType) -> WorkflowType: + """Ensure the simulation is a single-step workflow. + + Multi-step workflows (like HeatChargeSimulation) should use WebWorkflow instead. + """ + from tidy3d.components.workflow import is_multi_step_simulation + + if is_multi_step_simulation(simulation): + raise ValueError( + "'Job' does not support multi-step workflows. " + "Use 'WebWorkflow' from 'tidy3d.web' for multi-step workflows, " + "or use 'web.run()' which handles multi-step workflows automatically." + ) + return simulation + class BatchData(Tidy3dBaseModel, Mapping): """ diff --git a/tidy3d/web/api/states.py b/tidy3d/web/api/states.py index 167ae9ce5d..827d53ff3f 100644 --- a/tidy3d/web/api/states.py +++ b/tidy3d/web/api/states.py @@ -42,7 +42,7 @@ END_STATES = ERROR_STATES | COMPLETED_STATES -POST_VALIDATE_STATES = {"validate_success", "validate_warn"} +POST_VALIDATE_STATES = {"validate_success", "validate_warn", "warning"} RUNNING_STATES = ( PRE_VALIDATE_STATES | POST_VALIDATE_STATES | {"running"} | POST_RUN_STATES | COMPLETED_STATES diff --git a/tidy3d/web/api/webapi.py b/tidy3d/web/api/webapi.py index 9134fff385..e4e17adcd0 100644 --- a/tidy3d/web/api/webapi.py +++ b/tidy3d/web/api/webapi.py @@ -377,6 +377,28 @@ def run( :meth:`tidy3d.web.api.container.Batch.monitor` Monitor progress of each of the running tasks. """ + # Check if this is a multi-step workflow using the registry + from tidy3d.components.workflow import is_multi_step_simulation + + if is_multi_step_simulation(simulation): + # Delegate to WebWorkflow for multi-step workflows + from tidy3d.web.api.workflow import WebWorkflow + + workflow_executor = WebWorkflow( + simulation=simulation, + task_name=task_name, + folder_name=folder_name, + callback_url=callback_url, + solver_version=solver_version, + verbose=verbose, + pay_type=pay_type if isinstance(pay_type, PayType) else PayType(pay_type), + reduce_simulation=reduce_simulation if isinstance(reduce_simulation, bool) else False, + simulation_type=simulation_type, + worker_group=worker_group, + lazy=lazy, + ) + return workflow_executor.run(path=path, priority=priority) + restored_path, _ = restore_simulation_if_cached( simulation=simulation, path=path, diff --git a/tidy3d/web/api/workflow.py b/tidy3d/web/api/workflow.py new file mode 100644 index 0000000000..279c020e73 --- /dev/null +++ b/tidy3d/web/api/workflow.py @@ -0,0 +1,536 @@ +"""Higher level wrapper for workflows that require multiple sequential server tasks.""" + +from __future__ import annotations + +from os import PathLike +from pathlib import Path +from typing import Any, Optional + +import pydantic.v1 as pd +from pydantic.v1 import PrivateAttr + +from tidy3d.components.base import Tidy3dBaseModel +from tidy3d.components.types.workflow import WorkflowDataType, WorkflowType +from tidy3d.components.workflow import Workflow +from tidy3d.log import log +from tidy3d.web.api import webapi as web +from tidy3d.web.api.container import DEFAULT_DATA_PATH +from tidy3d.web.core.constants import TaskId +from tidy3d.web.core.types import PayType + + +class StepInfo(pd.BaseModel): + """Information about a single workflow step.""" + + name: str + task_id: Optional[TaskId] = None + status: str = "pending" + + class Config: + """Pydantic config.""" + + # Allow mutation of fields + allow_mutation = True + + +class WebWorkflowState(pd.BaseModel): + """Serializable state of a WebWorkflow for saving/loading.""" + + task_name: Optional[str] = None + folder_name: str = "default" + solver_version: Optional[str] = None + simulation_type: str = "tidy3d" + current_step_index: int = 0 + steps: list[StepInfo] = pd.Field(default_factory=list) + + +class WebWorkflow(Tidy3dBaseModel): + """ + Execute a workflow on the server. + + This is the v2 interface for running multi-step workflows. It takes a + `Workflow` object that defines the execution graph and handles the + execution of each step. + + For backward compatibility, you can also pass a simulation directly + and the appropriate workflow will be constructed automatically. + + Examples + -------- + >>> from tidy3d.web import WebWorkflow + >>> from tidy3d.components.workflow import HeatChargeWorkflow + >>> + >>> # Explicit workflow construction + >>> workflow = HeatChargeWorkflow.from_simulation(heat_charge_sim) + >>> wf = WebWorkflow(workflow=workflow, task_name="my_heat_sim") + >>> data = wf.run(path="results.hdf5") + >>> + >>> # Or pass simulation directly (workflow auto-constructed) + >>> wf = WebWorkflow(simulation=heat_charge_sim, task_name="my_heat_sim") + >>> data = wf.run(path="results.hdf5") + """ + + workflow: Workflow = pd.Field( + None, + title="Workflow", + description="The workflow to execute. If not provided, constructed from simulation.", + ) + + simulation: WorkflowType = pd.Field( + None, + title="Simulation", + description="Simulation to run. Used to construct workflow if workflow not provided.", + discriminator="type", + ) + + task_name: str = pd.Field( + None, + title="Task Name", + description="Base name for the workflow tasks. Step names will be appended.", + ) + + folder_name: str = pd.Field( + "default", + title="Folder Name", + description="Name of folder to store tasks on web UI.", + ) + + callback_url: Optional[str] = pd.Field( + None, + title="Callback URL", + description="Http PUT url to receive simulation finish event.", + ) + + solver_version: Optional[str] = pd.Field( + None, + title="Solver Version", + description="Custom solver version to use.", + ) + + verbose: bool = pd.Field( + True, + title="Verbose", + description="Whether to print info messages and progressbars.", + ) + + pay_type: PayType = pd.Field( + PayType.AUTO, + title="Payment Type", + description="Specify the payment method.", + ) + + reduce_simulation: bool = pd.Field( + False, + title="Reduce Simulation", + description="Whether to reduce structures to the simulation domain.", + ) + + simulation_type: str = pd.Field( + "tidy3d", + title="Simulation Type", + description="Type of simulation, used internally only.", + ) + + worker_group: Optional[str] = pd.Field( + None, + title="Worker Group", + description="Worker group for the simulation.", + ) + + lazy: bool = pd.Field( + False, + title="Lazy", + description="Whether to load data lazily.", + ) + + _steps: list[StepInfo] = PrivateAttr(default_factory=list) + _current_step_index: int = PrivateAttr(default=0) + _resolved_workflow: Workflow = PrivateAttr(default=None) + + @pd.root_validator(pre=True) + def _resolve_workflow_or_simulation(cls, values: dict) -> dict: + """Ensure we have either a workflow or a simulation to construct one from.""" + workflow = values.get("workflow") + simulation = values.get("simulation") + + if workflow is None and simulation is None: + raise ValueError("Either 'workflow' or 'simulation' must be provided") + + return values + + def __init__(self, **data: Any) -> None: + super().__init__(**data) + self._initialize_workflow() + + def _initialize_workflow(self) -> None: + """Initialize the workflow and steps.""" + from tidy3d.components.workflow import get_workflow_for_simulation + + if self.workflow is not None: + self._resolved_workflow = self.workflow + elif self.simulation is not None: + # Try to get a workflow for this simulation type + workflow = get_workflow_for_simulation(self.simulation) + if workflow is not None: + self._resolved_workflow = workflow + else: + # Single-step simulation - create trivial workflow + from tidy3d.components.workflow import ( + SimulationDataOutput, + Step, + Workflow, + ) + + self._resolved_workflow = Workflow( + steps=( + Step( + name="solve", + simulation=self.simulation, + outputs=(SimulationDataOutput(data_type_name="SimulationData"),), + ), + ) + ) + + # Initialize step info from workflow + self._steps = [ + StepInfo(name=step.name, task_id=None, status="pending") + for step in self._resolved_workflow.steps + ] + + @property + def steps(self) -> list[StepInfo]: + """Return the list of workflow steps.""" + return self._steps + + @property + def num_steps(self) -> int: + """Return the number of steps in this workflow.""" + return len(self._steps) + + @property + def is_multi_step(self) -> bool: + """Return True if this workflow has more than one step.""" + return self.num_steps > 1 + + @property + def current_step(self) -> Optional[StepInfo]: + """Return the current step being executed.""" + if self._current_step_index < self.num_steps: + return self._steps[self._current_step_index] + return None + + def _get_step_task_name(self, step_name: str) -> str: + """Generate a task name for a specific step.""" + base_name = self.task_name or "workflow_task" + if self.num_steps == 1: + return base_name + return f"{base_name}_{step_name}" + + def _get_step_simulation(self, step_index: int) -> WorkflowType: + """Get the simulation object for a specific step.""" + return self._resolved_workflow.steps[step_index].simulation + + def _get_parent_task_ids(self, step_index: int) -> Optional[tuple[TaskId, ...]]: + """Get parent task IDs for a step based on its inputs.""" + step = self._resolved_workflow.steps[step_index] + if not step.inputs: + return None + + # Build parent task IDs from input source steps + parent_ids = [] + for inp in step.inputs: + source_idx = self._resolved_workflow.get_step_index(inp.source_step) + if source_idx is not None and self._steps[source_idx].task_id: + parent_ids.append(self._steps[source_idx].task_id) + + return tuple(parent_ids) if parent_ids else None + + def upload_step(self, step_index: int) -> TaskId: + """Upload a specific step to the server. + + Parameters + ---------- + step_index : int + Index of the step to upload. + + Returns + ------- + TaskId + The server task ID for this step. + """ + if step_index >= self.num_steps: + raise ValueError(f"Step index {step_index} out of range (max {self.num_steps - 1})") + + step = self._steps[step_index] + step_sim = self._get_step_simulation(step_index) + parent_tasks = self._get_parent_task_ids(step_index) + + task_id = web.upload( + simulation=step_sim, + task_name=self._get_step_task_name(step.name), + folder_name=self.folder_name, + callback_url=self.callback_url, + verbose=self.verbose, + simulation_type=self.simulation_type, + parent_tasks=list(parent_tasks) if parent_tasks else None, + solver_version=self.solver_version, + ) + + step.task_id = task_id + step.status = "uploaded" + + if self.verbose: + log.info(f"Step '{step.name}' uploaded with task_id: {task_id}") + + return task_id + + def start_step(self, step_index: int, priority: Optional[int] = None) -> None: + """Start a specific step on the server. + + Parameters + ---------- + step_index : int + Index of the step to start. + priority : int, optional + Priority in the queue (1-10). + """ + step = self._steps[step_index] + if not step.task_id: + raise ValueError(f"Step '{step.name}' has not been uploaded yet") + + web.start( + step.task_id, + solver_version=self.solver_version, + worker_group=self.worker_group, + pay_type=self.pay_type, + priority=priority, + ) + step.status = "running" + + if self.verbose: + log.info(f"Step '{step.name}' started") + + def monitor_step(self, step_index: int) -> None: + """Monitor progress of a specific step. + + Parameters + ---------- + step_index : int + Index of the step to monitor. + """ + step = self._steps[step_index] + if not step.task_id: + raise ValueError(f"Step '{step.name}' has not been uploaded yet") + + web.monitor(step.task_id, verbose=self.verbose) + step.status = "completed" + + def download_step(self, step_index: int, path: PathLike) -> None: + """Download results from a specific step. + + Parameters + ---------- + step_index : int + Index of the step to download. + path : PathLike + Path to save the results. + """ + step = self._steps[step_index] + if not step.task_id: + raise ValueError(f"Step '{step.name}' has not been uploaded yet") + + web.download(task_id=step.task_id, path=path, verbose=self.verbose) + + def load_step(self, step_index: int, path: PathLike) -> WorkflowDataType: + """Download and load results from a specific step. + + Parameters + ---------- + step_index : int + Index of the step to load. + path : PathLike + Path to save/load the results. + + Returns + ------- + WorkflowDataType + The loaded data for this step. + """ + step = self._steps[step_index] + if not step.task_id: + raise ValueError(f"Step '{step.name}' has not been uploaded yet") + + return web.load(task_id=step.task_id, path=path, verbose=self.verbose, lazy=self.lazy) + + def run_step( + self, + step_index: int, + path: PathLike, + priority: Optional[int] = None, + ) -> WorkflowDataType: + """Run a single step completely: upload, start, monitor, and load. + + Parameters + ---------- + step_index : int + Index of the step to run. + path : PathLike + Path to save the results. + priority : int, optional + Priority in the queue (1-10). + + Returns + ------- + WorkflowDataType + The loaded data for this step. + """ + self.upload_step(step_index) + self.start_step(step_index, priority=priority) + self.monitor_step(step_index) + return self.load_step(step_index, path=path) + + def run( + self, + path: PathLike = DEFAULT_DATA_PATH, + priority: Optional[int] = None, + ) -> WorkflowDataType: + """Run all workflow steps sequentially and return the final result. + + Parameters + ---------- + path : PathLike + Path to download final results file (.hdf5), including filename. + priority : int, optional + Priority in the queue (1-10). + + Returns + ------- + WorkflowDataType + Object containing the final simulation results. + """ + path = Path(path) + parent_dir = path.parent + if parent_dir != Path(".") and not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + + if self.verbose and self.num_steps > 1: + log.info( + f"Running workflow with {self.num_steps} steps: {[s.name for s in self._steps]}" + ) + + # Path for saving intermediate state + state_path = self._get_state_path(path) + + data = None + for i, step in enumerate(self._steps): + if self.verbose and self.num_steps > 1: + log.info(f"Running step {i + 1}/{self.num_steps}: '{step.name}'") + + # Use intermediate path for non-final steps + if i < self.num_steps - 1: + step_path = parent_dir / f"{path.stem}_{step.name}{path.suffix}" + else: + step_path = path + + data = self.run_step(i, path=step_path, priority=priority) + self._current_step_index = i + 1 + + # Save state after each step + self.save_state(state_path) + + return data + + def get_step_status(self, step_index: int) -> str: + """Get the status of a specific step.""" + if step_index >= self.num_steps: + raise ValueError(f"Step index {step_index} out of range") + + step = self._steps[step_index] + if step.task_id: + info = web.get_info(task_id=step.task_id) + return info.status + return step.status + + @property + def status(self) -> dict[str, str]: + """Return status of all steps.""" + return {step.name: self.get_step_status(i) for i, step in enumerate(self._steps)} + + def estimate_cost(self, verbose: bool = True) -> dict[str, float]: + """Estimate cost for all steps.""" + costs = {} + for i, step in enumerate(self._steps): + if step.task_id: + costs[step.name] = web.estimate_cost( + step.task_id, verbose=verbose, solver_version=self.solver_version + ) + else: + self.upload_step(i) + costs[step.name] = web.estimate_cost( + step.task_id, verbose=verbose, solver_version=self.solver_version + ) + return costs + + def real_cost(self, verbose: bool = True) -> dict[str, float]: + """Get actual billed cost for all completed steps.""" + costs = {} + for step in self._steps: + if step.task_id and step.status == "completed": + costs[step.name] = web.real_cost(step.task_id, verbose=verbose) + return costs + + def delete(self) -> None: + """Delete all server-side data associated with this workflow.""" + for step in self._steps: + if step.task_id: + try: + web.delete(step.task_id) + if self.verbose: + log.info(f"Deleted step '{step.name}' (task_id: {step.task_id})") + except Exception as e: + log.warning(f"Failed to delete step '{step.name}': {e}") + + def _get_state_path(self, data_path: PathLike) -> Path: + """Get the path for the state file based on data path.""" + data_path = Path(data_path) + return data_path.parent / f"{data_path.stem}_workflow_state.json" + + def save_state(self, path: PathLike) -> Path: + """Save the current state of the workflow to a JSON file.""" + path = Path(path) + state = WebWorkflowState( + task_name=self.task_name, + folder_name=self.folder_name, + solver_version=self.solver_version, + simulation_type=self.simulation_type, + current_step_index=self._current_step_index, + steps=self._steps, + ) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(state.json(indent=2)) + if self.verbose: + log.info(f"Saved WebWorkflow state to: {path}") + return path + + def load_state(self, path: PathLike) -> None: + """Load workflow state from a JSON file.""" + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"State file not found: {path}") + + state = WebWorkflowState.parse_file(path) + + self._current_step_index = state.current_step_index + for i, step_data in enumerate(state.steps): + if i < len(self._steps): + self._steps[i].task_id = step_data.task_id + self._steps[i].status = step_data.status + + if self.verbose: + log.info(f"Loaded WebWorkflow state from: {path}") + for step in self._steps: + log.info(f" Step '{step.name}': task_id={step.task_id}, status={step.status}") + + +# Resolve forward references for Workflow type +WebWorkflow.update_forward_refs() diff --git a/tidy3d/web/core/task_core.py b/tidy3d/web/core/task_core.py index f1e4ca8c7d..4a8a754a74 100644 --- a/tidy3d/web/core/task_core.py +++ b/tidy3d/web/core/task_core.py @@ -805,24 +805,38 @@ def validate_post_upload(self, parent_tasks: Optional[list[str]] = None) -> None "A single parent 'task_id' corresponding to the task in which the meshing " "was run must be provided." ) + parent_task_id = parent_tasks[0] try: # get mesh task info - mesh_task = SimulationTask.get(parent_tasks[0], verbose=False) - assert mesh_task.task_type == "VOLUME_MESH" - assert mesh_task.status == "success" + mesh_task = SimulationTask.get(parent_task_id, verbose=False) + if mesh_task.task_type != "VOLUME_MESH": + raise ValidationError( + f"Parent task '{parent_task_id}' has type '{mesh_task.task_type}', " + f"expected 'VOLUME_MESH'." + ) + if mesh_task.status != "success": + raise ValidationError( + f"Parent task '{parent_task_id}' has status '{mesh_task.status}', " + f"expected 'success'." + ) # get up-to-date task info task = SimulationTask.get(self.task_id, verbose=False) if task.fileMd5 != mesh_task.childFileMd5: raise ValidationError( - "Simulation stored in parent task 'VolumeMesher' does not match the " - "current simulation." + f"Simulation stored in parent task '{parent_task_id}' does not match " + "the current simulation." ) + except ValidationError: + raise except Exception as e: raise ValidationError( - "The parent task must be a 'VolumeMesher' task which has been successfully " - "run and is associated to the same 'HeatChargeSimulation' as provided here." + f"The parent task '{parent_task_id}' must be a 'VolumeMesher' task which " + "has been successfully run and is associated to the same " + "'HeatChargeSimulation' as provided here." ) from e + except WebError: + raise except Exception as e: raise WebError(f"Provided 'parent_tasks' failed validation: {e!s}") from e