Skip to content

Commit ea0a894

Browse files
committed
feat: add additional transformers
Signed-off-by: vsoch <[email protected]>
1 parent 0d7fccf commit ea0a894

File tree

17 files changed

+1172
-76
lines changed

17 files changed

+1172
-76
lines changed

fractale/cli/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ def get_parser():
8585
default="kubernetes",
8686
)
8787
transform.add_argument(
88-
"-f", "--from", dest="from_transformer", help="transform from this jobspec", default="flux"
88+
"-f",
89+
"--from",
90+
dest="from_transformer",
91+
help="transform from this jobspec",
8992
)
9093
transform.add_argument(
9194
"--pretty",

fractale/cli/transform.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from rich import print
88
from rich.pretty import pprint
99

10-
from fractale.transformer import get_transformer
10+
from fractale.transformer import detect_transformer, get_transformer
1111

1212

1313
def main(args, extra, **kwargs):
@@ -18,6 +18,10 @@ def main(args, extra, **kwargs):
1818
if not os.path.exists(args.jobspec):
1919
sys.exit(f"JobSpec {args.jobspec} does not exist.")
2020

21+
# If no from transformer defined, try to detect
22+
if args.from_transformer is None:
23+
args.from_transformer = detect_transformer(args.jobspec)
24+
2125
# No selector or solver, just manual transform
2226
from_transformer = get_transformer(args.from_transformer)
2327
to_transformer = get_transformer(args.to_transformer)

fractale/transformer/__init__.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,47 @@
1+
import fractale.utils as utils
2+
3+
from .cobalt import Transformer as CobaltTransformer
14
from .flux import Transformer as FluxTransformer
25
from .kubernetes import Transformer as KubernetesTransformer
6+
from .lsf import Transformer as LSFTransformer
7+
from .oar import Transformer as OARTransformer
8+
from .pbs import Transformer as PBSTransformer
39
from .slurm import Transformer as SlurmTransformer
410

511
plugins = {
612
"kubernetes": KubernetesTransformer,
713
"flux": FluxTransformer,
814
"slurm": SlurmTransformer,
15+
"pbs": PBSTransformer,
16+
"lsf": LSFTransformer,
17+
"oar": OARTransformer,
18+
"cobalt": CobaltTransformer,
919
}
1020

1121

1222
def get_transformer(name, selector="random", solver=None):
1323
if name not in plugins:
1424
raise ValueError(f"{name} is not a valid transformer.")
1525
return plugins[name](selector, solver)
26+
27+
28+
def detect_transformer(jobspec):
29+
"""
30+
Quick and dirty detection.
31+
"""
32+
content = utils.read_file(jobspec)
33+
if "#FLUX" in content and "FLUX_CAPACITOR" not in content:
34+
return "flux"
35+
if "#SBATCH " in content:
36+
return "slurm"
37+
if "kind:" in content and "Job" in content:
38+
return "kubernetes"
39+
if "#PBS " in content:
40+
return "pbs"
41+
if "#BSUB" in content:
42+
return "lsf"
43+
if "#OAR" in content:
44+
return "oar"
45+
if "#COBALT" in content:
46+
return "cobalt"
47+
raise ValueError("Unkown transformer.")

fractale/transformer/base.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class TransformerBase:
1212
This can be very manual, or use an LLM.
1313
"""
1414

15-
def __init__(self, selector, solver):
15+
def __init__(self, selector="random", solver=None):
1616
"""
1717
Create a new transformer backend, accepting any options type.
1818
@@ -21,12 +21,18 @@ def __init__(self, selector, solver):
2121
self.selector = get_selector(selector)
2222
self.solver = solver
2323

24-
def parse(self, *args, **kwargs):
24+
def _parse(self, *args, **kwargs):
2525
"""
2626
Parse converts the native jobspec to the standard JobSpec
2727
"""
2828
raise NotImplementedError
2929

30+
def parse(self, filename):
31+
return self._parse(filename)
32+
33+
def unhandled(self, filename):
34+
return self._parse(filename, return_unhandled=True)
35+
3036
def convert(self, *args, **kwargs):
3137
"""
3238
Convert a normalized jobspec to the format here.
@@ -42,3 +48,46 @@ def render(self, matches, jobspec):
4248
"""
4349
js = utils.load_jobspec(jobspec)
4450
return self.run(matches, js)
51+
52+
53+
class Script:
54+
"""
55+
A helper class to build a batch script line by line.
56+
"""
57+
58+
def __init__(self, directive=""):
59+
self.script_lines = ["#!/bin/bash"]
60+
self.directive = directive
61+
62+
def newline(self):
63+
self.script_lines.append("")
64+
65+
def add_line(self, line: str):
66+
"""
67+
Add a custom line to the script.
68+
"""
69+
self.script_lines.append(line)
70+
71+
def add(self, name: str, value=None):
72+
"""
73+
Add a Flux directive, e.g., #FLUX: --job-name=my-job or #FLUX: -N 4.
74+
Handles both short and long options.
75+
"""
76+
if value is None:
77+
return
78+
79+
# Determine if it's a short (-n) or long (--tasks) option
80+
prefix = "-" if len(name) == 1 else "--"
81+
self.script_lines.append(f"{self.directive}: {prefix}{name}={value}")
82+
83+
def add_flag(self, name: str):
84+
"""
85+
Add a boolean flag (e.g., --exclusive).
86+
"""
87+
self.script_lines.append(f"{self.directive}: --{name}")
88+
89+
def render(self) -> str:
90+
"""
91+
Return the complete script as a single string.
92+
"""
93+
return "\n".join(self.script_lines)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .transform import CobaltTransformer as Transformer
2+
3+
assert Transformer
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import re
2+
import shlex
3+
from datetime import datetime, timedelta
4+
5+
from fractale.transformer.base import TransformerBase, Script
6+
from fractale.transformer.common import JobSpec
7+
from fractale.logger.generate import JobNamer
8+
9+
10+
class CobaltScript(Script):
11+
"""
12+
A helper class for Cobalt. Unused as Cobalt uses command-line flags.
13+
"""
14+
def __init__(self):
15+
self.script_lines = ["#!/bin/bash"]
16+
self.directive = "" # No directive prefix
17+
18+
19+
20+
def priority_to_cobalt_priority(priority_str):
21+
"""
22+
Cobalt does not typically expose a direct user-facing priority flag.
23+
This is handled by queue policies. This function is a no-op.
24+
"""
25+
return None
26+
27+
28+
def cobalt_priority_to_priority(cobalt_priority):
29+
"""
30+
Cobalt does not have a parsable priority flag, so this always returns normal.
31+
"""
32+
return "normal"
33+
34+
35+
def seconds_to_cobalt_walltime(total_seconds):
36+
"""
37+
Converts integer seconds to Cobalt's HH:MM:SS walltime format.
38+
Cobalt -t flag also accepts minutes directly, but HH:MM:SS is more explicit.
39+
"""
40+
if not isinstance(total_seconds, int) or total_seconds <= 0:
41+
return None
42+
hours, remainder = divmod(total_seconds, 3600)
43+
minutes, secs = divmod(remainder, 60)
44+
return f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d}"
45+
46+
47+
def cobalt_walltime_to_seconds(time_str):
48+
"""
49+
Converts Cobalt HH:MM:SS walltime string back to integer seconds.
50+
"""
51+
if not time_str:
52+
return None
53+
try:
54+
# Can be HH:MM:SS or just minutes
55+
parts = time_str.split(":")
56+
if len(parts) == 3:
57+
h, m, s = map(int, parts)
58+
return int(timedelta(hours=h, minutes=m, seconds=s).total_seconds())
59+
elif len(parts) == 1:
60+
return int(parts[0]) * 60
61+
return None
62+
except (ValueError, IndexError):
63+
return None
64+
65+
66+
def epoch_to_cobalt_begin_time(epoch_seconds):
67+
"""
68+
Converts Unix epoch to Cobalt's begin time format for the '--at' flag.
69+
"""
70+
if not isinstance(epoch_seconds, int) or epoch_seconds <= 0:
71+
return None
72+
# A common supported format is YYYY-MM-DDTHH:MM:SS
73+
return datetime.fromtimestamp(epoch_seconds).strftime("%Y-%m-%dT%H:%M:%S")
74+
75+
76+
def cobalt_begin_time_to_epoch(time_str):
77+
"""
78+
Converts a Cobalt begin time string back to Unix epoch.
79+
"""
80+
if not time_str:
81+
return None
82+
try:
83+
return int(datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S").timestamp())
84+
except (ValueError, IndexError):
85+
return None
86+
87+
88+
def parse_cobalt_command(command_lines, spec):
89+
"""
90+
Parses a command line from within a Cobalt script body.
91+
"""
92+
if not command_lines:
93+
return []
94+
95+
main_command = command_lines[-1]
96+
parts = shlex.split(main_command)
97+
98+
# The common launcher on ALCF systems is 'aprun'
99+
if parts and parts[0] in ('aprun'):
100+
parts = parts[1:]
101+
102+
if parts and parts[0] in ("singularity", "apptainer") and parts[1] == "exec":
103+
spec.container_image = parts[2]
104+
parts = parts[3:]
105+
106+
return parts
107+
108+
109+
class CobaltTransformer(TransformerBase):
110+
"""
111+
Transforms a JobSpec to/from a Cobalt submission script.
112+
Note: Cobalt uses command-line flags to qsub, not in-script directives.
113+
This transformer generates a "runner" script that calls qsub.
114+
"""
115+
116+
def convert(self, spec):
117+
"""
118+
Converts a JobSpec into a self-submitting Cobalt script string.
119+
"""
120+
job_name = spec.job_name or JobNamer().generate()
121+
122+
# Build the qsub command line
123+
qsub_cmd = ["qsub"]
124+
qsub_cmd.extend(["-A", spec.account] if spec.account else [])
125+
qsub_cmd.extend(["-q", spec.queue] if spec.queue else [])
126+
qsub_cmd.extend(["-n", str(spec.num_nodes)])
127+
128+
# Cobalt uses --proccount for total MPI ranks
129+
if spec.num_tasks > 1:
130+
qsub_cmd.extend([f"--proccount", str(spec.num_tasks)])
131+
132+
wt = seconds_to_cobalt_walltime(spec.wall_time)
133+
if wt: qsub_cmd.extend(["-t", wt])
134+
135+
bt = epoch_to_cobalt_begin_time(spec.begin_time)
136+
if bt: qsub_cmd.extend(["--at", bt])
137+
138+
# -O sets the prefix for output/error files
139+
qsub_cmd.extend(["-O", job_name])
140+
141+
if spec.environment:
142+
for k,v in spec.environment.items():
143+
qsub_cmd.extend(["--env", f"{k}={v}"])
144+
145+
# Build the script that will be executed on the compute node
146+
exec_script_parts = ["#!/bin/bash", ""]
147+
148+
# The common launcher for Cobalt is aprun
149+
aprun_cmd = ["aprun"]
150+
151+
# Match aprun geometry to qsub submission
152+
aprun_cmd.extend(["-n", str(spec.num_tasks)])
153+
aprun_cmd.extend(["-N", str(spec.cpus_per_task)])
154+
155+
if spec.container_image:
156+
aprun_cmd.extend(["singularity", "exec", spec.container_image])
157+
if spec.executable:
158+
aprun_cmd.append(spec.executable)
159+
if spec.arguments:
160+
aprun_cmd.extend(spec.arguments)
161+
162+
exec_script_parts.append(" ".join(aprun_cmd))
163+
exec_script = "\n".join(exec_script_parts)
164+
165+
# Combine into a self-submitting script using a "here document"
166+
runner_script = [
167+
"#!/bin/bash",
168+
" ".join(qsub_cmd) + " << EOF",
169+
exec_script,
170+
"EOF"
171+
]
172+
return "\n".join(runner_script)
173+
174+
def _parse(self, content, return_unhandled=False):
175+
"""
176+
Parses a self-submitting Cobalt script into a JobSpec.
177+
"""
178+
spec = JobSpec()
179+
not_handled = set()
180+
181+
# Find the qsub line and the script body
182+
qsub_line = ""
183+
script_body = []
184+
in_script_body = False
185+
186+
qsub_re = re.compile(r'qsub\s+(.+?)<<\s*EOF')
187+
188+
for line in content.splitlines():
189+
m = qsub_re.search(line)
190+
if m:
191+
qsub_line = m.group(1)
192+
in_script_body = True
193+
continue
194+
195+
if in_script_body and line.strip() != "EOF":
196+
script_body.append(line)
197+
198+
# Parse the qsub command line flags
199+
if qsub_line:
200+
args = shlex.split(qsub_line)
201+
i = 0
202+
while i < len(args):
203+
arg = args[i]
204+
val = args[i+1] if i + 1 < len(args) else ""
205+
206+
if arg == '-A': spec.account = val; i+=2
207+
elif arg == '-q': spec.queue = val; i+=2
208+
elif arg == '-n': spec.num_nodes = int(val); i+=2
209+
elif arg == '-t': spec.wall_time = cobalt_walltime_to_seconds(val); i+=2
210+
elif arg == '--proccount': spec.num_tasks = int(val); i+=2
211+
elif arg == '-O': spec.job_name = val; i+=2
212+
elif arg == '--at': spec.begin_time = cobalt_begin_time_to_epoch(val); i+=2
213+
else: not_handled.add(arg); i+=1
214+
215+
# We again assume a block of text here.
216+
spec.script = script_body
217+
218+
# Parse the execution command from the script body
219+
parts = parse_cobalt_command(spec.script, spec)
220+
if parts:
221+
# Need to parse aprun args to get cpus_per_task
222+
temp_args = parts.copy()
223+
if '-N' in temp_args:
224+
idx = temp_args.index('-N')
225+
spec.cpus_per_task = int(temp_args[idx+1])
226+
temp_args.pop(idx); temp_args.pop(idx)
227+
228+
spec.executable = temp_args[0]
229+
spec.arguments = temp_args[1:]
230+
231+
if return_unhandled:
232+
return not_handled
233+
return spec

0 commit comments

Comments
 (0)