@@ -114,3 +114,54 @@ def to_dict(self) -> Dict[str, Any]:
114
114
"driver_info" : self .spec .driver_info ,
115
115
} if self .status is None else self .status
116
116
}
117
+
118
+ def apply (self , force = False ):
119
+ """
120
+ Applies the RayJob using server-side apply.
121
+ If 'force' is set to True, conflicts will be forced.
122
+
123
+ Args:
124
+ force (bool): If True, force conflicts during server-side apply.
125
+ """
126
+ from kubernetes import client
127
+ from kubernetes .dynamic import DynamicClient
128
+ from ...common .kubernetes_cluster .auth import get_api_client , config_check
129
+ from ...common import _kube_api_error_handling
130
+
131
+ CF_SDK_FIELD_MANAGER = "codeflare-sdk"
132
+
133
+ try :
134
+ # Check Kubernetes configuration
135
+ config_check ()
136
+
137
+ # Get the dynamic client
138
+ crds = DynamicClient (get_api_client ()).resources
139
+
140
+ # Get the RayJob API instance
141
+ api_version = "ray.io/v1"
142
+ api_instance = crds .get (api_version = api_version , kind = "RayJob" )
143
+
144
+ # Get namespace from metadata
145
+ namespace = self .metadata .get ("namespace" , "default" )
146
+ name = self .metadata .get ("name" )
147
+
148
+ # Convert job to dictionary
149
+ body = self .to_dict ()
150
+
151
+ # Apply the job using server-side apply
152
+ api_instance .server_side_apply (
153
+ field_manager = CF_SDK_FIELD_MANAGER ,
154
+ group = "ray.io" ,
155
+ version = "v1" ,
156
+ namespace = namespace ,
157
+ plural = "rayjobs" ,
158
+ body = body ,
159
+ force_conflicts = force ,
160
+ )
161
+
162
+ print (f"RayJob: '{ name } ' has successfully been applied" )
163
+
164
+ except AttributeError as e :
165
+ raise RuntimeError (f"Failed to initialize DynamicClient: { e } " )
166
+ except Exception as e :
167
+ return _kube_api_error_handling (e )
0 commit comments