Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/ray-finetune-llm-deepspeed/create_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import json
import os

cache_dir="../../datasets"
if not os.path.exists(cache_dir):
cache_dir=""
dataset = load_dataset("gsm8k", "main", cache_dir=cache_dir)

def gsm8k_qa_tokens_template():
dataset = load_dataset("gsm8k", "main")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"# On OpenShift, you can retrieve the token by running `oc whoami -t`,\n",
"# and the server with `oc cluster-info`.\n",
"auth = TokenAuthentication(\n",
" token = \"\",\n",
" server = \"\",\n",
" token = '',\n",
" server = '',\n",
" skip_tls=False\n",
")\n",
"auth.login()"
Expand Down Expand Up @@ -125,7 +125,7 @@
"source": [
"# The S3 bucket where to store checkpoint.\n",
"# It can be set manually, otherwise it's retrieved from configured the data connection.\n",
"s3_bucket = \"\"\n",
"s3_bucket = ''\n",
"if not s3_bucket:\n",
" s3_bucket = os.environ.get('AWS_S3_BUCKET')\n",
"assert s3_bucket, \"An S3 bucket must be provided to store checkpoints\""
Expand All @@ -152,12 +152,12 @@
" \"--eval-batch-size-per-device=32 \",\n",
" runtime_env={\n",
" \"env_vars\": {\n",
" \"AWS_ACCESS_KEY_ID\": os.environ.get('AWS_ACCESS_KEY_ID'),\n",
" \"AWS_SECRET_ACCESS_KEY\": os.environ.get('AWS_SECRET_ACCESS_KEY'),\n",
" \"AWS_DEFAULT_REGION\": os.environ.get('AWS_DEFAULT_REGION')\n",
" 'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID'),\n",
" 'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY'),\n",
" 'AWS_DEFAULT_REGION': os.environ.get('AWS_DEFAULT_REGION')\n",
" },\n",
" \"pip\": \"requirements.txt\",\n",
" \"working_dir\": \"./\",\n",
" 'pip': 'requirements.txt',\n",
" 'working_dir': './',\n",
" \"excludes\": [\"/docs/\", \"*.ipynb\", \"*.md\"]\n",
" },\n",
")\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ def parse_args():

parser.add_argument("--lora", action="store_true", default=False,
help="If passed, will enable parameter efficient fine-tuning with LoRA.")

parser.add_argument("--lora-config", type=str, default="./lora_configs/lora.json",
help="Lora config json to use.")

parser.add_argument("--num-epochs", type=int, default=1,
help="Number of epochs to train for.")
Expand Down Expand Up @@ -669,7 +672,7 @@ def main():

# Add LoRA config if needed
if args.lora:
with open("./lora_configs/lora.json", "r") as json_file:
with open(args.lora_config, "r") as json_file:
lora_config = json.load(json_file)
config["lora_config"] = lora_config

Expand Down
1 change: 0 additions & 1 deletion tests/odh/mnist_ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func mnistRay(t *testing.T, numGpus int) {

// Fetch created raycluster
rayClusterName := "mnisttest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

Expand Down
1 change: 0 additions & 1 deletion tests/odh/mnist_raytune_hpo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {

// Fetch created raycluster
rayClusterName := "mnisthpotest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

Expand Down
2 changes: 0 additions & 2 deletions tests/odh/notebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
)

const recommendedTagAnnotation = "opendatahub.io/workbench-image-recommended"

var notebookResource = schema.GroupVersionResource{Group: "kubeflow.org", Version: "v1", Resource: "notebooks"}

type NotebookProps struct {
Expand Down
169 changes: 169 additions & 0 deletions tests/odh/ray_finetune_llm_deepspeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package odh

import (
"fmt"
"os"
"strings"
"testing"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestRayFinetuneLlmDeepspeedDemoLlama_2_7b(t *testing.T) {
rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Llama-2-7b-chat-hf", "zero_3_llama_2_7b.json")
}
func TestRayFinetuneLlmDeepspeedDemoLlama_31_8b(t *testing.T) {
rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Meta-Llama-3.1-8B", "zero_3_offload_optim_param.json")
}

func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelName string, modelConfigFile string) {
test := With(t)

// Create a namespace
namespace := test.NewTestNamespace()
var workingDirectory, err = os.Getwd()
test.Expect(err).ToNot(HaveOccurred())

// Define the regular(non-admin) user
userName := GetNotebookUserName(test)
userToken := GetNotebookUserToken(test)

// Create role binding with Namespace specific admin cluster role
CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin")

// list changes required in llm-deepspeed-finetune-demo.ipynb file and update those
requiredChangesInNotebook := map[string]string{
"import os": "import os,time,sys",
"import sys": "!cp /opt/app-root/notebooks/* ./\\n\",\n\t\"!ls",
"from codeflare_sdk.cluster.auth import TokenAuthentication": "from codeflare_sdk.cluster.auth import TokenAuthentication\\n\",\n\t\"from codeflare_sdk.job import RayJobClient",
"token = ''": fmt.Sprintf("token = '%s'", userToken),
"server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)),
"namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name),
"head_cpus=16": "head_cpus=2",
"head_extended_resource_requests=1": "head_extended_resource_requests=0",
"num_workers=7": "num_workers=1",
"worker_cpu_requests=16": "worker_cpu_requests=4",
"worker_cpu_limits=16": "worker_cpu_limits=4",
"worker_memory_requests=128": "worker_memory_requests=64",
"worker_memory_limits=256": "worker_memory_limits=128",
"head_memory=128": "head_memory=48",
"client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n",
"--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus),
"--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1),
"--model-name=meta-llama/Meta-Llama-3.1-8B": fmt.Sprintf("--model-name=%s", modelName),
"--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile),
"--batch-size-per-device=32": "--batch-size-per-device=6",
"--eval-batch-size-per-device=32": "--eval-batch-size-per-device=6",
"'pip': 'requirements.txt'": "'pip': '/opt/app-root/src/requirements.txt'",
"'working_dir': './'": "'working_dir': '/opt/app-root/src'",
"client.stop_job(submission_id)": "finished = False\\n\",\n\t\"while not finished:\\n\",\n\t\" time.sleep(1)\\n\",\n\t\" status = client.get_job_status(submission_id)\\n\",\n\t\" finished = (status == \\\"SUCCEEDED\\\")\\n\",\n\t\"if finished:\\n\",\n\t\" print(\\\"Job completed Successfully !\\\")\\n\",\n\t\"else:\\n\",\n\t\" print(\\\"Job failed !\\\")\\n\",\n\t\"time.sleep(10)\\n",
}

updatedNotebookContent := string(ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb"))
for oldValue, newValue := range requiredChangesInNotebook {
updatedNotebookContent = strings.Replace(updatedNotebookContent, oldValue, newValue, -1)
}
updatedNotebook := []byte(updatedNotebookContent)

// Test configuration
jupyterNotebookConfigMapFileName := "ray_finetune_llm_deepspeed.ipynb"
configMap := map[string][]byte{
jupyterNotebookConfigMapFileName: updatedNotebook,
"ray_finetune_llm_deepspeed.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py"),
"requirements.txt": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/requirements.txt"),
"create_dataset.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/create_dataset.py"),
"lora.json": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/lora_configs/lora.json"),
modelConfigFile: ReadFileExt(test, fmt.Sprintf(workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/deepspeed_configs/%s", modelConfigFile)),
"utils.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/utils.py"),
}

config := CreateConfigMap(test, namespace.Name, configMap)

// Create Notebook CR
createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus)

// Gracefully cleanup Notebook
defer func() {
deleteNotebook(test, namespace)
test.Eventually(listNotebooks(test, namespace), TestTimeoutGpuProvisioning).Should(HaveLen(0))
}()

// Make sure the RayCluster is created and running
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutGpuProvisioning).
Should(
And(
HaveLen(1),
ContainElement(WithTransform(RayClusterState, Equal(rayv1.Ready))),
),
)

// Fetch created raycluster
rayClusterName := "ray"
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err))

// wait until rayjob exists
test.Eventually(func() []RayJobDetailsResponse {
rayJobs, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err))
return *rayJobs
}, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found")

// Get test job-id
jobID := GetTestJobId(test, rayClient, dashboardUrl.Host)
test.Expect(jobID).ToNot(BeEmpty())

// Wait for the job to be succeeded or failed
var rayJobStatus string
test.T().Logf("Waiting for job to be Succeeded...\n")
test.Eventually(func() string {
resp, err := rayClient.GetJobDetails(jobID)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err))
rayJobStatusVal := resp.Status
if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
return rayJobStatus
}
if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
}
return rayJobStatus
}, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
// Store job logs in output directory
WriteRayJobAPILogs(test, rayClient, jobID)

// Assert ray-job status after job execution
test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")

// Make sure the RayCluster finishes and is deleted
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(BeEmpty())
}
16 changes: 11 additions & 5 deletions tests/odh/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package odh
import (
"embed"
"net/url"
"os"

. "github.com/onsi/gomega"
gomega "github.com/onsi/gomega"
"github.com/project-codeflare/codeflare-common/support"
. "github.com/project-codeflare/codeflare-common/support"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -38,19 +37,26 @@ func ReadFile(t support.Test, fileName string) []byte {
return file
}

func ReadFileExt(t support.Test, fileName string) []byte {
t.T().Helper()
file, err := os.ReadFile(fileName)
t.Expect(err).NotTo(gomega.HaveOccurred())
return file
}

func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL {
dashboardName := "ray-dashboard-" + rayCluster.Name
route := GetRoute(test, namespace.Name, dashboardName)
route := support.GetRoute(test, namespace.Name, dashboardName)
hostname := route.Status.Ingress[0].Host
dashboardUrl, _ := url.Parse("https://" + hostname)
test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String())

return dashboardUrl
}

func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string {
func GetTestJobId(test support.Test, rayClient support.RayClusterClient, hostName string) string {
allJobsData, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred())
test.Expect(err).ToNot(gomega.HaveOccurred())

jobID := (*allJobsData)[0].SubmissionID
if len(*allJobsData) > 0 {
Expand Down