2020from typing import Dict , Any , Optional , Tuple
2121from odh_kuberay_client .kuberay_job_api import RayjobApi
2222
23+ from ..cluster .cluster import Cluster
24+ from ..cluster .config import ClusterConfiguration
25+ from ..cluster .build_ray_cluster import build_ray_cluster
26+
2327from .status import (
2428 RayJobDeploymentStatus ,
2529 CodeflareRayJobStatus ,
2630 RayJobInfo ,
2731)
2832from . import pretty_print
2933
30- # Set up logging
34+
3135logger = logging .getLogger (__name__ )
3236
3337
@@ -42,74 +46,110 @@ class RayJob:
4246 def __init__ (
4347 self ,
4448 job_name : str ,
45- cluster_name : str ,
49+ cluster_name : Optional [str ] = None ,
50+ cluster_config : Optional [ClusterConfiguration ] = None ,
4651 namespace : str = "default" ,
47- entrypoint : str = " None" ,
52+ entrypoint : Optional [ str ] = None ,
4853 runtime_env : Optional [Dict [str , Any ]] = None ,
54+ shutdown_after_job_finishes : bool = True ,
55+ ttl_seconds_after_finished : int = 0 ,
56+ active_deadline_seconds : Optional [int ] = None ,
4957 ):
5058 """
5159 Initialize a RayJob instance.
5260
5361 Args:
54- name: The name for the Ray job
55- namespace: The Kubernetes namespace to submit the job to (default: "default")
56- cluster_name: The name of the Ray cluster to submit the job to
57- **kwargs: Additional configuration options
62+ job_name: The name for the Ray job
63+ cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
64+ cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
65+ namespace: The Kubernetes namespace (default: "default")
66+ entrypoint: The Python script or command to run (required for submission)
67+ runtime_env: Ray runtime environment configuration (optional)
68+ shutdown_after_job_finishes: Whether to automatically cleanup the cluster after job completion (default: True)
69+ ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
70+ active_deadline_seconds: Maximum time the job can run before being terminated (optional)
5871 """
72+ # Validate input parameters
73+ if cluster_name is None and cluster_config is None :
74+ raise ValueError ("Either cluster_name or cluster_config must be provided" )
75+
76+ if cluster_name is not None and cluster_config is not None :
77+ raise ValueError ("Cannot specify both cluster_name and cluster_config" )
78+
5979 self .name = job_name
6080 self .namespace = namespace
61- self .cluster_name = cluster_name
6281 self .entrypoint = entrypoint
6382 self .runtime_env = runtime_env
83+ self .shutdown_after_job_finishes = shutdown_after_job_finishes
84+ self .ttl_seconds_after_finished = ttl_seconds_after_finished
85+ self .active_deadline_seconds = active_deadline_seconds
86+
87+ # Cluster configuration
88+ self ._cluster_name = cluster_name
89+ self ._cluster_config = cluster_config
90+
91+ # Determine cluster name for the job
92+ if cluster_config is not None :
93+ # Ensure cluster config has the same namespace as the job
94+ if cluster_config .namespace is None :
95+ cluster_config .namespace = namespace
96+ elif cluster_config .namespace != namespace :
97+ logger .warning (
98+ f"Cluster config namespace ({ cluster_config .namespace } ) differs from job namespace ({ namespace } )"
99+ )
100+
101+ self .cluster_name = cluster_config .name or f"{ job_name } -cluster"
102+ # Update the cluster config name if it wasn't set
103+ if not cluster_config .name :
104+ cluster_config .name = self .cluster_name
105+ else :
106+ self .cluster_name = cluster_name
64107
65108 # Initialize the KubeRay job API client
66109 self ._api = RayjobApi ()
67110
68111 logger .info (f"Initialized RayJob: { self .name } in namespace: { self .namespace } " )
69112
70- def submit (
71- self ,
72- ) -> str :
113+ def submit (self ) -> str :
73114 """
74115 Submit the Ray job to the Kubernetes cluster.
75116
76- Args:
77- entrypoint: The Python script or command to run
78- runtime_env: Ray runtime environment configuration (optional)
117+ The RayJob CRD will automatically:
118+ - Create a new cluster if cluster_config was provided
119+ - Use existing cluster if cluster_name was provided
120+ - Clean up resources based on shutdown_after_job_finishes setting
79121
80122 Returns:
81123 The job ID/name if submission was successful
82124
83125 Raises:
84- RuntimeError: If the job has already been submitted or submission fails
126+ ValueError: If entrypoint is not provided
127+ RuntimeError: If job submission fails
85128 """
129+ # Validate required parameters
130+ if not self .entrypoint :
131+ raise ValueError ("entrypoint must be provided to submit a RayJob" )
132+
86133 # Build the RayJob custom resource
87- rayjob_cr = self ._build_rayjob_cr (
88- entrypoint = self .entrypoint ,
89- runtime_env = self .runtime_env ,
90- )
134+ rayjob_cr = self ._build_rayjob_cr ()
91135
92- # Submit the job
93- logger .info (
94- f"Submitting RayJob { self .name } to RayCluster { self .cluster_name } in namespace { self .namespace } "
95- )
136+ # Submit the job - KubeRay operator handles everything else
137+ logger .info (f"Submitting RayJob { self .name } to KubeRay operator" )
96138 result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
97139
98140 if result :
99141 logger .info (f"Successfully submitted RayJob { self .name } " )
142+ if self .shutdown_after_job_finishes :
143+ logger .info (
144+ f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
145+ )
100146 return self .name
101147 else :
102148 raise RuntimeError (f"Failed to submit RayJob { self .name } " )
103149
104- def _build_rayjob_cr (
105- self ,
106- entrypoint : str ,
107- runtime_env : Optional [Dict [str , Any ]] = None ,
108- ) -> Dict [str , Any ]:
150+ def _build_rayjob_cr (self ) -> Dict [str , Any ]:
109151 """
110- Build the RayJob custom resource specification.
111-
112- This creates a minimal RayJob CR that can be extended later.
152+ Build the RayJob custom resource specification using native RayJob capabilities.
113153 """
114154 # Basic RayJob custom resource structure
115155 rayjob_cr = {
@@ -120,17 +160,75 @@ def _build_rayjob_cr(
120160 "namespace" : self .namespace ,
121161 },
122162 "spec" : {
123- "entrypoint" : entrypoint ,
124- "clusterSelector" : {"ray.io/cluster" : self .cluster_name },
163+ "entrypoint" : self .entrypoint ,
164+ "shutdownAfterJobFinishes" : self .shutdown_after_job_finishes ,
165+ "ttlSecondsAfterFinished" : self .ttl_seconds_after_finished ,
125166 },
126167 }
127168
169+ # Add active deadline if specified
170+ if self .active_deadline_seconds :
171+ rayjob_cr ["spec" ]["activeDeadlineSeconds" ] = self .active_deadline_seconds
172+
128173 # Add runtime environment if specified
129- if runtime_env :
130- rayjob_cr ["spec" ]["runtimeEnvYAML" ] = str (runtime_env )
174+ if self .runtime_env :
175+ rayjob_cr ["spec" ]["runtimeEnvYAML" ] = str (self .runtime_env )
176+
177+ # Configure cluster: either use existing or create new
178+ if self ._cluster_config is not None :
179+ # Use rayClusterSpec to create a new cluster - leverage existing build logic
180+ ray_cluster_spec = self ._build_ray_cluster_spec ()
181+ rayjob_cr ["spec" ]["rayClusterSpec" ] = ray_cluster_spec
182+ logger .info (f"RayJob will create new cluster: { self .cluster_name } " )
183+ else :
184+ # Use clusterSelector to reference existing cluster
185+ rayjob_cr ["spec" ]["clusterSelector" ] = {"ray.io/cluster" : self .cluster_name }
186+ logger .info (f"RayJob will use existing cluster: { self .cluster_name } " )
131187
132188 return rayjob_cr
133189
190+ def _build_ray_cluster_spec (self ) -> Dict [str , Any ]:
191+ """
192+ Build the RayCluster spec from ClusterConfiguration using existing build_ray_cluster logic.
193+
194+ Returns:
195+ Dict containing the RayCluster spec for embedding in RayJob
196+ """
197+ if not self ._cluster_config :
198+ raise RuntimeError ("No cluster configuration provided" )
199+
200+ # Create a shallow copy of the cluster config to avoid modifying the original
201+ import copy
202+
203+ temp_config = copy .copy (self ._cluster_config )
204+
205+ # Ensure we get a RayCluster (not AppWrapper) and don't write to file
206+ temp_config .appwrapper = False
207+ temp_config .write_to_file = False
208+
209+ # Create a minimal Cluster object for the build process
210+ from ..cluster .cluster import Cluster
211+
212+ temp_cluster = Cluster .__new__ (Cluster ) # Create without calling __init__
213+ temp_cluster .config = temp_config
214+
215+ """
216+ For now, RayJob with a new/auto-created cluster will not work with Kueue.
217+ This is due to the Kueue label not being propagated to the RayCluster.
218+ """
219+
220+ # Use the existing build_ray_cluster function to generate the RayCluster
221+ ray_cluster_dict = build_ray_cluster (temp_cluster )
222+
223+ # Extract just the RayCluster spec - RayJob CRD doesn't support metadata in rayClusterSpec
224+ # Note: CodeFlare Operator should still create dashboard routes for the RayCluster
225+ ray_cluster_spec = ray_cluster_dict ["spec" ]
226+
227+ logger .info (
228+ f"Built RayCluster spec using existing build logic for cluster: { self .cluster_name } "
229+ )
230+ return ray_cluster_spec
231+
134232 def status (
135233 self , print_to_console : bool = True
136234 ) -> Tuple [CodeflareRayJobStatus , bool ]:
0 commit comments