Skip to content

Commit 77bc95a

Browse files
committed
Removed managed cluster func and fixed tests
Signed-off-by: Pat O'Connor <[email protected]>
1 parent b3251fc commit 77bc95a

File tree

11 files changed

+52
-648
lines changed

11 files changed

+52
-648
lines changed

src/codeflare_sdk/ray/cluster/cluster.py

Lines changed: 2 additions & 302 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import uuid
2828
import warnings
2929

30-
from ..job.job import RayJobSpec
31-
3230
from ...common.kubernetes_cluster.auth import (
3331
config_check,
3432
get_api_client,
@@ -609,298 +607,6 @@ def _component_resources_down(
609607
yamls = yaml.safe_load_all(self.resource_yaml)
610608
_delete_resources(yamls, namespace, api_instance, cluster_name)
611609

612-
@staticmethod
613-
def run_job_with_managed_cluster(
614-
cluster_config: ClusterConfiguration,
615-
job_config: RayJobSpec,
616-
job_cr_name: Optional[str] = None,
617-
submission_mode: str = "K8sJobMode",
618-
shutdown_after_job_finishes: bool = True,
619-
ttl_seconds_after_finished: Optional[int] = None,
620-
suspend_rayjob_creation: bool = False,
621-
wait_for_completion: bool = True,
622-
job_timeout_seconds: Optional[int] = 3600,
623-
job_polling_interval_seconds: int = 10,
624-
):
625-
"""
626-
Manages the lifecycle of a Ray cluster and a job by creating a RayJob custom resource.
627-
KubeRay operator will then create/delete the RayCluster based on the RayJob definition.
628-
629-
This method will:
630-
1. Generate a RayCluster specification from the provided 'cluster_config'.
631-
2. Construct a RayJob custom resource definition using 'job_config' and embedding the RayCluster spec.
632-
3. Create the RayJob resource in Kubernetes.
633-
4. Optionally, wait for the RayJob to complete or timeout, monitoring its status.
634-
5. The RayCluster lifecycle (creation and deletion) is managed by KubeRay
635-
based on the RayJob's 'shutdownAfterJobFinishes' field.
636-
637-
Args:
638-
cluster_config: Configuration for the Ray cluster to be created by RayJob.
639-
job_config: RayJobSpec object containing job-specific details like entrypoint, runtime_env, etc.
640-
job_cr_name: Name for the RayJob Custom Resource. If None, a unique name is generated.
641-
submission_mode: How the job is submitted ("K8sJobMode" or "RayClientMode").
642-
shutdown_after_job_finishes: If True, RayCluster is deleted after job finishes.
643-
ttl_seconds_after_finished: TTL for RayJob after it's finished.
644-
suspend_rayjob_creation: If True, creates the RayJob in a suspended state.
645-
wait_for_completion: If True, waits for the job to finish.
646-
job_timeout_seconds: Timeout for waiting for job completion.
647-
job_polling_interval_seconds: Interval for polling job status.
648-
649-
Returns:
650-
A dictionary containing details like RayJob CR name, reported job submission ID,
651-
final job status, dashboard URL, and the RayCluster name.
652-
653-
Raises:
654-
TimeoutError: If the job doesn't complete within the specified timeout.
655-
ApiException: For Kubernetes API errors.
656-
ValueError: For configuration issues.
657-
"""
658-
config_check()
659-
k8s_co_api = k8s_client.CustomObjectsApi(get_api_client())
660-
namespace = cluster_config.namespace
661-
662-
if not job_config.entrypoint:
663-
raise ValueError("job_config.entrypoint must be specified.")
664-
665-
# Warn if Pydantic V1/V2 specific fields in RayJobSpec are set, as they are not used for RayJob CR.
666-
if (
667-
job_config.entrypoint_num_cpus is not None
668-
or job_config.entrypoint_num_gpus is not None
669-
or job_config.entrypoint_memory is not None
670-
):
671-
warnings.warn(
672-
"RayJobSpec fields 'entrypoint_num_cpus', 'entrypoint_num_gpus', 'entrypoint_memory' "
673-
"are not directly used when creating a RayJob CR. They are primarily for the Ray Job Submission Client. "
674-
"Resource requests for the job driver pod should be configured in the RayCluster head node spec via ClusterConfiguration.",
675-
UserWarning,
676-
)
677-
678-
# Generate rayClusterSpec from ClusterConfiguration
679-
temp_config_for_spec = copy.deepcopy(cluster_config)
680-
temp_config_for_spec.appwrapper = False
681-
682-
with warnings.catch_warnings():
683-
warnings.simplefilter("ignore", UserWarning)
684-
dummy_cluster_for_spec = Cluster(temp_config_for_spec)
685-
686-
ray_cluster_cr_dict = dummy_cluster_for_spec.resource_yaml
687-
if (
688-
not isinstance(ray_cluster_cr_dict, dict)
689-
or "spec" not in ray_cluster_cr_dict
690-
):
691-
raise ValueError(
692-
"Failed to generate RayCluster CR dictionary from ClusterConfiguration. "
693-
f"Got: {type(ray_cluster_cr_dict)}"
694-
)
695-
ray_cluster_spec = ray_cluster_cr_dict["spec"]
696-
697-
# Prepare RayJob CR
698-
actual_job_cr_name = job_cr_name or f"rayjob-{uuid.uuid4().hex[:10]}"
699-
700-
runtime_env_yaml_str = ""
701-
if job_config.runtime_env:
702-
try:
703-
runtime_env_yaml_str = yaml.dump(job_config.runtime_env)
704-
except yaml.YAMLError as e:
705-
raise ValueError(
706-
f"Invalid job_config.runtime_env, failed to dump to YAML: {e}"
707-
)
708-
709-
ray_job_cr_spec = {
710-
"entrypoint": job_config.entrypoint,
711-
"shutdownAfterJobFinishes": shutdown_after_job_finishes,
712-
"rayClusterSpec": ray_cluster_spec,
713-
"submissionMode": submission_mode,
714-
}
715-
716-
if runtime_env_yaml_str:
717-
ray_job_cr_spec["runtimeEnvYAML"] = runtime_env_yaml_str
718-
if job_config.submission_id:
719-
ray_job_cr_spec["jobId"] = job_config.submission_id
720-
if job_config.metadata:
721-
ray_job_cr_spec["metadata"] = job_config.metadata
722-
if ttl_seconds_after_finished is not None:
723-
ray_job_cr_spec["ttlSecondsAfterFinished"] = ttl_seconds_after_finished
724-
if suspend_rayjob_creation:
725-
ray_job_cr_spec["suspend"] = True
726-
if job_config.entrypoint_resources:
727-
ray_job_cr_spec["entrypointResources"] = job_config.entrypoint_resources
728-
729-
ray_job_cr = {
730-
"apiVersion": "ray.io/v1",
731-
"kind": "RayJob",
732-
"metadata": {
733-
"name": actual_job_cr_name,
734-
"namespace": namespace,
735-
},
736-
"spec": ray_job_cr_spec,
737-
}
738-
739-
returned_job_submission_id = None
740-
final_job_status = "UNKNOWN"
741-
dashboard_url = None
742-
ray_cluster_name_actual = None
743-
744-
try:
745-
print(
746-
f"Submitting RayJob '{actual_job_cr_name}' to namespace '{namespace}'..."
747-
)
748-
k8s_co_api.create_namespaced_custom_object(
749-
group="ray.io",
750-
version="v1",
751-
namespace=namespace,
752-
plural="rayjobs",
753-
body=ray_job_cr,
754-
)
755-
print(f"RayJob '{actual_job_cr_name}' created successfully.")
756-
757-
if wait_for_completion:
758-
print(f"Waiting for RayJob '{actual_job_cr_name}' to complete...")
759-
start_time = time.time()
760-
while True:
761-
try:
762-
ray_job_status_cr = (
763-
k8s_co_api.get_namespaced_custom_object_status(
764-
group="ray.io",
765-
version="v1",
766-
namespace=namespace,
767-
plural="rayjobs",
768-
name=actual_job_cr_name,
769-
)
770-
)
771-
except ApiException as e:
772-
if e.status == 404:
773-
print(
774-
f"RayJob '{actual_job_cr_name}' status not found yet, retrying..."
775-
)
776-
time.sleep(job_polling_interval_seconds)
777-
continue
778-
raise
779-
780-
status_field = ray_job_status_cr.get("status", {})
781-
job_deployment_status = status_field.get(
782-
"jobDeploymentStatus", "UNKNOWN"
783-
)
784-
current_job_status = status_field.get("jobStatus", "PENDING")
785-
786-
dashboard_url = status_field.get("dashboardURL", dashboard_url)
787-
ray_cluster_name_actual = status_field.get(
788-
"rayClusterName", ray_cluster_name_actual
789-
)
790-
returned_job_submission_id = status_field.get(
791-
"jobId", job_config.submission_id
792-
)
793-
794-
final_job_status = current_job_status
795-
print(
796-
f"RayJob '{actual_job_cr_name}' status: JobDeployment='{job_deployment_status}', Job='{current_job_status}'"
797-
)
798-
799-
if current_job_status in ["SUCCEEDED", "FAILED", "STOPPED"]:
800-
break
801-
802-
if (
803-
job_timeout_seconds
804-
and (time.time() - start_time) > job_timeout_seconds
805-
):
806-
try:
807-
ray_job_status_cr_final = (
808-
k8s_co_api.get_namespaced_custom_object_status(
809-
group="ray.io",
810-
version="v1",
811-
namespace=namespace,
812-
plural="rayjobs",
813-
name=actual_job_cr_name,
814-
)
815-
)
816-
status_field_final = ray_job_status_cr_final.get(
817-
"status", {}
818-
)
819-
final_job_status = status_field_final.get(
820-
"jobStatus", final_job_status
821-
)
822-
returned_job_submission_id = status_field_final.get(
823-
"jobId", returned_job_submission_id
824-
)
825-
dashboard_url = status_field_final.get(
826-
"dashboardURL", dashboard_url
827-
)
828-
ray_cluster_name_actual = status_field_final.get(
829-
"rayClusterName", ray_cluster_name_actual
830-
)
831-
except Exception:
832-
pass
833-
raise TimeoutError(
834-
f"RayJob '{actual_job_cr_name}' timed out after {job_timeout_seconds} seconds. Last status: {final_job_status}"
835-
)
836-
837-
time.sleep(job_polling_interval_seconds)
838-
839-
print(
840-
f"RayJob '{actual_job_cr_name}' finished with status: {final_job_status}"
841-
)
842-
else:
843-
try:
844-
ray_job_status_cr = k8s_co_api.get_namespaced_custom_object_status(
845-
group="ray.io",
846-
version="v1",
847-
namespace=namespace,
848-
plural="rayjobs",
849-
name=actual_job_cr_name,
850-
)
851-
status_field = ray_job_status_cr.get("status", {})
852-
final_job_status = status_field.get("jobStatus", "SUBMITTED")
853-
returned_job_submission_id = status_field.get(
854-
"jobId", job_config.submission_id
855-
)
856-
dashboard_url = status_field.get("dashboardURL", dashboard_url)
857-
ray_cluster_name_actual = status_field.get(
858-
"rayClusterName", ray_cluster_name_actual
859-
)
860-
except ApiException as e:
861-
if e.status == 404:
862-
final_job_status = "SUBMITTED_NOT_FOUND"
863-
else:
864-
print(
865-
f"Warning: Could not fetch initial status for RayJob '{actual_job_cr_name}': {e}"
866-
)
867-
final_job_status = "UNKNOWN_API_ERROR"
868-
869-
return {
870-
"job_cr_name": actual_job_cr_name,
871-
"job_submission_id": returned_job_submission_id,
872-
"job_status": final_job_status,
873-
"dashboard_url": dashboard_url,
874-
"ray_cluster_name": ray_cluster_name_actual,
875-
}
876-
877-
except ApiException as e:
878-
print(
879-
f"Kubernetes API error during RayJob '{actual_job_cr_name}' management: {e.reason} (status: {e.status})"
880-
)
881-
final_status_on_error = "ERROR_BEFORE_SUBMISSION"
882-
if actual_job_cr_name:
883-
try:
884-
ray_job_status_cr = k8s_co_api.get_namespaced_custom_object_status(
885-
group="ray.io",
886-
version="v1",
887-
namespace=namespace,
888-
plural="rayjobs",
889-
name=actual_job_cr_name,
890-
)
891-
status_field = ray_job_status_cr.get("status", {})
892-
final_status_on_error = status_field.get(
893-
"jobStatus", "UNKNOWN_AFTER_K8S_ERROR"
894-
)
895-
except Exception:
896-
final_status_on_error = "UNKNOWN_FINAL_STATUS_FETCH_FAILED"
897-
raise
898-
except Exception as e:
899-
print(
900-
f"An unexpected error occurred during managed RayJob execution for '{actual_job_cr_name}': {e}"
901-
)
902-
raise
903-
904610

905611
def list_all_clusters(namespace: str, print_to_console: bool = True):
906612
"""
@@ -1057,21 +763,15 @@ def get_cluster(
1057763
head_extended_resource_requests=head_extended_resources,
1058764
worker_extended_resource_requests=worker_extended_resources,
1059765
)
1060-
# 1. Prepare RayClusterSpec from ClusterConfiguration
1061-
# Create a temporary config with appwrapper=False to ensure build_ray_cluster returns RayCluster YAML
1062-
temp_cluster_config_dict = cluster_config.dict(
1063-
exclude_none=True
1064-
) # Assuming Pydantic V1 or similar .dict() method
1065-
temp_cluster_config_dict["appwrapper"] = False
1066-
temp_cluster_config_for_spec = ClusterConfiguration(**temp_cluster_config_dict)
766+
1067767
# Ignore the warning here for the lack of a ClusterConfiguration
1068768
with warnings.catch_warnings():
1069769
warnings.filterwarnings(
1070770
"ignore",
1071771
message="Please provide a ClusterConfiguration to initialise the Cluster object",
1072772
)
1073773
cluster = Cluster(None)
1074-
cluster.config = temp_cluster_config_for_spec
774+
cluster.config = cluster_config
1075775

1076776
# Remove auto-generated fields like creationTimestamp, uid and etc.
1077777
remove_autogenerated_fields(resource)

src/codeflare_sdk/ray/cluster/test_cluster.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,5 +758,11 @@ def custom_side_effect(group, version, namespace, plural, **kwargs):
758758

759759
# Make sure to always keep this function last
760760
def test_cleanup():
761-
os.remove(f"{aw_dir}test-all-params.yaml")
762-
os.remove(f"{aw_dir}aw-all-params.yaml")
761+
# Remove files only if they exist
762+
test_file = f"{aw_dir}test-all-params.yaml"
763+
if os.path.exists(test_file):
764+
os.remove(test_file)
765+
766+
aw_file = f"{aw_dir}aw-all-params.yaml"
767+
if os.path.exists(aw_file):
768+
os.remove(aw_file)

0 commit comments

Comments
 (0)