Skip to content

Commit c549a0e

Browse files
RHOAIENG-27792: Add stop and resubmit functions to RayJob
1 parent 47122ee commit c549a0e

File tree

3 files changed

+146
-11
lines changed

3 files changed

+146
-11
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ cryptography = "43.0.3"
3333
executing = "1.2.0"
3434
pydantic = "< 2"
3535
ipywidgets = "8.1.2"
36-
python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "d1e750d9beac612ad455b951c1a789f971409ab3" }
36+
python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "49419654418865a5838adc7f323f13d82454aa18" }
3737

3838
[[tool.poetry.source]]
3939
name = "pypi"

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -154,29 +154,24 @@ def __init__(
154154
logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
155155

156156
def submit(self) -> str:
157-
# Validate required parameters
158157
if not self.entrypoint:
159-
raise ValueError("entrypoint must be provided to submit a RayJob")
158+
raise ValueError("Entrypoint must be provided to submit a RayJob")
160159

161-
# Validate Ray version compatibility for both cluster_config and runtime_env
162160
self._validate_ray_version_compatibility()
161+
163162
# Automatically handle script files for new clusters
164163
if self._cluster_config is not None:
165164
scripts = self._extract_script_files_from_entrypoint()
166165
if scripts:
167166
self._handle_script_volumes_for_new_cluster(scripts)
168-
169-
# Handle script files for existing clusters
170167
elif self._cluster_name:
171168
scripts = self._extract_script_files_from_entrypoint()
172169
if scripts:
173170
self._handle_script_volumes_for_existing_cluster(scripts)
174171

175-
# Build the RayJob custom resource
176172
rayjob_cr = self._build_rayjob_cr()
177173

178-
# Submit the job - KubeRay operator handles everything else
179-
logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
174+
logger.info(f"Submitting RayJob {self.name} to Kuberay operator")
180175
result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)
181176

182177
if result:
@@ -189,11 +184,31 @@ def submit(self) -> str:
189184
else:
190185
raise RuntimeError(f"Failed to submit RayJob {self.name}")
191186

187+
def stop(self):
188+
"""
189+
Suspend the Ray job.
190+
"""
191+
stopped = self._api.suspend_job(name=self.name, k8s_namespace=self.namespace)
192+
if stopped:
193+
logger.info(f"Successfully stopped the RayJob {self.name}")
194+
return True
195+
else:
196+
raise RuntimeError(f"Failed to stop the RayJob {self.name}")
197+
198+
def resubmit(self):
199+
"""
200+
Resubmit the Ray job.
201+
"""
202+
if self._api.resubmit_job(name=self.name, k8s_namespace=self.namespace):
203+
logger.info(f"Successfully resubmitted the RayJob {self.name}")
204+
return True
205+
else:
206+
raise RuntimeError(f"Failed to resubmit the RayJob {self.name}")
207+
192208
def _build_rayjob_cr(self) -> Dict[str, Any]:
193209
"""
194210
Build the RayJob custom resource specification using native RayJob capabilities.
195211
"""
196-
# Basic RayJob custom resource structure
197212
rayjob_cr = {
198213
"apiVersion": "ray.io/v1",
199214
"kind": "RayJob",

src/codeflare_sdk/ray/rayjobs/test_rayjob.py

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def test_submit_validation_no_entrypoint(mocker):
390390
)
391391

392392
with pytest.raises(
393-
ValueError, match="entrypoint must be provided to submit a RayJob"
393+
ValueError, match="Entrypoint must be provided to submit a RayJob"
394394
):
395395
rayjob.submit()
396396

@@ -1878,3 +1878,123 @@ def test_add_script_volumes_existing_mount_skip():
18781878
# Should still have only one mount and no volume added
18791879
assert len(config.volumes) == 0 # Volume not added due to mount skip
18801880
assert len(config.volume_mounts) == 1
1881+
1882+
1883+
def test_rayjob_stop_success(mocker, caplog):
1884+
"""Test successful RayJob stop operation."""
1885+
mocker.patch("kubernetes.config.load_kube_config")
1886+
1887+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1888+
mock_api_instance = MagicMock()
1889+
mock_api_class.return_value = mock_api_instance
1890+
1891+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1892+
1893+
mock_api_instance.suspend_job.return_value = {
1894+
"metadata": {"name": "test-rayjob"},
1895+
"spec": {"suspend": True},
1896+
}
1897+
1898+
rayjob = RayJob(
1899+
job_name="test-rayjob",
1900+
cluster_name="test-cluster",
1901+
namespace="test-namespace",
1902+
entrypoint="python script.py",
1903+
)
1904+
1905+
with caplog.at_level("INFO"):
1906+
result = rayjob.stop()
1907+
1908+
assert result is True
1909+
1910+
mock_api_instance.suspend_job.assert_called_once_with(
1911+
name="test-rayjob", k8s_namespace="test-namespace"
1912+
)
1913+
1914+
# Verify success message was logged
1915+
assert "Successfully stopped the RayJob test-rayjob" in caplog.text
1916+
1917+
1918+
def test_rayjob_stop_failure(mocker):
1919+
"""Test RayJob stop operation when API call fails."""
1920+
mocker.patch("kubernetes.config.load_kube_config")
1921+
1922+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1923+
mock_api_instance = MagicMock()
1924+
mock_api_class.return_value = mock_api_instance
1925+
1926+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1927+
1928+
mock_api_instance.suspend_job.return_value = None
1929+
1930+
rayjob = RayJob(
1931+
job_name="test-rayjob",
1932+
cluster_name="test-cluster",
1933+
namespace="test-namespace",
1934+
entrypoint="python script.py",
1935+
)
1936+
1937+
with pytest.raises(RuntimeError, match="Failed to stop the RayJob test-rayjob"):
1938+
rayjob.stop()
1939+
1940+
mock_api_instance.suspend_job.assert_called_once_with(
1941+
name="test-rayjob", k8s_namespace="test-namespace"
1942+
)
1943+
1944+
1945+
def test_rayjob_resubmit_success(mocker):
1946+
"""Test successful RayJob resubmit operation."""
1947+
mocker.patch("kubernetes.config.load_kube_config")
1948+
1949+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1950+
mock_api_instance = MagicMock()
1951+
mock_api_class.return_value = mock_api_instance
1952+
1953+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1954+
1955+
mock_api_instance.resubmit_job.return_value = {
1956+
"metadata": {"name": "test-rayjob"},
1957+
"spec": {"suspend": False},
1958+
}
1959+
1960+
rayjob = RayJob(
1961+
job_name="test-rayjob",
1962+
cluster_name="test-cluster",
1963+
namespace="test-namespace",
1964+
entrypoint="python script.py",
1965+
)
1966+
1967+
result = rayjob.resubmit()
1968+
1969+
assert result is True
1970+
1971+
mock_api_instance.resubmit_job.assert_called_once_with(
1972+
name="test-rayjob", k8s_namespace="test-namespace"
1973+
)
1974+
1975+
1976+
def test_rayjob_resubmit_failure(mocker):
1977+
"""Test RayJob resubmit operation when API call fails."""
1978+
mocker.patch("kubernetes.config.load_kube_config")
1979+
1980+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1981+
mock_api_instance = MagicMock()
1982+
mock_api_class.return_value = mock_api_instance
1983+
1984+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1985+
1986+
mock_api_instance.resubmit_job.return_value = None
1987+
1988+
rayjob = RayJob(
1989+
job_name="test-rayjob",
1990+
cluster_name="test-cluster",
1991+
namespace="test-namespace",
1992+
entrypoint="python script.py",
1993+
)
1994+
1995+
with pytest.raises(RuntimeError, match="Failed to resubmit the RayJob test-rayjob"):
1996+
rayjob.resubmit()
1997+
1998+
mock_api_instance.resubmit_job.assert_called_once_with(
1999+
name="test-rayjob", k8s_namespace="test-namespace"
2000+
)

0 commit comments

Comments
 (0)