[WIP] Add torchft to CI #1398

Draft: wants to merge 1 commit into main

2 changes: 2 additions & 0 deletions .github/workflows/integration_test_8gpu.yaml
@@ -35,6 +35,8 @@ jobs:
      script: |
        set -eux

        nvidia-smi topo -p2p n

        # The generic Linux job chooses to use base env, not the one set up by the image
        CONDA_ENV=$(conda env list --json | jq -r ".envs | .[-1]")
        conda activate "${CONDA_ENV}"
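For context, nvidia-smi topo -p2p n prints a GPU-pair matrix of NVLink peer-to-peer status, which is what the new TorchFT job below uses to triage its peer-access failure. Per nvidia-smi's help text, the -p2p flag also accepts other capability letters (shown here as a reference, not part of this diff):

    nvidia-smi topo -p2p n   # NVLink P2P status between all GPU pairs
    nvidia-smi topo -p2p r   # P2P read capability
    nvidia-smi topo -p2p w   # P2P write capability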
58 changes: 58 additions & 0 deletions .github/workflows/integration_test_8gpu_torchft.yaml
@@ -0,0 +1,58 @@
name: TorchFT 8 GPU Integration Test

on:
  push:
    branches: [ main ]
    paths:
      - 'torchtitan/components/ft.py'
  pull_request:
    paths:
      - 'torchtitan/components/ft.py'
  schedule:
    # Runs every 6 hours
    - cron: '0 */6 * * *'
concurrency:
  group: unit-test${{ github.workflow }}-${{ github.ref == 'refs/heads/main' && github.run_number || github.ref }}
  cancel-in-progress: true

defaults:
  run:
    shell: bash -l -eo pipefail {0}

jobs:
  build-test:
    uses: pytorch/test-infra/.github/workflows/linux_job_v2.yml@main
    with:
      runner: linux.g5.48xlarge.nvidia.gpu
      gpu-arch-type: cuda
      gpu-arch-version: "12.6"
      # This image is faster to clone than the default (1m25s vs 2m37s),
      # but it lacks the CC (C compiler) needed by triton.
      docker-image: torchtitan-ubuntu-20.04-clang12
      repository: pytorch/torchtitan
      upload-artifact: outputs
      script: |
        set -eux

        # The generic Linux job chooses to use base env, not the one set up by the image
        CONDA_ENV=$(conda env list --json | jq -r ".envs | .[-1]")
        conda activate "${CONDA_ENV}"

        nvidia-smi topo -p2p n

        pip config --user set global.progress_bar off

        # Install Rust non-interactively; torchft's native extension builds with maturin.
        curl --proto '=https' --tlsv1.2 https://sh.rustup.rs -sSf | sh -s -- -y
        . "$HOME/.cargo/env"

        python -m pip install --force-reinstall --pre torch --index-url https://download.pytorch.org/whl/nightly/cu126
        pip install maturin
        git clone https://github.com/pytorch/torchft
        pushd torchft; pip install .; popd

        mkdir artifacts-to-be-uploaded
        echo "torchft_lighthouse"
        RUST_BACKTRACE=1 torchft_lighthouse --min_replicas 1 --quorum_tick_ms 100 --join_timeout_ms 10000 > /dev/null 2>&1 &
        echo "ft_integration_test"
        # Getting error - Cuda failure 217 'peer access is not supported between these two devices'
        python ./tests/integration_tests_ft.py artifacts-to-be-uploaded --ngpu 8
        # pkill -9 torchft_lighthouse
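The "Cuda failure 217" noted above means NCCL attempted peer-to-peer access between GPU pairs that cannot reach each other on this runner. A minimal sketch to enumerate which pairs report peer access, using torch.cuda.can_device_access_peer (the script itself is hypothetical, not part of this PR):

    # p2p_check.py: print an OK/NS matrix of peer access between GPU pairs.
    import torch

    def main() -> None:
        n = torch.cuda.device_count()
        for i in range(n):
            # "OK" if device i can access device j's memory directly, else "NS".
            row = [
                "OK" if i == j or torch.cuda.can_device_access_peer(i, j) else "NS"
                for j in range(n)
            ]
            print(f"GPU{i}: {' '.join(row)}")

    if __name__ == "__main__":
        main()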
155 changes: 155 additions & 0 deletions tests/integration_tests_ft.py
@@ -0,0 +1,155 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import argparse
import concurrent.futures
import logging
import os
import subprocess
from collections import defaultdict
from dataclasses import dataclass
from typing import Sequence

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    import tomllib  # stdlib on Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # backport for older interpreters


@dataclass
class OverrideDefinitions:
    """
    This class defines the override definitions for the integration tests.
    """

    # Each inner sequence is one test variation; the default is a single
    # variation with no extra overrides.
    override_args: Sequence[Sequence[str]] = ((),)
    test_descr: str = "default"
    test_name: str = "default"
    ngpu: int = 4
    model_flavor: str = "debugmodel"

    def __repr__(self):
        return self.test_descr


def build_test_list():
    """
    The key is the config file name and the value is a list of
    OverrideDefinitions used to generate variations of integration tests
    based on the same root config file.
    """
    integration_tests_flavors = defaultdict(list)
    integration_tests_flavors["debug_model.toml"] = [
        OverrideDefinitions(
            [
                ["--training.steps 10", "--checkpoint.enable_checkpoint"],
            ],
            "Default TorchFT integration test",
            "default_torchft",
        )
    ]
    return integration_tests_flavors


def _run_cmd(cmd):
    # Capture combined stdout/stderr so the caller can log it alongside
    # the return code.
    return subprocess.run(
        [cmd],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        shell=True,
    )


def run_test(test_flavor: OverrideDefinitions, full_path: str, output_dir: str):
    # run_test supports a sequence of test variations per flavor.
    test_name = test_flavor.test_name
    dump_folder_arg = f"--job.dump_folder {output_dir}/{test_name}"
    model_flavor_arg = f"--model.flavor {test_flavor.model_flavor}"

    # Two replica groups of four GPUs each: "0,1,2,3" and "4,5,6,7".
    all_ranks = [",".join(map(str, range(0, 4))), ",".join(map(str, range(4, 8)))]

    for idx, override_arg in enumerate(test_flavor.override_args):
        cmds = []
        for replica_id, ranks in enumerate(all_ranks):
            cmd = (
                f'TORCH_TRACE="{output_dir}/{test_name}/compile_trace" '
                # + f"NCCL_P2P_DISABLE=1 "  # TODO: Why is P2P not supported on CI?
                + f"NGPU=4 CUDA_VISIBLE_DEVICES={ranks} "
                + f"CONFIG_FILE={full_path} ./run_train.sh "
                + "--fault_tolerance.enable "
                + f"--fault_tolerance.replica_id={replica_id} --fault_tolerance.group_size=2"
            )

            cmd += " " + dump_folder_arg
            cmd += " " + model_flavor_arg
            if override_arg:
                cmd += " " + " ".join(override_arg)

            logger.info(
                "=====TorchFT Integration test, flavor : "
                f"{test_flavor.test_descr}, command : {cmd}====="
            )
            cmds.append(cmd)

        # Launch both replicas concurrently; they form a quorum via the lighthouse.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(_run_cmd, cmd) for cmd in cmds]
            results = [future.result() for future in futures]

        # Zip with cmds so a failure reports the command that actually failed.
        for cmd, result in zip(cmds, results):
            logger.info(result.stdout)

            if result.returncode == 0:
                continue

            raise Exception(
                f"Integration test failed, flavor : {test_flavor.test_descr}, command : {cmd}"
            )
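

# For reference, the composed command for replica 0 of the default flavor
# expands to roughly the following (the dump folder path depends on the
# output_dir argument):
#
#   TORCH_TRACE="<output_dir>/default_torchft/compile_trace" \
#   NGPU=4 CUDA_VISIBLE_DEVICES=0,1,2,3 \
#   CONFIG_FILE=<config>.toml ./run_train.sh --fault_tolerance.enable \
#   --fault_tolerance.replica_id=0 --fault_tolerance.group_size=2 \
#   --job.dump_folder <output_dir>/default_torchft --model.flavor debugmodel \
#   --training.steps 10 --checkpoint.enable_checkpoint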


def run_tests(args):
    integration_tests_flavors = build_test_list()

    if args.ngpu < 8:
        logger.info("Skipping TorchFT integration tests as we need 8 GPUs.")
        return

    for config_file in os.listdir(args.config_dir):
        if not config_file.endswith(".toml"):
            continue

        full_path = os.path.join(args.config_dir, config_file)
        with open(full_path, "rb") as f:
            config = tomllib.load(f)
        is_integration_test = config["job"].get("use_for_integration_test", False)
        if not is_integration_test:
            continue

        for test_flavor in integration_tests_flavors[config_file]:
            if not (args.test == "all" or test_flavor.test_name == args.test):
                continue

            run_test(test_flavor, full_path, args.output_dir)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("output_dir")
    parser.add_argument(
        "--config_dir", default="./torchtitan/models/llama3/train_configs"
    )
    parser.add_argument(
        "--test",
        default="all",
        help="test to run, acceptable values: `test_name` in `build_test_list` (default: all)",
    )
    parser.add_argument("--ngpu", default=8, type=int)
    args = parser.parse_args()

    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)
    run_tests(args)


if __name__ == "__main__":
    main()
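
To reproduce the CI flow locally, a minimal sketch (it assumes torchft is installed so torchft_lighthouse is on PATH, and that 8 GPUs are visible):

    RUST_BACKTRACE=1 torchft_lighthouse --min_replicas 1 --quorum_tick_ms 100 --join_timeout_ms 10000 > /dev/null 2>&1 &
    LIGHTHOUSE_PID=$!
    python ./tests/integration_tests_ft.py artifacts-to-be-uploaded --ngpu 8
    kill "$LIGHTHOUSE_PID"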
1 change: 1 addition & 0 deletions torchtitan/components/checkpoint.py
@@ -430,6 +430,7 @@ def save(self, curr_step: int, last_step: bool = False) -> None:
            None
        """

        # TODO: we always save the checkpoint when FT is on, even if
        # enable_checkpoint is off. Is that intended?
        if self.ft_manager:
            self._ft_save(curr_step)
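If the answer is that the checkpoint toggle should be honored, one possible guard is sketched below. This is a sketch only: it assumes CheckpointManager exposes an enable_checkpoint flag, and unconditionally saving may be exactly what fault tolerance requires, which is what the TODO is asking.

    # Hypothetical alternative to the unconditional FT save (not part of this PR):
    if self.ft_manager and self.enable_checkpoint:
        self._ft_save(curr_step)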
1 change: 1 addition & 0 deletions torchtitan/components/ft.py
@@ -4,6 +4,7 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# TODO: test that changes in this file trigger CI
import importlib
from contextlib import nullcontext
from typing import ContextManager, Optional, TYPE_CHECKING, Union