@@ -159,23 +159,24 @@ def submit(self) -> str:
159
159
160
160
self ._validate_ray_version_compatibility ()
161
161
162
- # Automatically handle script files for new clusters
163
- if self ._cluster_config is not None :
164
- scripts = self ._extract_script_files_from_entrypoint ()
165
- if scripts :
166
- self ._handle_script_volumes_for_new_cluster (scripts )
167
- elif self ._cluster_name :
168
- scripts = self ._extract_script_files_from_entrypoint ()
169
- if scripts :
170
- self ._handle_script_volumes_for_existing_cluster (scripts )
171
-
172
162
rayjob_cr = self ._build_rayjob_cr ()
173
163
174
164
logger .info (f"Submitting RayJob { self .name } to Kuberay operator" )
175
165
result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
176
166
177
167
if result :
178
168
logger .info (f"Successfully submitted RayJob { self .name } " )
169
+
170
+ # Handle script files after RayJob creation so we can set owner reference
171
+ if self ._cluster_config is not None :
172
+ scripts = self ._extract_script_files_from_entrypoint ()
173
+ if scripts :
174
+ self ._handle_script_volumes_for_new_cluster (scripts , result )
175
+ elif self ._cluster_name :
176
+ scripts = self ._extract_script_files_from_entrypoint ()
177
+ if scripts :
178
+ self ._handle_script_volumes_for_existing_cluster (scripts , result )
179
+
179
180
if self .shutdown_after_job_finishes :
180
181
logger .info (
181
182
f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
@@ -205,6 +206,17 @@ def resubmit(self):
205
206
else :
206
207
raise RuntimeError (f"Failed to resubmit the RayJob { self .name } " )
207
208
209
+ def delete (self ):
210
+ """
211
+ Delete the Ray job.
212
+ """
213
+ deleted = self ._api .delete_job (name = self .name , k8s_namespace = self .namespace )
214
+ if deleted :
215
+ logger .info (f"Successfully deleted the RayJob { self .name } " )
216
+ return True
217
+ else :
218
+ raise RuntimeError (f"Failed to delete the RayJob { self .name } " )
219
+
208
220
def _build_rayjob_cr (self ) -> Dict [str , Any ]:
209
221
"""
210
222
Build the RayJob custom resource specification using native RayJob capabilities.
@@ -464,7 +476,9 @@ def _find_local_imports(
464
476
except (SyntaxError , ValueError ) as e :
465
477
logger .debug (f"Could not parse imports from { script_path } : { e } " )
466
478
467
- def _handle_script_volumes_for_new_cluster (self , scripts : Dict [str , str ]):
479
+ def _handle_script_volumes_for_new_cluster (
480
+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
481
+ ):
468
482
"""Handle script volumes for new clusters (uses ManagedClusterConfig)."""
469
483
# Validate ConfigMap size before creation
470
484
self ._cluster_config .validate_configmap_size (scripts )
@@ -474,15 +488,17 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]):
474
488
job_name = self .name , namespace = self .namespace , scripts = scripts
475
489
)
476
490
477
- # Create ConfigMap via Kubernetes API
478
- configmap_name = self ._create_configmap_from_spec (configmap_spec )
491
+ # Create ConfigMap via Kubernetes API with owner reference
492
+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
479
493
480
494
# Add volumes to cluster config (config.py handles spec building)
481
495
self ._cluster_config .add_script_volumes (
482
496
configmap_name = configmap_name , mount_path = MOUNT_PATH
483
497
)
484
498
485
- def _handle_script_volumes_for_existing_cluster (self , scripts : Dict [str , str ]):
499
+ def _handle_script_volumes_for_existing_cluster (
500
+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
501
+ ):
486
502
"""Handle script volumes for existing clusters (updates RayCluster CR)."""
487
503
# Create config builder for utility methods
488
504
config_builder = ManagedClusterConfig ()
@@ -495,28 +511,57 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]):
495
511
job_name = self .name , namespace = self .namespace , scripts = scripts
496
512
)
497
513
498
- # Create ConfigMap via Kubernetes API
499
- configmap_name = self ._create_configmap_from_spec (configmap_spec )
514
+ # Create ConfigMap via Kubernetes API with owner reference
515
+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
500
516
501
517
# Update existing RayCluster
502
518
self ._update_existing_cluster_for_scripts (configmap_name , config_builder )
503
519
504
- def _create_configmap_from_spec (self , configmap_spec : Dict [str , Any ]) -> str :
520
+ def _create_configmap_from_spec (
521
+ self , configmap_spec : Dict [str , Any ], rayjob_result : Dict [str , Any ] = None
522
+ ) -> str :
505
523
"""
506
524
Create ConfigMap from specification via Kubernetes API.
507
525
508
526
Args:
509
527
configmap_spec: ConfigMap specification dictionary
528
+ rayjob_result: The result from RayJob creation containing UID
510
529
511
530
Returns:
512
531
str: Name of the created ConfigMap
513
532
"""
514
533
515
534
configmap_name = configmap_spec ["metadata" ]["name" ]
516
535
536
+ metadata = client .V1ObjectMeta (** configmap_spec ["metadata" ])
537
+
538
+ # Add owner reference if we have the RayJob result
539
+ if (
540
+ rayjob_result
541
+ and isinstance (rayjob_result , dict )
542
+ and rayjob_result .get ("metadata" , {}).get ("uid" )
543
+ ):
544
+ logger .info (
545
+ f"Adding owner reference to ConfigMap '{ configmap_name } ' with RayJob UID: { rayjob_result ['metadata' ]['uid' ]} "
546
+ )
547
+ metadata .owner_references = [
548
+ client .V1OwnerReference (
549
+ api_version = "ray.io/v1" ,
550
+ kind = "RayJob" ,
551
+ name = self .name ,
552
+ uid = rayjob_result ["metadata" ]["uid" ],
553
+ controller = True ,
554
+ block_owner_deletion = True ,
555
+ )
556
+ ]
557
+ else :
558
+ logger .warning (
559
+ f"No valid RayJob result with UID found, ConfigMap '{ configmap_name } ' will not have owner reference. Result: { rayjob_result } "
560
+ )
561
+
517
562
# Convert dict spec to V1ConfigMap
518
563
configmap = client .V1ConfigMap (
519
- metadata = client . V1ObjectMeta ( ** configmap_spec [ " metadata" ]) ,
564
+ metadata = metadata ,
520
565
data = configmap_spec ["data" ],
521
566
)
522
567
0 commit comments