@@ -160,23 +160,24 @@ def submit(self) -> str:
160
160
161
161
self ._validate_ray_version_compatibility ()
162
162
163
- # Automatically handle script files for new clusters
164
- if self ._cluster_config is not None :
165
- scripts = self ._extract_script_files_from_entrypoint ()
166
- if scripts :
167
- self ._handle_script_volumes_for_new_cluster (scripts )
168
- elif self ._cluster_name :
169
- scripts = self ._extract_script_files_from_entrypoint ()
170
- if scripts :
171
- self ._handle_script_volumes_for_existing_cluster (scripts )
172
-
173
163
rayjob_cr = self ._build_rayjob_cr ()
174
164
175
165
logger .info (f"Submitting RayJob { self .name } to Kuberay operator" )
176
166
result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
177
167
178
168
if result :
179
169
logger .info (f"Successfully submitted RayJob { self .name } " )
170
+
171
+ # Handle script files after RayJob creation so we can set owner reference
172
+ if self ._cluster_config is not None :
173
+ scripts = self ._extract_script_files_from_entrypoint ()
174
+ if scripts :
175
+ self ._handle_script_volumes_for_new_cluster (scripts , result )
176
+ elif self ._cluster_name :
177
+ scripts = self ._extract_script_files_from_entrypoint ()
178
+ if scripts :
179
+ self ._handle_script_volumes_for_existing_cluster (scripts , result )
180
+
180
181
if self .shutdown_after_job_finishes :
181
182
logger .info (
182
183
f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
@@ -206,6 +207,17 @@ def resubmit(self):
206
207
else :
207
208
raise RuntimeError (f"Failed to resubmit the RayJob { self .name } " )
208
209
210
def delete(self):
    """
    Delete this RayJob through the job API.

    Calls ``self._api.delete_job`` with this job's name and namespace.

    Returns:
        bool: True when the API call reports a truthy result.

    Raises:
        RuntimeError: If the API call returns a falsy result.
    """
    removed = self._api.delete_job(name=self.name, k8s_namespace=self.namespace)

    # Guard clause: a falsy result from the API is treated as a failed deletion.
    if not removed:
        raise RuntimeError(f"Failed to delete the RayJob {self.name}")

    logger.info(f"Successfully deleted the RayJob {self.name}")
    return True
220
+
209
221
def _build_rayjob_cr (self ) -> Dict [str , Any ]:
210
222
"""
211
223
Build the RayJob custom resource specification using native RayJob capabilities.
@@ -466,7 +478,9 @@ def _find_local_imports(
466
478
except (SyntaxError , ValueError ) as e :
467
479
logger .debug (f"Could not parse imports from { script_path } : { e } " )
468
480
469
- def _handle_script_volumes_for_new_cluster (self , scripts : Dict [str , str ]):
481
+ def _handle_script_volumes_for_new_cluster (
482
+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
483
+ ):
470
484
"""Handle script volumes for new clusters (uses ManagedClusterConfig)."""
471
485
# Validate ConfigMap size before creation
472
486
self ._cluster_config .validate_configmap_size (scripts )
@@ -476,15 +490,17 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]):
476
490
job_name = self .name , namespace = self .namespace , scripts = scripts
477
491
)
478
492
479
- # Create ConfigMap via Kubernetes API
480
- configmap_name = self ._create_configmap_from_spec (configmap_spec )
493
+ # Create ConfigMap via Kubernetes API with owner reference
494
+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
481
495
482
496
# Add volumes to cluster config (config.py handles spec building)
483
497
self ._cluster_config .add_script_volumes (
484
498
configmap_name = configmap_name , mount_path = "/home/ray/scripts"
485
499
)
486
500
487
- def _handle_script_volumes_for_existing_cluster (self , scripts : Dict [str , str ]):
501
+ def _handle_script_volumes_for_existing_cluster (
502
+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
503
+ ):
488
504
"""Handle script volumes for existing clusters (updates RayCluster CR)."""
489
505
# Create config builder for utility methods
490
506
config_builder = ManagedClusterConfig ()
@@ -497,28 +513,57 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]):
497
513
job_name = self .name , namespace = self .namespace , scripts = scripts
498
514
)
499
515
500
- # Create ConfigMap via Kubernetes API
501
- configmap_name = self ._create_configmap_from_spec (configmap_spec )
516
+ # Create ConfigMap via Kubernetes API with owner reference
517
+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
502
518
503
519
# Update existing RayCluster
504
520
self ._update_existing_cluster_for_scripts (configmap_name , config_builder )
505
521
506
- def _create_configmap_from_spec (self , configmap_spec : Dict [str , Any ]) -> str :
522
+ def _create_configmap_from_spec (
523
+ self , configmap_spec : Dict [str , Any ], rayjob_result : Dict [str , Any ] = None
524
+ ) -> str :
507
525
"""
508
526
Create ConfigMap from specification via Kubernetes API.
509
527
510
528
Args:
511
529
configmap_spec: ConfigMap specification dictionary
530
+ rayjob_result: The result from RayJob creation containing UID
512
531
513
532
Returns:
514
533
str: Name of the created ConfigMap
515
534
"""
516
535
517
536
configmap_name = configmap_spec ["metadata" ]["name" ]
518
537
538
+ metadata = client .V1ObjectMeta (** configmap_spec ["metadata" ])
539
+
540
+ # Add owner reference if we have the RayJob result
541
+ if (
542
+ rayjob_result
543
+ and isinstance (rayjob_result , dict )
544
+ and rayjob_result .get ("metadata" , {}).get ("uid" )
545
+ ):
546
+ logger .info (
547
+ f"Adding owner reference to ConfigMap '{ configmap_name } ' with RayJob UID: { rayjob_result ['metadata' ]['uid' ]} "
548
+ )
549
+ metadata .owner_references = [
550
+ client .V1OwnerReference (
551
+ api_version = "ray.io/v1" ,
552
+ kind = "RayJob" ,
553
+ name = self .name ,
554
+ uid = rayjob_result ["metadata" ]["uid" ],
555
+ controller = True ,
556
+ block_owner_deletion = True ,
557
+ )
558
+ ]
559
+ else :
560
+ logger .warning (
561
+ f"No valid RayJob result with UID found, ConfigMap '{ configmap_name } ' will not have owner reference. Result: { rayjob_result } "
562
+ )
563
+
519
564
# Convert dict spec to V1ConfigMap
520
565
configmap = client .V1ConfigMap (
521
- metadata = client . V1ObjectMeta ( ** configmap_spec [ " metadata" ]) ,
566
+ metadata = metadata ,
522
567
data = configmap_spec ["data" ],
523
568
)
524
569
0 commit comments