Commit 4350b28

test: e2e rayjob
1 parent 8b74dbc commit 4350b28

File tree

4 files changed: +454 additions, −0 deletions

tests/e2e/rayjob/__init__.py

Whitespace-only changes.
Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
import pytest
import sys
import os
from time import sleep

# Add the parent directory to the path to import support
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from support import *

from codeflare_sdk import (
    Cluster,
    ClusterConfiguration,
    TokenAuthentication,
)
from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on OpenShift


@pytest.mark.openshift
class TestRayJobExistingClusterOauth:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_against_existing_cluster_oauth(self):
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_oauth()

    def run_rayjob_against_existing_cluster_oauth(self):
        ray_image = get_ray_image()

        auth = TokenAuthentication(
            token=run_oc_command(["whoami", "--show-token=true"]),
            server=run_oc_command(["whoami", "--show-server=true"]),
            skip_tls=True,
        )
        auth.login()

        cluster_name = "existing-cluster"

        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                num_workers=1,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests=1,
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                image=ray_image,
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.apply()
        cluster.status()
        cluster.wait_ready()
        cluster.status()
        cluster.details()

        print(f"Ray cluster '{cluster_name}' is ready!")

        job_name = "existing-cluster-rayjob"

        rayjob = RayJob(
            job_name=job_name,
            cluster_name=cluster_name,
            namespace=self.namespace,
            entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
            runtime_env={
                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
            },
            shutdown_after_job_finishes=False,
        )

        # Submit the job
        print(
            f"Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'"
        )
        submission_result = rayjob.submit()
        assert (
            submission_result == job_name
        ), f"Job submission failed, expected {job_name}, got {submission_result}"
        print(f"Successfully submitted RayJob '{job_name}'!")

        # Monitor the job status until completion
        self.monitor_rayjob_completion(rayjob)

        # Cleanup - manually tear down the cluster since the job won't do it
        print("🧹 Cleaning up Ray cluster")
        cluster.down()

    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
        """
        Monitor a RayJob until it completes or fails.

        Args:
            rayjob: The RayJob instance to monitor
            timeout: Maximum time to wait in seconds (default: 15 minutes)
        """
        print(f"Monitoring RayJob '{rayjob.name}' status...")

        elapsed_time = 0
        check_interval = 10  # Check every 10 seconds

        while elapsed_time < timeout:
            status, ready = rayjob.status(print_to_console=True)

            # Check if the job has completed (either successfully or failed)
            if status == CodeflareRayJobStatus.COMPLETE:
                print(f"RayJob '{rayjob.name}' completed successfully!")
                return
            elif status == CodeflareRayJobStatus.FAILED:
                raise AssertionError(f"RayJob '{rayjob.name}' failed!")
            elif status == CodeflareRayJobStatus.RUNNING:
                print(f"RayJob '{rayjob.name}' is still running...")
            elif status == CodeflareRayJobStatus.UNKNOWN:
                print(f"RayJob '{rayjob.name}' status is unknown")

            # Wait before the next check
            sleep(check_interval)
            elapsed_time += check_interval

        # If we reach here, the job has timed out
        final_status, _ = rayjob.status(print_to_console=True)
        raise TimeoutError(
            f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
            f"Final status: {final_status}"
        )
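
Taken outside the pytest harness, the pattern this test exercises is just submit-then-poll against a cluster that is already running. Below is a minimal sketch using only the SDK calls shown in the diff above; the job name, namespace, and entrypoint here are hypothetical placeholders, and the cluster is assumed to already exist.

from time import sleep

from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

# Submit against an existing RayCluster and leave it running afterwards.
job = RayJob(
    job_name="demo-rayjob",              # hypothetical name
    cluster_name="existing-cluster",     # an already-applied RayCluster
    namespace="my-namespace",            # hypothetical namespace
    entrypoint="python -c \"import ray; ray.init(); print('hello')\"",
    shutdown_after_job_finishes=False,   # do not tear the cluster down
)
assert job.submit() == "demo-rayjob"

# Poll until the job reaches a terminal state (bounded to roughly 15 minutes).
for _ in range(90):
    status, _ = job.status(print_to_console=True)
    if status == CodeflareRayJobStatus.COMPLETE:
        break
    if status == CodeflareRayJobStatus.FAILED:
        raise RuntimeError("RayJob failed")
    sleep(10)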
Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
import pytest
import sys
import os
from time import sleep

# Add the parent directory to the path to import support
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from support import *

from codeflare_sdk import (
    TokenAuthentication,
    RayJob,
    ManagedClusterConfig,
)
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

# This test creates a RayJob that will create and lifecycle its own cluster on OpenShift


@pytest.mark.openshift
class TestRayJobLifecycledClusterOauth:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_with_lifecycled_cluster_oauth(self):
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_with_lifecycled_cluster_oauth()

    def run_rayjob_with_lifecycled_cluster_oauth(self):
        ray_image = get_ray_image()

        auth = TokenAuthentication(
            token=run_oc_command(["whoami", "--show-token=true"]),
            server=run_oc_command(["whoami", "--show-server=true"]),
            skip_tls=True,
        )
        auth.login()

        job_name = "lifecycled-cluster-rayjob"

        # Create cluster configuration for auto-creation
        cluster_config = ManagedClusterConfig(
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            head_memory_requests=1,
            head_memory_limits=4,
            num_workers=1,
            worker_cpu_requests="500m",
            worker_cpu_limits="500m",
            worker_memory_requests=1,
            worker_memory_limits=4,
            image=ray_image,
        )

        # Create RayJob with embedded cluster - will auto-create and manage cluster lifecycle
        rayjob = RayJob(
            job_name=job_name,
            cluster_config=cluster_config,  # This triggers auto-cluster creation
            namespace=self.namespace,
            entrypoint="python -c \"import ray; ray.init(); print('Hello from auto-created cluster!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
            runtime_env={
                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
            },
            shutdown_after_job_finishes=True,  # Auto-cleanup cluster after job finishes
            ttl_seconds_after_finished=30,  # Wait 30s after job completion before cleanup
        )

        # Submit the job
        print(
            f"Submitting RayJob '{job_name}' with auto-cluster creation and lifecycle management"
        )
        submission_result = rayjob.submit()
        assert (
            submission_result == job_name
        ), f"Job submission failed, expected {job_name}, got {submission_result}"
        print(
            f"Successfully submitted RayJob '{job_name}' with cluster '{rayjob.cluster_name}'!"
        )

        # Monitor the job status until completion
        self.monitor_rayjob_completion(rayjob)

        # Verify cluster auto-cleanup
        print("🔍 Verifying cluster auto-cleanup after job completion...")
        self.verify_cluster_cleanup(rayjob.cluster_name, timeout=60)

    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
        """
        Monitor a RayJob until it completes or fails.

        Args:
            rayjob: The RayJob instance to monitor
            timeout: Maximum time to wait in seconds (default: 15 minutes)
        """
        print(f"Monitoring RayJob '{rayjob.name}' status...")

        elapsed_time = 0
        check_interval = 10  # Check every 10 seconds

        while elapsed_time < timeout:
            status, ready = rayjob.status(print_to_console=True)

            # Check if the job has completed (either successfully or failed)
            if status == CodeflareRayJobStatus.COMPLETE:
                print(f"RayJob '{rayjob.name}' completed successfully!")
                return
            elif status == CodeflareRayJobStatus.FAILED:
                raise AssertionError(f"RayJob '{rayjob.name}' failed!")
            elif status == CodeflareRayJobStatus.RUNNING:
                print(f"RayJob '{rayjob.name}' is still running...")
            elif status == CodeflareRayJobStatus.UNKNOWN:
                print(f"RayJob '{rayjob.name}' status is unknown")

            # Wait before the next check
            sleep(check_interval)
            elapsed_time += check_interval

        # If we reach here, the job has timed out
        final_status, _ = rayjob.status(print_to_console=True)
        raise TimeoutError(
            f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
            f"Final status: {final_status}"
        )

    def verify_cluster_cleanup(self, cluster_name: str, timeout: int = 60):
        """
        Verify that the cluster created by the RayJob has been cleaned up.

        Args:
            cluster_name: The name of the cluster to check for cleanup
            timeout: Maximum time to wait for cleanup in seconds (default: 1 minute)
        """
        from kubernetes import client
        import kubernetes.client.rest

        elapsed_time = 0
        check_interval = 5  # Check every 5 seconds

        while elapsed_time < timeout:
            try:
                # Try to get the RayCluster resource
                custom_api = client.CustomObjectsApi()
                custom_api.get_namespaced_custom_object(
                    group="ray.io",
                    version="v1",
                    namespace=self.namespace,
                    plural="rayclusters",
                    name=cluster_name,
                )
                print(f"Cluster '{cluster_name}' still exists, waiting for cleanup...")
                sleep(check_interval)
                elapsed_time += check_interval
            except kubernetes.client.rest.ApiException as e:
                if e.status == 404:
                    print(
                        f"✅ Cluster '{cluster_name}' has been successfully cleaned up!"
                    )
                    return
                else:
                    raise e

        # If we reach here, the cluster was not cleaned up in time
        raise TimeoutError(
            f"Cluster '{cluster_name}' was not cleaned up within {timeout} seconds"
        )
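
By contrast, the lifecycled variant in this file needs no pre-existing cluster: passing a ManagedClusterConfig instead of a cluster_name makes the RayJob create, and later delete, its own RayCluster. A minimal sketch using only the constructor arguments shown in the diff above; the job name, namespace, and entrypoint are hypothetical placeholders.

from codeflare_sdk import RayJob, ManagedClusterConfig

# Embedded cluster spec: the RayJob owns the RayCluster lifecycle.
config = ManagedClusterConfig(
    num_workers=1,
    worker_cpu_requests="500m",
    worker_cpu_limits="500m",
    worker_memory_requests=1,
    worker_memory_limits=4,
)

job = RayJob(
    job_name="demo-lifecycled-rayjob",   # hypothetical name
    cluster_config=config,               # triggers auto-cluster creation
    namespace="my-namespace",            # hypothetical namespace
    entrypoint="python -c \"import ray; ray.init(); print('hello')\"",
    shutdown_after_job_finishes=True,    # delete the cluster once the job ends
    ttl_seconds_after_finished=30,       # grace period before cleanup
)
job.submit()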
