Skip to content

RHOAIENG-26482: Add GCS Fault Tolerance, Disable Usage Stats, Rename the RayJob scope RayCluster Config #880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
coverage:
precision: 2
round: down
status:
project:
default:
target: auto
threshold: 2.5%
patch:
default:
target: 85%
threshold: 2.5%

ignore:
- "**/__init__.py"
2 changes: 1 addition & 1 deletion src/codeflare_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
AppWrapperStatus,
RayJobClient,
RayJob,
RayJobClusterConfig,
ManagedClusterConfig,
)

from .common.widgets import view_clusters
Expand Down
2 changes: 1 addition & 1 deletion src/codeflare_sdk/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from .rayjobs import (
RayJob,
RayJobClusterConfig,
ManagedClusterConfig,
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
Expand Down
3 changes: 2 additions & 1 deletion src/codeflare_sdk/ray/rayjobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .rayjob import RayJob, RayJobClusterConfig
from .rayjob import RayJob, ManagedClusterConfig
from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo
from .config import ManagedClusterConfig
72 changes: 67 additions & 5 deletions src/codeflare_sdk/ray/rayjobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

"""
The config sub-module contains the definition of the RayJobClusterConfigV2 dataclass,
The config sub-module contains the definition of the ManagedClusterConfig dataclass,
which is used to specify resource requirements and other details when creating a
Cluster object.
"""
Expand Down Expand Up @@ -104,7 +104,7 @@


@dataclass
class RayJobClusterConfig:
class ManagedClusterConfig:
"""
This dataclass is used to specify resource requirements and other details for RayJobs.
The cluster name and namespace are automatically derived from the RayJob configuration.
Expand Down Expand Up @@ -139,6 +139,14 @@ class RayJobClusterConfig:
A list of V1Volume objects to add to the Cluster
volume_mounts:
A list of V1VolumeMount objects to add to the Cluster
enable_gcs_ft:
A boolean indicating whether to enable GCS fault tolerance.
redis_address:
The address of the Redis server to use for GCS fault tolerance, required when enable_gcs_ft is True.
redis_password_secret:
Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"}
external_storage_namespace:
The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster.
"""

head_cpu_requests: Union[int, str] = 2
Expand All @@ -165,8 +173,35 @@ class RayJobClusterConfig:
annotations: Dict[str, str] = field(default_factory=dict)
volumes: list[V1Volume] = field(default_factory=list)
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
enable_gcs_ft: bool = False
redis_address: Optional[str] = None
redis_password_secret: Optional[Dict[str, str]] = None
external_storage_namespace: Optional[str] = None

def __post_init__(self):
self.envs["RAY_USAGE_STATS_ENABLED"] = "0"

if self.enable_gcs_ft:
if not self.redis_address:
raise ValueError(
"redis_address must be provided when enable_gcs_ft is True"
)

if self.redis_password_secret and not isinstance(
self.redis_password_secret, dict
):
raise ValueError(
"redis_password_secret must be a dictionary with 'name' and 'key' fields"
)

if self.redis_password_secret and (
"name" not in self.redis_password_secret
or "key" not in self.redis_password_secret
):
raise ValueError(
"redis_password_secret must contain both 'name' and 'key' fields"
)

self._validate_types()
self._memory_to_string()
self._validate_gpu_config(self.head_accelerators)
Expand All @@ -190,7 +225,7 @@ def _memory_to_string(self):
self.worker_memory_limits = f"{self.worker_memory_limits}G"

def _validate_types(self):
"""Validate the types of all fields in the RayJobClusterConfig dataclass."""
"""Validate the types of all fields in the ManagedClusterConfig dataclass."""
errors = []
for field_info in fields(self):
value = getattr(self, field_info.name)
Expand Down Expand Up @@ -235,10 +270,10 @@ def check_type(value, expected_type):

def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
"""
Build the RayCluster spec from RayJobClusterConfig for embedding in RayJob.
Build the RayCluster spec from ManagedClusterConfig for embedding in RayJob.

Args:
self: The cluster configuration object (RayJobClusterConfig)
self: The cluster configuration object (ManagedClusterConfig)
cluster_name: The name for the cluster (derived from RayJob name)

Returns:
Expand All @@ -251,6 +286,11 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
"workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
}

# Add GCS fault tolerance if enabled
if self.enable_gcs_ft:
gcs_ft_options = self._build_gcs_ft_options()
ray_cluster_spec["gcsFaultToleranceOptions"] = gcs_ft_options

return ray_cluster_spec

def _build_head_group_spec(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -453,3 +493,25 @@ def _generate_volumes(self) -> list:
def _build_env_vars(self) -> list:
"""Build environment variables list."""
return [V1EnvVar(name=key, value=value) for key, value in self.envs.items()]

def _build_gcs_ft_options(self) -> Dict[str, Any]:
"""Build GCS fault tolerance options."""
gcs_ft_options = {"redisAddress": self.redis_address}

if (
hasattr(self, "external_storage_namespace")
and self.external_storage_namespace
):
gcs_ft_options["externalStorageNamespace"] = self.external_storage_namespace

if hasattr(self, "redis_password_secret") and self.redis_password_secret:
gcs_ft_options["redisPassword"] = {
"valueFrom": {
"secretKeyRef": {
"name": self.redis_password_secret["name"],
"key": self.redis_password_secret["key"],
}
}
}

return gcs_ft_options
5 changes: 2 additions & 3 deletions src/codeflare_sdk/ray/rayjobs/rayjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Dict, Any, Optional, Tuple
from python_client.kuberay_job_api import RayjobApi

from codeflare_sdk.ray.rayjobs.config import RayJobClusterConfig
from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig

from ...common.utils import get_current_namespace

Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
job_name: str,
entrypoint: str,
cluster_name: Optional[str] = None,
cluster_config: Optional[RayJobClusterConfig] = None,
cluster_config: Optional[ManagedClusterConfig] = None,
namespace: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
shutdown_after_job_finishes: Optional[bool] = None,
Expand Down Expand Up @@ -140,7 +140,6 @@ def __init__(
self.cluster_name = cluster_name
logger.info(f"Using existing cluster: {self.cluster_name}")

# Initialize the KubeRay job API client
self._api = RayjobApi()

logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
Expand Down
67 changes: 59 additions & 8 deletions src/codeflare_sdk/ray/rayjobs/test_config.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"""
Tests for the simplified RayJobClusterConfig accelerator_configs behavior.
Tests for the simplified ManagedClusterConfig accelerator_configs behavior.
"""

import pytest
from codeflare_sdk.ray.rayjobs.config import RayJobClusterConfig, DEFAULT_ACCELERATORS
from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig, DEFAULT_ACCELERATORS


def test_accelerator_configs_defaults_to_default_accelerators():
"""Test that accelerator_configs defaults to DEFAULT_ACCELERATORS.copy()"""
config = RayJobClusterConfig()
config = ManagedClusterConfig()

# Should have all the default accelerators
assert "nvidia.com/gpu" in config.accelerator_configs
Expand All @@ -27,7 +27,7 @@ def test_accelerator_configs_can_be_overridden():
"custom.com/accelerator": "CUSTOM_ACCELERATOR",
}

config = RayJobClusterConfig(accelerator_configs=custom_configs)
config = ManagedClusterConfig(accelerator_configs=custom_configs)

# Should have custom configs
assert config.accelerator_configs == custom_configs
Expand All @@ -46,7 +46,7 @@ def test_accelerator_configs_can_extend_defaults():
"custom.com/accelerator": "CUSTOM_ACCEL",
}

config = RayJobClusterConfig(accelerator_configs=extended_configs)
config = ManagedClusterConfig(accelerator_configs=extended_configs)

# Should have all defaults plus custom
assert "nvidia.com/gpu" in config.accelerator_configs
Expand All @@ -57,15 +57,15 @@ def test_accelerator_configs_can_extend_defaults():

def test_gpu_validation_works_with_defaults():
"""Test that GPU validation works with default accelerator configs"""
config = RayJobClusterConfig(head_accelerators={"nvidia.com/gpu": 1})
config = ManagedClusterConfig(head_accelerators={"nvidia.com/gpu": 1})

# Should not raise any errors
assert config.head_accelerators == {"nvidia.com/gpu": 1}


def test_gpu_validation_works_with_custom_configs():
"""Test that GPU validation works with custom accelerator configs"""
config = RayJobClusterConfig(
config = ManagedClusterConfig(
accelerator_configs={"custom.com/accelerator": "CUSTOM_ACCEL"},
head_accelerators={"custom.com/accelerator": 1},
)
Expand All @@ -79,4 +79,55 @@ def test_gpu_validation_fails_with_unsupported_accelerator():
with pytest.raises(
ValueError, match="GPU configuration 'unsupported.com/accelerator' not found"
):
RayJobClusterConfig(head_accelerators={"unsupported.com/accelerator": 1})
ManagedClusterConfig(head_accelerators={"unsupported.com/accelerator": 1})


def test_ray_usage_stats_always_disabled_by_default():
"""Test that RAY_USAGE_STATS_ENABLED is always set to '0' by default"""
config = ManagedClusterConfig()

# Should always have the environment variable set to "0"
assert "RAY_USAGE_STATS_ENABLED" in config.envs
assert config.envs["RAY_USAGE_STATS_ENABLED"] == "0"


def test_ray_usage_stats_overwrites_user_env():
"""Test that RAY_USAGE_STATS_ENABLED is always set to '0' even if user specifies it"""
# User tries to enable usage stats
config = ManagedClusterConfig(envs={"RAY_USAGE_STATS_ENABLED": "1"})

# Should still be disabled (our setting takes precedence)
assert "RAY_USAGE_STATS_ENABLED" in config.envs
assert config.envs["RAY_USAGE_STATS_ENABLED"] == "0"


def test_ray_usage_stats_overwrites_user_env_string():
"""Test that RAY_USAGE_STATS_ENABLED is always set to '0' even if user specifies it as string"""
# User tries to enable usage stats with string
config = ManagedClusterConfig(envs={"RAY_USAGE_STATS_ENABLED": "true"})

# Should still be disabled (our setting takes precedence)
assert "RAY_USAGE_STATS_ENABLED" in config.envs
assert config.envs["RAY_USAGE_STATS_ENABLED"] == "0"


def test_ray_usage_stats_with_other_user_envs():
"""Test that RAY_USAGE_STATS_ENABLED is set correctly while preserving other user envs"""
# User sets other environment variables
user_envs = {
"CUSTOM_VAR": "custom_value",
"ANOTHER_VAR": "another_value",
"RAY_USAGE_STATS_ENABLED": "1", # This should be overwritten
}

config = ManagedClusterConfig(envs=user_envs)

# Our setting should take precedence
assert config.envs["RAY_USAGE_STATS_ENABLED"] == "0"

# Other user envs should be preserved
assert config.envs["CUSTOM_VAR"] == "custom_value"
assert config.envs["ANOTHER_VAR"] == "another_value"

# Total count should be correct (3 user envs)
assert len(config.envs) == 3
Loading
Loading