20
20
from typing import Dict , Any , Optional , Tuple
21
21
from odh_kuberay_client .kuberay_job_api import RayjobApi
22
22
23
+ from ..cluster .cluster import Cluster
24
+ from ..cluster .config import ClusterConfiguration
25
+ from ..cluster .build_ray_cluster import build_ray_cluster
26
+
23
27
from .status import (
24
28
RayJobDeploymentStatus ,
25
29
CodeflareRayJobStatus ,
26
30
RayJobInfo ,
27
31
)
28
32
from . import pretty_print
29
33
30
- # Set up logging
34
+
31
35
logger = logging .getLogger (__name__ )
32
36
33
37
@@ -42,74 +46,110 @@ class RayJob:
42
46
def __init__ (
43
47
self ,
44
48
job_name : str ,
45
- cluster_name : str ,
49
+ cluster_name : Optional [str ] = None ,
50
+ cluster_config : Optional [ClusterConfiguration ] = None ,
46
51
namespace : str = "default" ,
47
- entrypoint : str = " None" ,
52
+ entrypoint : Optional [ str ] = None ,
48
53
runtime_env : Optional [Dict [str , Any ]] = None ,
54
+ shutdown_after_job_finishes : bool = True ,
55
+ ttl_seconds_after_finished : int = 0 ,
56
+ active_deadline_seconds : Optional [int ] = None ,
49
57
):
50
58
"""
51
59
Initialize a RayJob instance.
52
60
53
61
Args:
54
- name: The name for the Ray job
55
- namespace: The Kubernetes namespace to submit the job to (default: "default")
56
- cluster_name: The name of the Ray cluster to submit the job to
57
- **kwargs: Additional configuration options
62
+ job_name: The name for the Ray job
63
+ cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
64
+ cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
65
+ namespace: The Kubernetes namespace (default: "default")
66
+ entrypoint: The Python script or command to run (required for submission)
67
+ runtime_env: Ray runtime environment configuration (optional)
68
+ shutdown_after_job_finishes: Whether to automatically cleanup the cluster after job completion (default: True)
69
+ ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
70
+ active_deadline_seconds: Maximum time the job can run before being terminated (optional)
58
71
"""
72
+ # Validate input parameters
73
+ if cluster_name is None and cluster_config is None :
74
+ raise ValueError ("Either cluster_name or cluster_config must be provided" )
75
+
76
+ if cluster_name is not None and cluster_config is not None :
77
+ raise ValueError ("Cannot specify both cluster_name and cluster_config" )
78
+
59
79
self .name = job_name
60
80
self .namespace = namespace
61
- self .cluster_name = cluster_name
62
81
self .entrypoint = entrypoint
63
82
self .runtime_env = runtime_env
83
+ self .shutdown_after_job_finishes = shutdown_after_job_finishes
84
+ self .ttl_seconds_after_finished = ttl_seconds_after_finished
85
+ self .active_deadline_seconds = active_deadline_seconds
86
+
87
+ # Cluster configuration
88
+ self ._cluster_name = cluster_name
89
+ self ._cluster_config = cluster_config
90
+
91
+ # Determine cluster name for the job
92
+ if cluster_config is not None :
93
+ # Ensure cluster config has the same namespace as the job
94
+ if cluster_config .namespace is None :
95
+ cluster_config .namespace = namespace
96
+ elif cluster_config .namespace != namespace :
97
+ logger .warning (
98
+ f"Cluster config namespace ({ cluster_config .namespace } ) differs from job namespace ({ namespace } )"
99
+ )
100
+
101
+ self .cluster_name = cluster_config .name or f"{ job_name } -cluster"
102
+ # Update the cluster config name if it wasn't set
103
+ if not cluster_config .name :
104
+ cluster_config .name = self .cluster_name
105
+ else :
106
+ self .cluster_name = cluster_name
64
107
65
108
# Initialize the KubeRay job API client
66
109
self ._api = RayjobApi ()
67
110
68
111
logger .info (f"Initialized RayJob: { self .name } in namespace: { self .namespace } " )
69
112
70
- def submit (
71
- self ,
72
- ) -> str :
113
+ def submit (self ) -> str :
73
114
"""
74
115
Submit the Ray job to the Kubernetes cluster.
75
116
76
- Args:
77
- entrypoint: The Python script or command to run
78
- runtime_env: Ray runtime environment configuration (optional)
117
+ The RayJob CRD will automatically:
118
+ - Create a new cluster if cluster_config was provided
119
+ - Use existing cluster if cluster_name was provided
120
+ - Clean up resources based on shutdown_after_job_finishes setting
79
121
80
122
Returns:
81
123
The job ID/name if submission was successful
82
124
83
125
Raises:
84
- RuntimeError: If the job has already been submitted or submission fails
126
+ ValueError: If entrypoint is not provided
127
+ RuntimeError: If job submission fails
85
128
"""
129
+ # Validate required parameters
130
+ if not self .entrypoint :
131
+ raise ValueError ("entrypoint must be provided to submit a RayJob" )
132
+
86
133
# Build the RayJob custom resource
87
- rayjob_cr = self ._build_rayjob_cr (
88
- entrypoint = self .entrypoint ,
89
- runtime_env = self .runtime_env ,
90
- )
134
+ rayjob_cr = self ._build_rayjob_cr ()
91
135
92
- # Submit the job
93
- logger .info (
94
- f"Submitting RayJob { self .name } to RayCluster { self .cluster_name } in namespace { self .namespace } "
95
- )
136
+ # Submit the job - KubeRay operator handles everything else
137
+ logger .info (f"Submitting RayJob { self .name } to KubeRay operator" )
96
138
result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
97
139
98
140
if result :
99
141
logger .info (f"Successfully submitted RayJob { self .name } " )
142
+ if self .shutdown_after_job_finishes :
143
+ logger .info (
144
+ f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
145
+ )
100
146
return self .name
101
147
else :
102
148
raise RuntimeError (f"Failed to submit RayJob { self .name } " )
103
149
104
- def _build_rayjob_cr (
105
- self ,
106
- entrypoint : str ,
107
- runtime_env : Optional [Dict [str , Any ]] = None ,
108
- ) -> Dict [str , Any ]:
150
+ def _build_rayjob_cr (self ) -> Dict [str , Any ]:
109
151
"""
110
- Build the RayJob custom resource specification.
111
-
112
- This creates a minimal RayJob CR that can be extended later.
152
+ Build the RayJob custom resource specification using native RayJob capabilities.
113
153
"""
114
154
# Basic RayJob custom resource structure
115
155
rayjob_cr = {
@@ -120,17 +160,75 @@ def _build_rayjob_cr(
120
160
"namespace" : self .namespace ,
121
161
},
122
162
"spec" : {
123
- "entrypoint" : entrypoint ,
124
- "clusterSelector" : {"ray.io/cluster" : self .cluster_name },
163
+ "entrypoint" : self .entrypoint ,
164
+ "shutdownAfterJobFinishes" : self .shutdown_after_job_finishes ,
165
+ "ttlSecondsAfterFinished" : self .ttl_seconds_after_finished ,
125
166
},
126
167
}
127
168
169
+ # Add active deadline if specified
170
+ if self .active_deadline_seconds :
171
+ rayjob_cr ["spec" ]["activeDeadlineSeconds" ] = self .active_deadline_seconds
172
+
128
173
# Add runtime environment if specified
129
- if runtime_env :
130
- rayjob_cr ["spec" ]["runtimeEnvYAML" ] = str (runtime_env )
174
+ if self .runtime_env :
175
+ rayjob_cr ["spec" ]["runtimeEnvYAML" ] = str (self .runtime_env )
176
+
177
+ # Configure cluster: either use existing or create new
178
+ if self ._cluster_config is not None :
179
+ # Use rayClusterSpec to create a new cluster - leverage existing build logic
180
+ ray_cluster_spec = self ._build_ray_cluster_spec ()
181
+ rayjob_cr ["spec" ]["rayClusterSpec" ] = ray_cluster_spec
182
+ logger .info (f"RayJob will create new cluster: { self .cluster_name } " )
183
+ else :
184
+ # Use clusterSelector to reference existing cluster
185
+ rayjob_cr ["spec" ]["clusterSelector" ] = {"ray.io/cluster" : self .cluster_name }
186
+ logger .info (f"RayJob will use existing cluster: { self .cluster_name } " )
131
187
132
188
return rayjob_cr
133
189
190
+ def _build_ray_cluster_spec (self ) -> Dict [str , Any ]:
191
+ """
192
+ Build the RayCluster spec from ClusterConfiguration using existing build_ray_cluster logic.
193
+
194
+ Returns:
195
+ Dict containing the RayCluster spec for embedding in RayJob
196
+ """
197
+ if not self ._cluster_config :
198
+ raise RuntimeError ("No cluster configuration provided" )
199
+
200
+ # Create a shallow copy of the cluster config to avoid modifying the original
201
+ import copy
202
+
203
+ temp_config = copy .copy (self ._cluster_config )
204
+
205
+ # Ensure we get a RayCluster (not AppWrapper) and don't write to file
206
+ temp_config .appwrapper = False
207
+ temp_config .write_to_file = False
208
+
209
+ # Create a minimal Cluster object for the build process
210
+ from ..cluster .cluster import Cluster
211
+
212
+ temp_cluster = Cluster .__new__ (Cluster ) # Create without calling __init__
213
+ temp_cluster .config = temp_config
214
+
215
+ """
216
+ For now, RayJob with a new/auto-created cluster will not work with Kueue.
217
+ This is due to the Kueue label not being propagated to the RayCluster.
218
+ """
219
+
220
+ # Use the existing build_ray_cluster function to generate the RayCluster
221
+ ray_cluster_dict = build_ray_cluster (temp_cluster )
222
+
223
+ # Extract just the RayCluster spec - RayJob CRD doesn't support metadata in rayClusterSpec
224
+ # Note: CodeFlare Operator should still create dashboard routes for the RayCluster
225
+ ray_cluster_spec = ray_cluster_dict ["spec" ]
226
+
227
+ logger .info (
228
+ f"Built RayCluster spec using existing build logic for cluster: { self .cluster_name } "
229
+ )
230
+ return ray_cluster_spec
231
+
134
232
def status (
135
233
self , print_to_console : bool = True
136
234
) -> Tuple [CodeflareRayJobStatus , bool ]:
0 commit comments