Skip to content

Commit d259ec1

Browse files
committed
RHOAIENG-32532: Update kueue integration
1 parent 538d345 commit d259ec1

File tree

9 files changed

+780
-1033
lines changed

9 files changed

+780
-1033
lines changed

src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,8 @@ class ManagedClusterConfig:
131131
accelerator_configs:
132132
A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
133133
Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
134-
local_queue:
135-
The name of the queue to use for the cluster.
136134
annotations:
137-
A dictionary of annotations to apply to the cluster.
135+
A dictionary of annotations to apply to the Job.
138136
volumes:
139137
A list of V1Volume objects to add to the Cluster
140138
volume_mounts:
@@ -161,7 +159,6 @@ class ManagedClusterConfig:
161159
accelerator_configs: Dict[str, str] = field(
162160
default_factory=lambda: DEFAULT_ACCELERATORS.copy()
163161
)
164-
local_queue: Optional[str] = None
165162
annotations: Dict[str, str] = field(default_factory=dict)
166163
volumes: list[V1Volume] = field(default_factory=list)
167164
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
@@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
248245
"""
249246
ray_cluster_spec = {
250247
"rayVersion": RAY_VERSION,
251-
"enableInTreeAutoscaling": False,
252248
"headGroupSpec": self._build_head_group_spec(),
253249
"workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
254250
}
@@ -346,12 +342,9 @@ def _build_head_container(self) -> V1Container:
346342
self.head_accelerators,
347343
),
348344
volume_mounts=self._generate_volume_mounts(),
345+
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
349346
)
350347

351-
# Add environment variables if specified
352-
if hasattr(self, "envs") and self.envs:
353-
container.env = self._build_env_vars()
354-
355348
return container
356349

357350
def _build_worker_container(self) -> V1Container:
@@ -373,12 +366,9 @@ def _build_worker_container(self) -> V1Container:
373366
self.worker_accelerators,
374367
),
375368
volume_mounts=self._generate_volume_mounts(),
369+
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
376370
)
377371

378-
# Add environment variables if specified
379-
if hasattr(self, "envs") and self.envs:
380-
container.env = self._build_env_vars()
381-
382372
return container
383373

384374
def _build_resource_requirements(

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import re
2323
import ast
2424
from typing import Dict, Any, Optional, Tuple
25+
from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
2526
from codeflare_sdk.common.utils.constants import MOUNT_PATH
2627
from kubernetes import client
2728
from ...common.kubernetes_cluster.auth import get_api_client
@@ -59,9 +60,9 @@ def __init__(
5960
cluster_config: Optional[ManagedClusterConfig] = None,
6061
namespace: Optional[str] = None,
6162
runtime_env: Optional[Dict[str, Any]] = None,
62-
shutdown_after_job_finishes: Optional[bool] = None,
6363
ttl_seconds_after_finished: int = 0,
6464
active_deadline_seconds: Optional[int] = None,
65+
local_queue: Optional[str] = None,
6566
):
6667
"""
6768
Initialize a RayJob instance.
@@ -73,12 +74,11 @@ def __init__(
7374
cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
7475
namespace: The Kubernetes namespace (auto-detected if not specified)
7576
runtime_env: Ray runtime environment configuration (optional)
76-
shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
7777
ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
7878
active_deadline_seconds: Maximum time the job can run before being terminated (optional)
79+
local_queue: The Kueue LocalQueue to submit the job to (optional)
7980
8081
Note:
81-
shutdown_after_job_finishes is automatically detected but can be overridden:
8282
- True if cluster_config is provided (new cluster will be cleaned up)
8383
- False if cluster_name is provided (existing cluster will not be shut down)
8484
- This value is derived from whether cluster_config is provided and can no longer be overridden by the caller
@@ -108,17 +108,7 @@ def __init__(
108108
self.runtime_env = runtime_env
109109
self.ttl_seconds_after_finished = ttl_seconds_after_finished
110110
self.active_deadline_seconds = active_deadline_seconds
111-
112-
# Auto-set shutdown_after_job_finishes based on cluster_config presence
113-
# If cluster_config is provided, we want to clean up the cluster after job finishes
114-
# If using existing cluster, we don't want to shut it down
115-
# User can override this behavior by explicitly setting shutdown_after_job_finishes
116-
if shutdown_after_job_finishes is not None:
117-
self.shutdown_after_job_finishes = shutdown_after_job_finishes
118-
elif cluster_config is not None:
119-
self.shutdown_after_job_finishes = True
120-
else:
121-
self.shutdown_after_job_finishes = False
111+
self.local_queue = local_queue
122112

123113
if namespace is None:
124114
detected_namespace = get_current_namespace()
@@ -177,10 +167,6 @@ def submit(self) -> str:
177167
if scripts:
178168
self._handle_script_volumes_for_existing_cluster(scripts, result)
179169

180-
if self.shutdown_after_job_finishes:
181-
logger.info(
182-
f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
183-
)
184170
return self.name
185171
else:
186172
raise RuntimeError(f"Failed to submit RayJob {self.name}")
@@ -230,11 +216,32 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
230216
},
231217
"spec": {
232218
"entrypoint": self.entrypoint,
233-
"shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
234219
"ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
220+
"shutdownAfterJobFinishes": self._cluster_config is not None,
235221
},
236222
}
237223

224+
labels = {}
225+
# Only label the RayJob for Kueue when it creates its own cluster (cluster_config provided): use the RayJob's local_queue if set, otherwise the namespace's default LocalQueue
226+
if self._cluster_config is not None:
227+
if self.local_queue:
228+
labels["kueue.x-k8s.io/queue-name"] = self.local_queue
229+
else:
230+
default_queue = get_default_kueue_name(self.namespace)
231+
if default_queue:
232+
labels["kueue.x-k8s.io/queue-name"] = default_queue
233+
else:
234+
# No default queue found, use "default" as fallback
235+
labels["kueue.x-k8s.io/queue-name"] = "default"
236+
logger.warning(
237+
f"No default Kueue LocalQueue found in namespace '{self.namespace}'. "
238+
f"Using 'default' as the queue name. If a LocalQueue named 'default' "
239+
f"does not exist, the RayJob submission will fail. "
240+
f"To fix this, please explicitly specify the 'local_queue' parameter."
241+
)
242+
243+
rayjob_cr["metadata"]["labels"] = labels
244+
238245
# Add active deadline if specified
239246
if self.active_deadline_seconds:
240247
rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds

0 commit comments

Comments
 (0)