diff --git a/examples/ray-finetune-llm-deepspeed/create_dataset.py b/examples/ray-finetune-llm-deepspeed/create_dataset.py index 89467cb28..5b0429c7d 100644 --- a/examples/ray-finetune-llm-deepspeed/create_dataset.py +++ b/examples/ray-finetune-llm-deepspeed/create_dataset.py @@ -2,6 +2,10 @@ import json import os +cache_dir="../../datasets" +if not os.path.exists(cache_dir): + cache_dir="" +dataset = load_dataset("gsm8k", "main", cache_dir=cache_dir) def gsm8k_qa_tokens_template(): dataset = load_dataset("gsm8k", "main") diff --git a/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb b/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb index 92f237aad..2ddf087e8 100644 --- a/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb +++ b/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb @@ -42,8 +42,8 @@ "# On OpenShift, you can retrieve the token by running `oc whoami -t`,\n", "# and the server with `oc cluster-info`.\n", "auth = TokenAuthentication(\n", - " token = \"\",\n", - " server = \"\",\n", + " token = '',\n", + " server = '',\n", " skip_tls=False\n", ")\n", "auth.login()" @@ -125,7 +125,7 @@ "source": [ "# The S3 bucket where to store checkpoint.\n", "# It can be set manually, otherwise it's retrieved from configured the data connection.\n", - "s3_bucket = \"\"\n", + "s3_bucket = ''\n", "if not s3_bucket:\n", " s3_bucket = os.environ.get('AWS_S3_BUCKET')\n", "assert s3_bucket, \"An S3 bucket must be provided to store checkpoints\"" @@ -152,12 +152,12 @@ " \"--eval-batch-size-per-device=32 \",\n", " runtime_env={\n", " \"env_vars\": {\n", - " \"AWS_ACCESS_KEY_ID\": os.environ.get('AWS_ACCESS_KEY_ID'),\n", - " \"AWS_SECRET_ACCESS_KEY\": os.environ.get('AWS_SECRET_ACCESS_KEY'),\n", - " \"AWS_DEFAULT_REGION\": os.environ.get('AWS_DEFAULT_REGION')\n", + " 'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID'),\n", + " 'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY'),\n", + " 'AWS_DEFAULT_REGION': os.environ.get('AWS_DEFAULT_REGION')\n", " },\n", - " \"pip\": \"requirements.txt\",\n", - " \"working_dir\": \"./\",\n", + " 'pip': 'requirements.txt',\n", + " 'working_dir': './',\n", " \"excludes\": [\"/docs/\", \"*.ipynb\", \"*.md\"]\n", " },\n", ")\n", diff --git a/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py b/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py index fa5c80d02..93c9e9d40 100644 --- a/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py +++ b/examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py @@ -601,6 +601,9 @@ def parse_args(): parser.add_argument("--lora", action="store_true", default=False, help="If passed, will enable parameter efficient fine-tuning with LoRA.") + + parser.add_argument("--lora-config", type=str, default="./lora_configs/lora.json", + help="Lora config json to use.") parser.add_argument("--num-epochs", type=int, default=1, help="Number of epochs to train for.") @@ -669,7 +672,7 @@ def main(): # Add LoRA config if needed if args.lora: - with open("./lora_configs/lora.json", "r") as json_file: + with open(args.lora_config, "r") as json_file: lora_config = json.load(json_file) config["lora_config"] = lora_config diff --git a/tests/odh/mnist_ray_test.go b/tests/odh/mnist_ray_test.go index 0d74b2a52..c306d3ec9 100644 --- a/tests/odh/mnist_ray_test.go +++ b/tests/odh/mnist_ray_test.go @@ -131,7 +131,6 @@ func mnistRay(t *testing.T, numGpus int) { // Fetch created raycluster rayClusterName := "mnisttest" - // Wait until raycluster is up and running rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) diff --git a/tests/odh/mnist_raytune_hpo_test.go b/tests/odh/mnist_raytune_hpo_test.go index dbe436d42..5e40e4baa 100644 --- a/tests/odh/mnist_raytune_hpo_test.go +++ b/tests/odh/mnist_raytune_hpo_test.go @@ -132,7 +132,6 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { // Fetch created raycluster rayClusterName := "mnisthpotest" - // Wait until raycluster is up and running rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) diff --git a/tests/odh/notebook.go b/tests/odh/notebook.go index 12b8aa51f..f09d8312f 100644 --- a/tests/odh/notebook.go +++ b/tests/odh/notebook.go @@ -29,8 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" ) -const recommendedTagAnnotation = "opendatahub.io/workbench-image-recommended" - var notebookResource = schema.GroupVersionResource{Group: "kubeflow.org", Version: "v1", Resource: "notebooks"} type NotebookProps struct { diff --git a/tests/odh/ray_finetune_llm_deepspeed_test.go b/tests/odh/ray_finetune_llm_deepspeed_test.go new file mode 100644 index 000000000..ead556ea9 --- /dev/null +++ b/tests/odh/ray_finetune_llm_deepspeed_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package odh + +import ( + "fmt" + "os" + "strings" + "testing" + "time" + + . "github.com/onsi/gomega" + . "github.com/project-codeflare/codeflare-common/support" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestRayFinetuneLlmDeepspeedDemoLlama_2_7b(t *testing.T) { + rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Llama-2-7b-chat-hf", "zero_3_llama_2_7b.json") +} +func TestRayFinetuneLlmDeepspeedDemoLlama_31_8b(t *testing.T) { + rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Meta-Llama-3.1-8B", "zero_3_offload_optim_param.json") +} + +func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelName string, modelConfigFile string) { + test := With(t) + + // Create a namespace + namespace := test.NewTestNamespace() + var workingDirectory, err = os.Getwd() + test.Expect(err).ToNot(HaveOccurred()) + + // Define the regular(non-admin) user + userName := GetNotebookUserName(test) + userToken := GetNotebookUserToken(test) + + // Create role binding with Namespace specific admin cluster role + CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin") + + // list changes required in llm-deepspeed-finetune-demo.ipynb file and update those + requiredChangesInNotebook := map[string]string{ + "import os": "import os,time,sys", + "import sys": "!cp /opt/app-root/notebooks/* ./\\n\",\n\t\"!ls", + "from codeflare_sdk.cluster.auth import TokenAuthentication": "from codeflare_sdk.cluster.auth import TokenAuthentication\\n\",\n\t\"from codeflare_sdk.job import RayJobClient", + "token = ''": fmt.Sprintf("token = '%s'", userToken), + "server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)), + "namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name), + "head_cpus=16": "head_cpus=2", + "head_extended_resource_requests=1": "head_extended_resource_requests=0", + "num_workers=7": "num_workers=1", + "worker_cpu_requests=16": "worker_cpu_requests=4", + "worker_cpu_limits=16": "worker_cpu_limits=4", + "worker_memory_requests=128": "worker_memory_requests=64", + "worker_memory_limits=256": "worker_memory_limits=128", + "head_memory=128": "head_memory=48", + "client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n", + "--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus), + "--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1), + "--model-name=meta-llama/Meta-Llama-3.1-8B": fmt.Sprintf("--model-name=%s", modelName), + "--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile), + "--batch-size-per-device=32": "--batch-size-per-device=6", + "--eval-batch-size-per-device=32": "--eval-batch-size-per-device=6", + "'pip': 'requirements.txt'": "'pip': '/opt/app-root/src/requirements.txt'", + "'working_dir': './'": "'working_dir': '/opt/app-root/src'", + "client.stop_job(submission_id)": "finished = False\\n\",\n\t\"while not finished:\\n\",\n\t\" time.sleep(1)\\n\",\n\t\" status = client.get_job_status(submission_id)\\n\",\n\t\" finished = (status == \\\"SUCCEEDED\\\")\\n\",\n\t\"if finished:\\n\",\n\t\" print(\\\"Job completed Successfully !\\\")\\n\",\n\t\"else:\\n\",\n\t\" print(\\\"Job failed !\\\")\\n\",\n\t\"time.sleep(10)\\n", + } + + updatedNotebookContent := string(ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb")) + for oldValue, newValue := range requiredChangesInNotebook { + updatedNotebookContent = strings.Replace(updatedNotebookContent, oldValue, newValue, -1) + } + updatedNotebook := []byte(updatedNotebookContent) + + // Test configuration + jupyterNotebookConfigMapFileName := "ray_finetune_llm_deepspeed.ipynb" + configMap := map[string][]byte{ + jupyterNotebookConfigMapFileName: updatedNotebook, + "ray_finetune_llm_deepspeed.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py"), + "requirements.txt": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/requirements.txt"), + "create_dataset.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/create_dataset.py"), + "lora.json": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/lora_configs/lora.json"), + modelConfigFile: ReadFileExt(test, fmt.Sprintf(workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/deepspeed_configs/%s", modelConfigFile)), + "utils.py": ReadFileExt(test, workingDirectory+"/../../examples/ray-finetune-llm-deepspeed/utils.py"), + } + + config := CreateConfigMap(test, namespace.Name, configMap) + + // Create Notebook CR + createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus) + + // Gracefully cleanup Notebook + defer func() { + deleteNotebook(test, namespace) + test.Eventually(listNotebooks(test, namespace), TestTimeoutGpuProvisioning).Should(HaveLen(0)) + }() + + // Make sure the RayCluster is created and running + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutGpuProvisioning). + Should( + And( + HaveLen(1), + ContainElement(WithTransform(RayClusterState, Equal(rayv1.Ready))), + ), + ) + + // Fetch created raycluster + rayClusterName := "ray" + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) + test.Expect(err).ToNot(HaveOccurred()) + + // Initialise raycluster client to interact with raycluster to get rayjob details using REST-API + dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) + rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} + rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err)) + + // wait until rayjob exists + test.Eventually(func() []RayJobDetailsResponse { + rayJobs, err := rayClient.GetJobs() + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) + return *rayJobs + }, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found") + + // Get test job-id + jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) + test.Expect(jobID).ToNot(BeEmpty()) + + // Wait for the job to be succeeded or failed + var rayJobStatus string + test.T().Logf("Waiting for job to be Succeeded...\n") + test.Eventually(func() string { + resp, err := rayClient.GetJobDetails(jobID) + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err)) + rayJobStatusVal := resp.Status + if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { + test.T().Logf("JobStatus - %s\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + return rayJobStatus + } + if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { + test.T().Logf("JobStatus - %s...\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + } + return rayJobStatus + }, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + // Store job logs in output directory + WriteRayJobAPILogs(test, rayClient, jobID) + + // Assert ray-job status after job execution + test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") + + // Make sure the RayCluster finishes and is deleted + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). + Should(BeEmpty()) +} diff --git a/tests/odh/support.go b/tests/odh/support.go index d73e4e91e..a6232a323 100644 --- a/tests/odh/support.go +++ b/tests/odh/support.go @@ -19,11 +19,10 @@ package odh import ( "embed" "net/url" + "os" - . "github.com/onsi/gomega" gomega "github.com/onsi/gomega" "github.com/project-codeflare/codeflare-common/support" - . "github.com/project-codeflare/codeflare-common/support" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" v1 "k8s.io/api/core/v1" ) @@ -38,9 +37,16 @@ func ReadFile(t support.Test, fileName string) []byte { return file } +func ReadFileExt(t support.Test, fileName string) []byte { + t.T().Helper() + file, err := os.ReadFile(fileName) + t.Expect(err).NotTo(gomega.HaveOccurred()) + return file +} + func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL { dashboardName := "ray-dashboard-" + rayCluster.Name - route := GetRoute(test, namespace.Name, dashboardName) + route := support.GetRoute(test, namespace.Name, dashboardName) hostname := route.Status.Ingress[0].Host dashboardUrl, _ := url.Parse("https://" + hostname) test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String()) @@ -48,9 +54,9 @@ func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *ray return dashboardUrl } -func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string { +func GetTestJobId(test support.Test, rayClient support.RayClusterClient, hostName string) string { allJobsData, err := rayClient.GetJobs() - test.Expect(err).ToNot(HaveOccurred()) + test.Expect(err).ToNot(gomega.HaveOccurred()) jobID := (*allJobsData)[0].SubmissionID if len(*allJobsData) > 0 {