@@ -2,6 +2,7 @@
 
 from codeflare_sdk import Cluster, ClusterConfiguration
 from codeflare_sdk.ray.rayjobs import RayJob
+from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus
 
 import pytest
 
@@ -104,120 +105,45 @@ def assert_rayjob_submit_against_existing_cluster(
         ), f"Job submission failed, expected {job_name}, got {submission_result}"
         print(f"✅ Successfully submitted RayJob '{job_name}' against existing cluster")
 
-        # Monitor the job status until completion using kubectl (Kind-specific workaround)
-        self.monitor_rayjob_completion_kubectl(job_name, timeout=900)
+        # Monitor the job status until completion
+        self.monitor_rayjob_completion(rayjob, timeout=900)
 
         print(f"✅ RayJob '{job_name}' completed successfully against existing cluster!")
 
-    def monitor_rayjob_completion_kubectl(self, job_name: str, timeout: int = 900):
+    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
         """
-        Monitor a RayJob until it completes or fails using kubectl directly.
-        This is a workaround for Kind clusters where the SDK status method doesn't work.
+        Monitor a RayJob until it completes or fails.
 
         Args:
-            job_name: The name of the RayJob to monitor
+            rayjob: The RayJob instance to monitor
             timeout: Maximum time to wait in seconds (default: 15 minutes)
         """
-        import subprocess
-        import time
-
-        print(f"⏳ Monitoring RayJob '{job_name}' status using kubectl...")
+        print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")
 
         elapsed_time = 0
         check_interval = 10  # Check every 10 seconds
 
         while elapsed_time < timeout:
-            try:
-                # Get RayJob status using kubectl
-                result = subprocess.run(
-                    [
-                        "kubectl",
-                        "get",
-                        "rayjobs",
-                        "-n",
-                        self.namespace,
-                        job_name,
-                        "-o",
-                        "jsonpath={.status.jobDeploymentStatus}",
-                    ],
-                    capture_output=True,
-                    text=True,
-                    timeout=10,
-                )
-
-                if result.returncode == 0:
-                    status = result.stdout.strip()
-
-                    # Also get job status for more details
-                    job_status_result = subprocess.run(
-                        [
-                            "kubectl",
-                            "get",
-                            "rayjobs",
-                            "-n",
-                            self.namespace,
-                            job_name,
-                            "-o",
-                            "jsonpath={.status.jobStatus}",
-                        ],
-                        capture_output=True,
-                        text=True,
-                        timeout=10,
-                    )
-                    job_status = (
-                        job_status_result.stdout.strip()
-                        if job_status_result.returncode == 0
-                        else "Unknown"
-                    )
-
-                    print(
-                        f"📊 RayJob '{job_name}' - Deployment Status: {status}, Job Status: {job_status}"
-                    )
-
-                    # Check completion status
-                    if status == "Complete" or job_status == "SUCCEEDED":
-                        print(f"✅ RayJob '{job_name}' completed successfully!")
-                        return
-                    elif status == "Failed" or job_status == "FAILED":
-                        # Get error details
-                        try:
-                            error_result = subprocess.run(
-                                [
-                                    "kubectl",
-                                    "get",
-                                    "rayjobs",
-                                    "-n",
-                                    self.namespace,
-                                    job_name,
-                                    "-o",
-                                    "yaml",
-                                ],
-                                capture_output=True,
-                                text=True,
-                                timeout=10,
-                            )
-                            print(
-                                f"❌ RayJob '{job_name}' failed! Details:\n{error_result.stdout}"
-                            )
-                        except:
-                            pass
-                        raise AssertionError(f"❌ RayJob '{job_name}' failed!")
-                    elif status == "Running" or job_status == "RUNNING":
-                        print(f"🏃 RayJob '{job_name}' is still running...")
-                    else:
-                        print(f"⏳ RayJob '{job_name}' status: {status}")
-
-                else:
-                    print(f"❌ Could not get RayJob status: {result.stderr}")
-
-            except Exception as e:
-                print(f"❌ Error checking RayJob status: {e}")
+            status, ready = rayjob.status(print_to_console=True)
+
+            # Check if job has completed (either successfully or failed)
+            if status == CodeflareRayJobStatus.COMPLETE:
+                print(f"✅ RayJob '{rayjob.name}' completed successfully!")
+                return
+            elif status == CodeflareRayJobStatus.FAILED:
+                raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
+            elif status == CodeflareRayJobStatus.RUNNING:
+                print(f"🏃 RayJob '{rayjob.name}' is still running...")
+            elif status == CodeflareRayJobStatus.UNKNOWN:
+                print(f"❓ RayJob '{rayjob.name}' status is unknown")
 
             # Wait before next check
             sleep(check_interval)
             elapsed_time += check_interval
 
         # If we reach here, the job has timed out
+        final_status, _ = rayjob.status(print_to_console=True)
         raise TimeoutError(
-            f"⏰ RayJob '{job_name}' did not complete within {timeout} seconds."
+            f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
+            f"Final status: {final_status}"
         )
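
The refactored monitoring loop reduces to polling `rayjob.status()` and comparing the result against the `CodeflareRayJobStatus` enum, replacing the kubectl subprocess calls. Below is a minimal standalone sketch of that pattern; it relies only on `RayJob.status()`, `RayJob.name`, and the enum members shown in the diff above, while the helper name `wait_for_rayjob` and its defaults are illustrative, not part of the PR:

```python
# Illustrative sketch, not part of the PR: poll a RayJob via the SDK status API
# instead of shelling out to kubectl. Only rayjob.status(), rayjob.name, and the
# CodeflareRayJobStatus members used in the diff above are assumed to exist.
from time import sleep

from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus


def wait_for_rayjob(rayjob: RayJob, timeout: int = 900, check_interval: int = 10) -> None:
    """Block until the RayJob reaches a terminal state or the timeout expires."""
    elapsed = 0
    while elapsed < timeout:
        status, _ready = rayjob.status(print_to_console=True)
        if status == CodeflareRayJobStatus.COMPLETE:
            return
        if status == CodeflareRayJobStatus.FAILED:
            raise AssertionError(f"RayJob '{rayjob.name}' failed")
        # RUNNING / UNKNOWN: wait and re-check on the next iteration
        sleep(check_interval)
        elapsed += check_interval
    raise TimeoutError(f"RayJob '{rayjob.name}' did not complete within {timeout} seconds")
```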