Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cb94af5
Updates beaker-py version.
finbarrtimbers Sep 18, 2025
5c8795d
Fixed secrets ref.
finbarrtimbers Sep 18, 2025
169fe8a
Fixed whoami command
finbarrtimbers Sep 18, 2025
662661b
Fixed experimentspec error
finbarrtimbers Sep 18, 2025
ea655b9
Fixed constraints reference.
finbarrtimbers Sep 18, 2025
1c686af
Update mason.py for beaker-py v2 API changes
finbarrtimbers Sep 18, 2025
26c8c2a
Fix beaker.TaskSpec to beaker.BeakerTaskSpec for v2 API
finbarrtimbers Sep 18, 2025
0edd090
Fix remaining beaker v2 API changes
finbarrtimbers Sep 18, 2025
7f8e4db
Fix BeakerPriority to BeakerJobPriority for v2 API
finbarrtimbers Sep 18, 2025
5aee8fc
Fix BeakerJobPriority enum access to use string key
finbarrtimbers Sep 18, 2025
1e46770
Fix experiment ID access for beaker v2 API
finbarrtimbers Sep 18, 2025
73a4f3f
Fix beaker v2 API compatibility in utils.py and test_utils.py
finbarrtimbers Sep 18, 2025
c1a6768
Updated finetune script
finbarrtimbers Oct 1, 2025
7e4cfb1
Now, finetune_8b.sh uses uv.
finbarrtimbers Oct 1, 2025
50423b2
Updated finetune_8b.sh to work with build_image_and_launch.sh.
finbarrtimbers Oct 1, 2025
cdd2391
Updated code
finbarrtimbers Oct 1, 2025
abf1b45
added chat template to script.
finbarrtimbers Oct 1, 2025
64fa692
changed priority
finbarrtimbers Oct 1, 2025
f20e13e
updated priority
finbarrtimbers Oct 1, 2025
5bcda40
Updated scripts/train/debug/finetune.sh.
finbarrtimbers Oct 1, 2025
8bdd718
added non-resumable flag.
finbarrtimbers Oct 1, 2025
892f5a9
Set description for debug finetune script.
finbarrtimbers Oct 1, 2025
92add3e
Actually set chat template name.
finbarrtimbers Oct 1, 2025
1277e4e
Merge branch 'main' into update-beaker-py
finbarrtimbers Oct 1, 2025
8371060
Updated env var name
finbarrtimbers Oct 1, 2025
c291c04
updated typo
finbarrtimbers Oct 1, 2025
3f15ee7
Merge branch 'main' into update-beaker-py
finbarrtimbers Oct 7, 2025
252b7a5
Updated code
finbarrtimbers Oct 7, 2025
2ea42f9
Merge branch 'main' into update-beaker-py
finbarrtimbers Oct 7, 2025
5bef970
Updated error handling for interactive
finbarrtimbers Oct 10, 2025
fd565de
Updated script
finbarrtimbers Oct 10, 2025
faeacc3
Refactored finetune.sh
finbarrtimbers Oct 10, 2025
f45806b
Merge branch 'main' into update-beaker-py
finbarrtimbers Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 74 additions & 64 deletions mason.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ def get_args():
)
parser.add_argument(
"--timeout",
type=str,
help="Timeout for the Beaker task (e.g., '2h', '30m', '1d'). If not specified, no timeout is set.",
type=int,
help="Timeout for the Beaker task in seconds (e.g., 7200 for 2 hours). If not specified, no timeout is set.",
default=None,
)
# Split up the mason args from the Python args.
Expand Down Expand Up @@ -232,15 +232,15 @@ def get_env_vars(
additional_env_var_names = {var["name"] for var in additional_env_vars}

env_vars = [
beaker.EnvVar(name=name, value=value)
beaker.BeakerEnvVar(name=name, value=value)
for name, value in DEFAULT_ENV_VARS.items()
if name not in additional_env_var_names
]

env_vars.extend([beaker.EnvVar(name=env_var["name"], value=env_var["value"]) for env_var in additional_env_vars])
env_vars.extend([beaker.BeakerEnvVar(name=env_var["name"], value=env_var["value"]) for env_var in additional_env_vars])

# add user-specific secrets
env_vars.extend([beaker.EnvVar(name=secret["name"], secret=secret["value"]) for secret in additional_secrets])
env_vars.extend([beaker.BeakerEnvVar(name=secret["name"], secret=secret["value"]) for secret in additional_secrets])

useful_secrets = [
"HF_TOKEN",
Expand All @@ -254,22 +254,24 @@ def get_env_vars(
]
for useful_secret in useful_secrets:
if f"{whoami}_{useful_secret}" in beaker_secrets:
env_vars.append(beaker.EnvVar(name=useful_secret, secret=f"{whoami}_{useful_secret}"))
env_vars.append(beaker.BeakerEnvVar(name=useful_secret, secret=f"{whoami}_{useful_secret}"))
elif useful_secret in beaker_secrets:
env_vars.append(beaker.EnvVar(name=useful_secret, secret=useful_secret))
env_vars.append(beaker.BeakerEnvVar(name=useful_secret, secret=useful_secret))

# use the user's PATH; including the conda / python PATH
if not pure_docker_mode:
env_vars.extend([beaker.EnvVar(name="PATH", value=os.getenv("PATH"))])
env_vars.extend([beaker.BeakerEnvVar(name="PATH", value=os.getenv("PATH"))])

# if all cluster is in weka, we mount the weka
if all(c in WEKA_CLUSTERS for c in cluster):
env_vars.extend(
[
beaker.EnvVar(name="HF_HOME", value="/weka/oe-adapt-default/allennlp/.cache/huggingface"),
beaker.EnvVar(name="HF_DATASETS_CACHE", value="/weka/oe-adapt-default/allennlp/.cache/huggingface"),
beaker.EnvVar(name="HF_HUB_CACHE", value="/weka/oe-adapt-default/allennlp/.cache/hub"),
beaker.EnvVar(
beaker.BeakerEnvVar(name="HF_HOME", value="/weka/oe-adapt-default/allennlp/.cache/huggingface"),
beaker.BeakerEnvVar(
name="HF_DATASETS_CACHE", value="/weka/oe-adapt-default/allennlp/.cache/huggingface"
),
beaker.BeakerEnvVar(name="HF_HUB_CACHE", value="/weka/oe-adapt-default/allennlp/.cache/hub"),
beaker.BeakerEnvVar(
name="CHECKPOINT_OUTPUT_DIR",
value=f"/weka/oe-adapt-default/allennlp/deletable_checkpoint_states/{global_wandb_id}",
),
Expand All @@ -278,19 +280,19 @@ def get_env_vars(
if num_nodes > 1:
env_vars.extend(
[
beaker.EnvVar(name="NCCL_SOCKET_IFNAME", value="ib"),
beaker.EnvVar(name="NCCL_IB_HCA", value="^=mlx5_bond_0"),
beaker.BeakerEnvVar(name="NCCL_SOCKET_IFNAME", value="ib"),
beaker.BeakerEnvVar(name="NCCL_IB_HCA", value="^=mlx5_bond_0"),
]
)
# if all cluster is in gcp we add the following env

elif all(c in GCP_CLUSTERS for c in cluster):
env_vars.extend(
[
beaker.EnvVar(name="HF_HOME", value="/filestore/.cache/huggingface"),
beaker.EnvVar(name="HF_DATASETS_CACHE", value="/filestore/.cache/huggingface"),
beaker.EnvVar(name="HF_HUB_CACHE", value="/filestore/.cache/hub"),
beaker.EnvVar(
beaker.BeakerEnvVar(name="HF_HOME", value="/filestore/.cache/huggingface"),
beaker.BeakerEnvVar(name="HF_DATASETS_CACHE", value="/filestore/.cache/huggingface"),
beaker.BeakerEnvVar(name="HF_HUB_CACHE", value="/filestore/.cache/hub"),
beaker.BeakerEnvVar(
name="HF_HUB_ENABLE_HF_TRANSFER",
value="0", # we disable it because GCP is weird on uploading to the hub
),
Expand All @@ -299,40 +301,40 @@ def get_env_vars(
if num_nodes > 1:
env_vars.extend(
[
beaker.EnvVar(name="LD_LIBRARY_PATH", value=r"/var/lib/tcpxo/lib64:${LD_LIBRARY_PATH}"),
beaker.EnvVar(name="NCCL_CROSS_NIC", value="0"),
beaker.EnvVar(name="NCCL_ALGO", value="Ring,Tree"),
beaker.EnvVar(name="NCCL_PROTO", value="Simple"),
beaker.EnvVar(name="NCCL_MIN_NCHANNELS", value="4"),
beaker.EnvVar(name="NCCL_P2P_NET_CHUNKSIZE", value="524288"),
beaker.EnvVar(name="NCCL_P2P_PCI_CHUNKSIZE", value="524288"),
beaker.EnvVar(name="NCCL_P2P_NVL_CHUNKSIZE", value="1048576"),
beaker.EnvVar(name="NCCL_FASTRAK_NUM_FLOWS", value="2"),
beaker.EnvVar(name="NCCL_FASTRAK_ENABLE_CONTROL_CHANNEL", value="0"),
beaker.EnvVar(name="NCCL_BUFFSIZE", value="8388608"),
beaker.EnvVar(name="NCCL_FASTRAK_USE_SNAP", value="1"),
beaker.EnvVar(name="CUDA_VISIBLE_DEVICES", value="0,1,2,3,4,5,6,7"),
beaker.EnvVar(name="NCCL_NET_GDR_LEVEL", value="PIX"),
beaker.EnvVar(name="NCCL_FASTRAK_ENABLE_HOTPATH_LOGGING", value="0"),
beaker.EnvVar(name="NCCL_TUNER_PLUGIN", value="libnccl-tuner.so"),
beaker.EnvVar(
beaker.BeakerEnvVar(name="LD_LIBRARY_PATH", value=r"/var/lib/tcpxo/lib64:${LD_LIBRARY_PATH}"),
beaker.BeakerEnvVar(name="NCCL_CROSS_NIC", value="0"),
beaker.BeakerEnvVar(name="NCCL_ALGO", value="Ring,Tree"),
beaker.BeakerEnvVar(name="NCCL_PROTO", value="Simple"),
beaker.BeakerEnvVar(name="NCCL_MIN_NCHANNELS", value="4"),
beaker.BeakerEnvVar(name="NCCL_P2P_NET_CHUNKSIZE", value="524288"),
beaker.BeakerEnvVar(name="NCCL_P2P_PCI_CHUNKSIZE", value="524288"),
beaker.BeakerEnvVar(name="NCCL_P2P_NVL_CHUNKSIZE", value="1048576"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_NUM_FLOWS", value="2"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_ENABLE_CONTROL_CHANNEL", value="0"),
beaker.BeakerEnvVar(name="NCCL_BUFFSIZE", value="8388608"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_USE_SNAP", value="1"),
beaker.BeakerEnvVar(name="CUDA_VISIBLE_DEVICES", value="0,1,2,3,4,5,6,7"),
beaker.BeakerEnvVar(name="NCCL_NET_GDR_LEVEL", value="PIX"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_ENABLE_HOTPATH_LOGGING", value="0"),
beaker.BeakerEnvVar(name="NCCL_TUNER_PLUGIN", value="libnccl-tuner.so"),
beaker.BeakerEnvVar(
name="NCCL_TUNER_CONFIG_PATH", value="/var/lib/tcpxo/lib64/a3plus_tuner_config.textproto"
),
beaker.EnvVar(
beaker.BeakerEnvVar(
name="NCCL_SHIMNET_GUEST_CONFIG_CHECKER_CONFIG_FILE",
value="/var/lib/tcpxo/lib64/a3plus_guest_config.textproto",
),
beaker.EnvVar(name="NCCL_FASTRAK_PLUGIN_ACCEPT_TIMEOUT_MS", value="600000"),
beaker.EnvVar(name="NCCL_NVLS_ENABLE", value="0"),
beaker.EnvVar(name="NCCL_FASTRAK_CTRL_DEV", value="enp0s12"),
beaker.EnvVar(
beaker.BeakerEnvVar(name="NCCL_FASTRAK_PLUGIN_ACCEPT_TIMEOUT_MS", value="600000"),
beaker.BeakerEnvVar(name="NCCL_NVLS_ENABLE", value="0"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_CTRL_DEV", value="enp0s12"),
beaker.BeakerEnvVar(
name="NCCL_FASTRAK_IFNAME",
value="enp6s0,enp7s0,enp13s0,enp14s0,enp134s0,enp135s0,enp141s0,enp142s0",
),
beaker.EnvVar(name="NCCL_SOCKET_IFNAME", value="enp0s12"),
beaker.EnvVar(name="NCCL_USE_SNAP", value="1"),
beaker.EnvVar(name="NCCL_FASTRAK_USE_LLCM", value="1"),
beaker.EnvVar(name="NCCL_FASTRAK_LLCM_DEVICE_DIRECTORY", value="/dev/aperture_devices"),
beaker.BeakerEnvVar(name="NCCL_SOCKET_IFNAME", value="enp0s12"),
beaker.BeakerEnvVar(name="NCCL_USE_SNAP", value="1"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_USE_LLCM", value="1"),
beaker.BeakerEnvVar(name="NCCL_FASTRAK_LLCM_DEVICE_DIRECTORY", value="/dev/aperture_devices"),
]
)
# don't mount anything; assume no cache
Expand All @@ -342,8 +344,8 @@ def get_env_vars(
if resumable:
env_vars.extend(
[
beaker.EnvVar(name="WANDB_RUN_ID", value=global_wandb_id),
beaker.EnvVar(name="WANDB_RESUME", value="allow"),
beaker.BeakerEnvVar(name="WANDB_RUN_ID", value=global_wandb_id),
beaker.BeakerEnvVar(name="WANDB_RESUME", value="allow"),
]
)

Expand All @@ -356,16 +358,22 @@ def get_datasets(beaker_datasets, cluster: List[str]):
# if all cluster is in weka, we mount the weka
if all(c in WEKA_CLUSTERS for c in cluster):
res = [
beaker.DataMount(source=beaker.DataSource(weka="oe-adapt-default"), mount_path="/weka/oe-adapt-default"),
beaker.DataMount(
source=beaker.DataSource(weka="oe-training-default"), mount_path="/weka/oe-training-default"
beaker.BeakerDataMount(
source=beaker.BeakerDataSource(weka="oe-adapt-default"), mount_path="/weka/oe-adapt-default"
),
beaker.BeakerDataMount(
source=beaker.BeakerDataSource(weka="oe-training-default"), mount_path="/weka/oe-training-default"
),
]
elif all(c in GCP_CLUSTERS for c in cluster):
res = [beaker.DataMount(source=beaker.DataSource(host_path="/mnt/filestore_1"), mount_path="/filestore")]
res = [
beaker.BeakerDataMount(
source=beaker.BeakerDataSource(host_path="/mnt/filestore_1"), mount_path="/filestore"
)
]
for beaker_dataset in beaker_datasets:
to_append = beaker.DataMount(
source=beaker.DataSource(beaker=beaker_dataset["beaker"]), mount_path=beaker_dataset["mount_path"]
to_append = beaker.BeakerDataMount(
source=beaker.BeakerDataSource(beaker=beaker_dataset["beaker"]), mount_path=beaker_dataset["mount_path"]
)
res.append(to_append)

Expand Down Expand Up @@ -716,17 +724,19 @@ def make_task_spec(args, full_command: str, i: int, beaker_secrets: str, whoami:
raise ValueError("GCP clusters do not have the dev filesystem, please use a proper image")

if args.hostname is not None:
constraints = beaker.Constraints(hostname=args.hostname)
constraints = beaker.BeakerConstraints(hostname=args.hostname)
else:
constraints = beaker.Constraints(cluster=args.cluster)
spec = beaker.TaskSpec(
constraints = beaker.BeakerConstraints(cluster=args.cluster)
spec = beaker.BeakerTaskSpec(
name=f"{args.task_name}__{i}",
image=beaker.ImageSource(beaker=args.image),
image=beaker.BeakerImageSource(beaker=args.image),
command=["/bin/bash", "-c"],
arguments=[full_command],
result=beaker.ResultSpec(path="/output"),
result=beaker.BeakerResultSpec(path="/output"),
datasets=get_datasets(args.beaker_datasets, args.cluster),
context=beaker.TaskContext(priority=beaker.Priority(args.priority), preemptible=args.preemptible),
context=beaker.BeakerTaskContext(
priority=beaker.BeakerJobPriority[args.priority], preemptible=args.preemptible
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Enum Lookup Fails on Case Mismatch

Switching the task priority to beaker.BeakerJobPriority[args.priority] can raise a KeyError. Subscripting an enum looks members up by name, and that lookup is case-sensitive, so it fails whenever the string in args.priority (e.g., "normal") is not exactly an enum member name (e.g., "NORMAL").

Fix in Cursor Fix in Web

),
constraints=constraints,
env_vars=get_env_vars(
args.pure_docker_mode,
Expand All @@ -738,7 +748,7 @@ def make_task_spec(args, full_command: str, i: int, beaker_secrets: str, whoami:
args.env,
args.secret,
),
resources=beaker.TaskResources(gpu_count=args.gpus, shared_memory=args.shared_memory),
resources=beaker.BeakerTaskResources(gpu_count=args.gpus, shared_memory=args.shared_memory),
replicas=args.num_nodes,
)
if args.num_nodes > 1:
Expand Down Expand Up @@ -769,8 +779,8 @@ def main():
beaker_client = beaker.Beaker.from_env(default_workspace=args.workspace)
else:
beaker_client = beaker.Beaker.from_env()
beaker_secrets = [secret.name for secret in beaker_client.workspace.secrets()]
whoami = beaker_client.account.whoami().name
beaker_secrets = [secret.name for secret in beaker_client.secret.list()]
whoami = beaker_client.user.get().name

full_commands = [make_internal_command(command, args, whoami, is_external_user) for command in commands]
if is_external_user:
Expand All @@ -789,17 +799,17 @@ def main():
console.print(Text(full_command))
if is_external_user:
return
experiment_spec = beaker.ExperimentSpec(
experiment_spec = beaker.BeakerExperimentSpec(
description=args.description,
tasks=[
make_task_spec(args, full_command, i, beaker_secrets, whoami, args.resumable)
for i, full_command in enumerate(full_commands)
],
budget=args.budget,
retry=beaker.RetrySpec(allowed_task_retries=args.max_retries),
retry=beaker.BeakerRetrySpec(allowed_task_retries=args.max_retries),
)
exp = beaker_client.experiment.create(spec=experiment_spec)
console.log(f"Kicked off Beaker job. https://beaker.org/ex/{exp.id}")
console.log(f"Kicked off Beaker job. https://beaker.org/ex/{exp.experiment.id}")


if __name__ == "__main__":
Expand Down
14 changes: 10 additions & 4 deletions open_instruct/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,22 @@ def _setup_beaker_mocks(mock_beaker_from_env, mock_is_beaker_job, initial_descri
mock_client = mock.MagicMock()
mock_beaker_from_env.return_value = mock_client

# Mock the workload object
mock_workload = mock.MagicMock()
mock_client.workload.get.return_value = mock_workload

# Mock the spec object returned by experiment.get_spec
mock_spec = mock.MagicMock()
mock_spec.description = initial_description
mock_client.experiment.get.return_value = mock_spec
mock_client.experiment.get_spec.return_value = mock_spec

description_history = []

def track_description(exp_id, desc):
description_history.append(desc)
def track_description(workload, description=None):
if description is not None:
description_history.append(description)

mock_client.experiment.set_description.side_effect = track_description
mock_client.workload.update.side_effect = track_description

return mock_client, mock_spec, description_history

Expand Down
12 changes: 8 additions & 4 deletions open_instruct/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,13 +978,16 @@ def maybe_update_beaker_description(

try:
client = beaker.Beaker.from_env()
except beaker.exceptions.ConfigurationError as e:
except beaker.exceptions.BeakerConfigurationError as e:
logger.warning(f"Failed to initialize Beaker client: {e}")
return

try:
spec = client.experiment.get(experiment_id)
except beaker.exceptions.ExperimentNotFound:
# Get the workload first (experiment_id is actually BEAKER_WORKLOAD_ID)
workload = client.workload.get(experiment_id)
# Then get the experiment spec from the workload
spec = client.experiment.get_spec(workload)
except (beaker.exceptions.BeakerExperimentNotFound, ValueError):
logger.warning(
f"Failed to get Beaker experiment with ID: {experiment_id}"
"This might be fine if you are e.g. running in an interactive job."
Expand Down Expand Up @@ -1025,7 +1028,8 @@ def maybe_update_beaker_description(
description_components.append(progress_bar)
new_description = " ".join(description_components)
try:
client.experiment.set_description(experiment_id, new_description)
# Update the workload description using the workload object we got earlier
client.workload.update(workload, description=new_description)
except requests.exceptions.HTTPError as e:
logger.warning(
f"Failed to update Beaker description due to HTTP error: {e}"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ link-mode = "hardlink"

[dependency-groups]
dev = [
"beaker-py>=1.32.2,<2.0",
"beaker-py>=2.5.0",
"mkdocs-material>=9.6.8",
"markdown-include>=0.8.1",
"pytest>=8.3.4",
Expand Down
31 changes: 28 additions & 3 deletions scripts/train/debug/finetune.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
uv run accelerate launch \
#!/bin/bash

LAUNCH_CMD="accelerate launch \
--mixed_precision bf16 \
--num_processes 1 \
open_instruct/finetune.py \
Expand All @@ -12,11 +14,34 @@ uv run accelerate launch \
--warmup_ratio 0.03 \
--weight_decay 0.0 \
--num_train_epochs 2 \
--output_dir output/ \
--report_to wandb \
--logging_steps 1 \
--model_revision main \
--dataset_mixer_list allenai/tulu-3-sft-personas-algebra 100 \
--add_bos \
--seed 123 \
# --with_tracking \
--chat_template_name tulu \
--with_tracking"

if [ -n "$1" ]; then
BEAKER_IMAGE="$1"
echo "Using Beaker image: $BEAKER_IMAGE"

uv run python mason.py \
--cluster ai2/jupiter \
--workspace ai2/open-instruct-dev \
--priority normal \
--image "$BEAKER_IMAGE" \
--description "Single GPU finetune job." \
--pure_docker_mode \
--preemptible \
--num_nodes 1 \
--budget ai2/oe-adapt \
--gpus 1 \
--non_resumable \
-- \
$LAUNCH_CMD
else
echo "Running locally..."
uv run $LAUNCH_CMD
fi
2 changes: 1 addition & 1 deletion scripts/train/debug/large_test_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ uv run python mason.py \
--preemptible \
--num_nodes 2 \
--description "Large (multi-node) test script." \
--timeout "1h" \
--timeout 3600 \
--max_retries 0 \
--env VLLM_ALLOW_LONG_MAX_MODEL_LEN=1 \
--budget ai2/oe-adapt \
Expand Down
2 changes: 1 addition & 1 deletion scripts/train/debug/single_gpu_on_beaker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ uv run python mason.py \
--priority urgent \
--num_nodes 1 \
--max_retries 0 \
--timeout "15m" \
--timeout 900 \
--env VLLM_ALLOW_LONG_MAX_MODEL_LEN=1 \
--budget ai2/oe-adapt \
--gpus 1 \
Expand Down
Loading