Skip to content

Commit e096a1d

Browse files
committed
New unit tests for jobs with managed clusters
Signed-off-by: Pat O'Connor <[email protected]>
1 parent d472541 commit e096a1d

File tree

1 file changed

+219
-0
lines changed

1 file changed

+219
-0
lines changed

src/codeflare_sdk/ray/cluster/test_cluster.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,225 @@ def custom_side_effect(group, version, namespace, plural, **kwargs):
756756
assert result.dashboard == rc_dashboard
757757

758758

759+
def test_run_job_with_managed_cluster_success(mocker):
760+
"""Test successful RayJob execution with managed cluster."""
761+
from codeflare_sdk.ray.job.job import RayJobSpec
762+
763+
# Mock Kubernetes API and config
764+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
765+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
766+
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")
767+
768+
# Mock get_api_client
769+
mock_api_client = mocker.Mock()
770+
mocker.patch("codeflare_sdk.common.kubernetes_cluster.auth.get_api_client", return_value=mock_api_client)
771+
772+
# Mock CustomObjectsApi
773+
mock_co_api = mocker.Mock()
774+
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_co_api)
775+
776+
# Mock Cluster creation for generating RayCluster spec
777+
mock_cluster_resource = {
778+
"apiVersion": "ray.io/v1",
779+
"kind": "RayCluster",
780+
"spec": {
781+
"rayVersion": "2.47.1",
782+
"headGroupSpec": {"template": {"spec": {}}},
783+
"workerGroupSpecs": [{"template": {"spec": {}}}]
784+
}
785+
}
786+
787+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster.__init__", return_value=None)
788+
mock_cluster_instance = mocker.Mock()
789+
mock_cluster_instance.resource_yaml = mock_cluster_resource
790+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster", return_value=mock_cluster_instance)
791+
792+
# Mock successful RayJob creation
793+
mock_co_api.create_namespaced_custom_object.return_value = {"metadata": {"name": "test-rayjob"}}
794+
795+
# Mock RayJob status for completion
796+
mock_status_response = {
797+
"status": {
798+
"jobDeploymentStatus": "Running",
799+
"jobStatus": "SUCCEEDED",
800+
"dashboardURL": "http://test-dashboard.com",
801+
"rayClusterName": "test-rayjob-cluster",
802+
"jobId": "test-job-123"
803+
}
804+
}
805+
mock_co_api.get_namespaced_custom_object_status.return_value = mock_status_response
806+
807+
# Mock time.sleep to speed up test
808+
mocker.patch("time.sleep")
809+
810+
# Create test configuration
811+
cluster_config = ClusterConfiguration(
812+
name="test-cluster",
813+
namespace="test-namespace",
814+
num_workers=1,
815+
head_cpu_requests=1,
816+
head_memory_requests="2G",
817+
worker_cpu_requests=1,
818+
worker_memory_requests="1G"
819+
)
820+
821+
job_config = RayJobSpec(
822+
entrypoint="python -c 'print(\"Hello World\")'",
823+
submission_id="test-submission-123"
824+
)
825+
826+
# Execute the method
827+
result = Cluster.run_job_with_managed_cluster(
828+
cluster_config=cluster_config,
829+
job_config=job_config,
830+
job_cr_name="test-rayjob",
831+
wait_for_completion=True,
832+
job_timeout_seconds=60
833+
)
834+
835+
# Verify the result
836+
assert result["job_cr_name"] == "test-rayjob"
837+
assert result["job_submission_id"] == "test-job-123"
838+
assert result["job_status"] == "SUCCEEDED"
839+
assert result["dashboard_url"] == "http://test-dashboard.com"
840+
assert result["ray_cluster_name"] == "test-rayjob-cluster"
841+
842+
# Verify API calls were made
843+
mock_co_api.create_namespaced_custom_object.assert_called_once()
844+
mock_co_api.get_namespaced_custom_object_status.assert_called()
845+
846+
847+
def test_run_job_with_managed_cluster_no_wait(mocker):
848+
"""Test RayJob execution without waiting for completion."""
849+
from codeflare_sdk.ray.job.job import RayJobSpec
850+
851+
# Mock dependencies (similar to above but condensed)
852+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
853+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
854+
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")
855+
856+
mock_api_client = mocker.Mock()
857+
mocker.patch("codeflare_sdk.common.kubernetes_cluster.auth.get_api_client", return_value=mock_api_client)
858+
859+
mock_co_api = mocker.Mock()
860+
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_co_api)
861+
862+
# Mock Cluster creation
863+
mock_cluster_resource = {
864+
"apiVersion": "ray.io/v1",
865+
"kind": "RayCluster",
866+
"spec": {"rayVersion": "2.47.1", "headGroupSpec": {"template": {"spec": {}}}}
867+
}
868+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster.__init__", return_value=None)
869+
mock_cluster_instance = mocker.Mock()
870+
mock_cluster_instance.resource_yaml = mock_cluster_resource
871+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster", return_value=mock_cluster_instance)
872+
873+
# Mock RayJob creation and status (not found initially - job just submitted)
874+
mock_co_api.create_namespaced_custom_object.return_value = {"metadata": {"name": "test-rayjob-nowait"}}
875+
876+
from kubernetes.client.rest import ApiException
877+
mock_co_api.get_namespaced_custom_object_status.side_effect = ApiException(status=404, reason="Not Found")
878+
879+
cluster_config = ClusterConfiguration(
880+
name="test-cluster",
881+
namespace="test-namespace"
882+
)
883+
884+
job_config = RayJobSpec(entrypoint="python script.py")
885+
886+
# Execute without waiting
887+
result = Cluster.run_job_with_managed_cluster(
888+
cluster_config=cluster_config,
889+
job_config=job_config,
890+
wait_for_completion=False
891+
)
892+
893+
# Verify result for no-wait case
894+
assert "job_cr_name" in result
895+
assert result["job_status"] == "SUBMITTED_NOT_FOUND"
896+
897+
# Verify no polling happened
898+
mock_co_api.create_namespaced_custom_object.assert_called_once()
899+
900+
901+
def test_run_job_with_managed_cluster_timeout(mocker):
902+
"""Test RayJob execution with timeout."""
903+
from codeflare_sdk.ray.job.job import RayJobSpec
904+
905+
# Mock dependencies
906+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
907+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
908+
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")
909+
910+
mock_api_client = mocker.Mock()
911+
mocker.patch("codeflare_sdk.common.kubernetes_cluster.auth.get_api_client", return_value=mock_api_client)
912+
913+
mock_co_api = mocker.Mock()
914+
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_co_api)
915+
916+
# Mock Cluster creation
917+
mock_cluster_resource = {"apiVersion": "ray.io/v1", "kind": "RayCluster", "spec": {}}
918+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster.__init__", return_value=None)
919+
mock_cluster_instance = mocker.Mock()
920+
mock_cluster_instance.resource_yaml = mock_cluster_resource
921+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster", return_value=mock_cluster_instance)
922+
923+
# Mock RayJob creation
924+
mock_co_api.create_namespaced_custom_object.return_value = {"metadata": {"name": "test-rayjob-timeout"}}
925+
926+
# Mock job status as always RUNNING (never completes)
927+
mock_status_response = {
928+
"status": {
929+
"jobDeploymentStatus": "Running",
930+
"jobStatus": "RUNNING",
931+
"jobId": "timeout-job-123"
932+
}
933+
}
934+
mock_co_api.get_namespaced_custom_object_status.return_value = mock_status_response
935+
936+
# Mock time to simulate timeout
937+
start_time = 1000
938+
mocker.patch("time.time", side_effect=[start_time, start_time + 70]) # Exceed 60s timeout
939+
mocker.patch("time.sleep")
940+
941+
cluster_config = ClusterConfiguration(name="timeout-cluster", namespace="test-ns")
942+
job_config = RayJobSpec(entrypoint="python long_running_script.py")
943+
944+
# Execute and expect timeout
945+
try:
946+
result = Cluster.run_job_with_managed_cluster(
947+
cluster_config=cluster_config,
948+
job_config=job_config,
949+
wait_for_completion=True,
950+
job_timeout_seconds=60,
951+
job_polling_interval_seconds=1
952+
)
953+
assert False, "Expected TimeoutError"
954+
except TimeoutError as e:
955+
assert "timed out after 60 seconds" in str(e)
956+
assert "RUNNING" in str(e)
957+
958+
959+
def test_run_job_with_managed_cluster_validation_error(mocker):
960+
"""Test RayJob execution with validation errors."""
961+
from codeflare_sdk.ray.job.job import RayJobSpec
962+
963+
cluster_config = ClusterConfiguration(name="test-cluster", namespace="test-ns")
964+
965+
# Test missing entrypoint
966+
job_config_no_entrypoint = RayJobSpec(entrypoint="")
967+
968+
try:
969+
Cluster.run_job_with_managed_cluster(
970+
cluster_config=cluster_config,
971+
job_config=job_config_no_entrypoint
972+
)
973+
assert False, "Expected ValueError for missing entrypoint"
974+
except ValueError as e:
975+
assert "entrypoint must be specified" in str(e)
976+
977+
759978
# Make sure to always keep this function last
760979
def test_cleanup():
761980
os.remove(f"{aw_dir}test-all-params.yaml")

0 commit comments

Comments
 (0)