diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py
index 2fb77f3bf..7c97f37bb 100644
--- a/torchx/schedulers/kubernetes_scheduler.py
+++ b/torchx/schedulers/kubernetes_scheduler.py
@@ -149,7 +149,6 @@
 from torchx.util.strings import normalize_str
 from torchx.workspace.docker_workspace import DockerWorkspaceMixin
 
-
 if TYPE_CHECKING:
     from docker import DockerClient
     from kubernetes.client import ApiClient, CustomObjectsApi
@@ -159,6 +158,7 @@
     )
     from kubernetes.client.rest import ApiException
 
+
 logger: logging.Logger = logging.getLogger(__name__)
 
 # Kubernetes reserves a small amount of resources per host for the system. For
@@ -294,7 +294,14 @@ def sanitize_for_serialization(obj: object) -> object:
     return api.sanitize_for_serialization(obj)
 
 
-def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod":
+def role_to_pod(
+    name: str,
+    role: Role,
+    service_account: Optional[str],
+    reserved_millicpu: int = RESERVED_MILLICPU,
+    reserved_memmb: int = RESERVED_MEMMB,
+    efa_device_count: Optional[int] = None,
+) -> "V1Pod":
     from kubernetes.client.models import (  # noqa: F811 redefinition of unused
         V1Container,
         V1ContainerPort,
@@ -324,18 +331,29 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod":
     if resource.cpu > 0:
         mcpu = int(resource.cpu * 1000)
         limits["cpu"] = f"{mcpu}m"
-        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
+        request_mcpu = max(mcpu - reserved_millicpu, 0)
         requests["cpu"] = f"{request_mcpu}m"
     if resource.memMB > 0:
         limits["memory"] = f"{int(resource.memMB)}M"
-        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
+        request_memMB = max(int(resource.memMB) - reserved_memmb, 0)
         requests["memory"] = f"{request_memMB}M"
     if resource.gpu > 0:
         requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)
 
+    EFA_DEVICE = "vpc.amazonaws.com/efa"
     for device_name, device_limit in resource.devices.items():
         limits[device_name] = str(device_limit)
 
+    # Handle EFA device count override:
+    #   - None (default): use whatever count is in the resource spec (already added above)
+    #   - 0: remove EFA devices entirely
+    #   - N > 0: set EFA device count to N (override or add)
+    if efa_device_count is not None:
+        if efa_device_count == 0:
+            limits.pop(EFA_DEVICE, None)
+        else:
+            limits[EFA_DEVICE] = str(efa_device_count)
+
     resources = V1ResourceRequirements(
         limits=limits,
         requests=requests,
@@ -475,6 +493,9 @@ def app_to_resource(
     queue: str,
     service_account: Optional[str],
     priority_class: Optional[str] = None,
+    reserved_millicpu: int = RESERVED_MILLICPU,
+    reserved_memmb: int = RESERVED_MEMMB,
+    efa_device_count: Optional[int] = None,
 ) -> Dict[str, Any]:
     """
     app_to_resource creates a volcano job kubernetes resource definition from
@@ -507,7 +528,14 @@ def app_to_resource(
         replica_role.env["TORCHX_RANK0_HOST"] = "localhost"
         replica_role.env["TORCHX_IMAGE"] = replica_role.image
 
-        pod = role_to_pod(name, replica_role, service_account)
+        pod = role_to_pod(
+            name,
+            replica_role,
+            service_account,
+            reserved_millicpu,
+            reserved_memmb,
+            efa_device_count,
+        )
         if k8s_metadata := role.metadata.get("kubernetes"):
             if isinstance(k8s_metadata, str):
                 import fsspec
@@ -589,6 +617,9 @@ class KubernetesOpts(TypedDict, total=False):
     service_account: Optional[str]
     priority_class: Optional[str]
     validate_spec: Optional[bool]
+    reserved_millicpu: Optional[int]
+    reserved_memmb: Optional[int]
+    efa_device_count: Optional[int]
 
 
 class KubernetesScheduler(DockerWorkspaceMixin, Scheduler[KubernetesOpts]):
@@ -707,9 +738,14 @@ def _api_client(self) -> "ApiClient":
         if c is None:
             configuration = client.Configuration()
             try:
-                config.load_kube_config(client_configuration=configuration)
-            except config.ConfigException as e:
-                warnings.warn(f"failed to load kube config: {e}")
+                # Try in-cluster config first (for pods with ServiceAccount)
+                config.load_incluster_config(client_configuration=configuration)
+            except config.ConfigException:
+                # Fall back to kubeconfig (for local development)
+                try:
+                    config.load_kube_config(client_configuration=configuration)
+                except config.ConfigException as e:
+                    warnings.warn(f"failed to load kube config: {e}")
 
             c = self._client = client.ApiClient(configuration)
 
@@ -783,7 +819,26 @@ def _submit_dryrun(
             priority_class, str
         ), "priority_class must be a str"
 
-        resource = app_to_resource(app, queue, service_account, priority_class)
+        reserved_millicpu = cfg.get("reserved_millicpu", RESERVED_MILLICPU)
+        assert isinstance(reserved_millicpu, int), "reserved_millicpu must be an int"
+
+        reserved_memmb = cfg.get("reserved_memmb", RESERVED_MEMMB)
+        assert isinstance(reserved_memmb, int), "reserved_memmb must be an int"
+
+        efa_device_count = cfg.get("efa_device_count")
+        assert efa_device_count is None or isinstance(
+            efa_device_count, int
+        ), "efa_device_count must be an int or None"
+
+        resource = app_to_resource(
+            app,
+            queue,
+            service_account,
+            priority_class,
+            reserved_millicpu,
+            reserved_memmb,
+            efa_device_count,
+        )
 
         if cfg.get("validate_spec"):
             try:
@@ -889,9 +944,29 @@ def _run_opts(self) -> runopts:
             help="Validate job spec using Kubernetes API dry-run before submission",
             default=True,
         )
+        opts.add(
+            "reserved_millicpu",
+            type_=int,
+            help="Amount of CPU in millicores to reserve for Kubernetes system overhead (default: 100)",
+            default=RESERVED_MILLICPU,
+        )
+        opts.add(
+            "reserved_memmb",
+            type_=int,
+            help="Amount of memory in MB to reserve for Kubernetes system overhead (default: 1024)",
+            default=RESERVED_MEMMB,
+        )
+        opts.add(
+            "efa_device_count",
+            type_=int,
+            help="EFA device count override: None/unset=use resource spec, "
+            "0=remove EFA, N>0=set EFA count to N",
+            default=None,
+        )
         return opts
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
+        from kubernetes import client
         from kubernetes.client.rest import ApiException
 
         namespace, name = app_id.split(":")
@@ -917,8 +992,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
         TASK_STATUS_COUNT = "taskStatusCount"
 
         if TASK_STATUS_COUNT in status:
-            for name, status in status[TASK_STATUS_COUNT].items():
-                role, _, idx = name.rpartition("-")
+            for task_name, status in status[TASK_STATUS_COUNT].items():
+                role, _, idx = task_name.rpartition("-")
 
                 state_str = next(iter(status["phase"].keys()))
                 state = TASK_STATE[state_str]
@@ -927,8 +1002,32 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
                     roles[role] = Role(name=role, num_replicas=0, image="")
                     roles_statuses[role] = RoleStatus(role, [])
                 roles[role].num_replicas += 1
+
+                # Pod name follows the pattern: {job_name}-{task_name}-0
+                # Get the pod to retrieve its IP address
+                pod_name_k8s = f"{name}-{task_name}-0"
+                try:
+                    core_api = client.CoreV1Api(self._api_client())
+                    pod = core_api.read_namespaced_pod(
+                        name=pod_name_k8s, namespace=namespace
+                    )
+                    pod_ip = pod.status.pod_ip
+
+                    # Convert IP to dashed format (e.g., 10.244.1.5 -> 10-244-1-5)
+                    pod_ip_dashed = pod_ip.replace(".", "-")
+
+                    # Kubernetes pod DNS name: {pod-ip-dashed}.{namespace}.pod.cluster.local
+                    # Note: this hostname only resolves for clients running inside the cluster.
+                    hostname = f"{pod_ip_dashed}.{namespace}.pod.cluster.local"
+
+                except ApiException:
+                    # Fallback to old behavior if pod not found
+                    hostname = ""
+
                 roles_statuses[role].replicas.append(
-                    ReplicaStatus(id=int(idx), role=role, state=state, hostname="")
+                    ReplicaStatus(
+                        id=int(idx), role=role, state=state, hostname=hostname
+                    )
                 )
         else:
             app_state = AppState.UNKNOWN
diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py
index edb643364..65b710052 100644
--- a/torchx/schedulers/test/kubernetes_scheduler_test.py
+++ b/torchx/schedulers/test/kubernetes_scheduler_test.py
@@ -476,32 +476,91 @@ def test_device_mounts(self) -> None:
         )
         self.assertTrue(pod.spec.containers[0].security_context.privileged)
 
-    def test_resource_devices(self) -> None:
-        scheduler = create_scheduler("test")
-
-        role = specs.Role(
+    def test_efa_device_override(self) -> None:
+        """Test EFA device count can be overridden via efa_device_count parameter."""
+        role_with_efa = specs.Role(
             name="foo",
             image="",
             resource=specs.Resource(
                 cpu=2,
                 memMB=3000,
                 gpu=4,
-                devices={
-                    "vpc.amazonaws.com/efa": 4,
-                },
+                devices={"vpc.amazonaws.com/efa": 4},
             ),
         )
+        role_without_efa = specs.Role(
+            name="foo",
+            image="",
+            resource=specs.Resource(cpu=2, memMB=3000, gpu=4),
+        )
+
+        # Default: use resource spec's EFA count (or no EFA if not in spec)
+        pod = role_to_pod("foo", role_with_efa, service_account="")
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "4"
+        )
+
+        pod = role_to_pod("foo", role_without_efa, service_account="")
+        self.assertNotIn(
+            "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits
+        )
+
+        # Override to 0: remove EFA entirely
+        pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=0)
+        self.assertNotIn(
+            "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits
+        )
+
+        # Override to different count: use override value
+        pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=8)
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "8"
+        )
+
+        # Add EFA when not in resource spec
+        pod = role_to_pod(
+            "foo", role_without_efa, service_account="", efa_device_count=32
+        )
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "32"
+        )
+
+    def test_reserved_resources_override(self) -> None:
+        """Test that reserved_millicpu and reserved_memmb overrides work correctly."""
+        role = specs.Role(
+            name="foo",
+            image="",
+            resource=specs.Resource(cpu=2, gpu=0, memMB=3000),
+        )
+
+        # Default: 100 millicpu and 1024 memmb reserved
         pod = role_to_pod("foo", role, service_account="")
+        self.assertEqual(pod.spec.containers[0].resources.limits["cpu"], "2000m")
+        self.assertEqual(pod.spec.containers[0].resources.limits["memory"], "3000M")
         self.assertEqual(
-            pod.spec.containers[0].resources.limits,
-            {
-                "cpu": "2000m",
-                "memory": "3000M",
-                "nvidia.com/gpu": "4",
-                "vpc.amazonaws.com/efa": "4",
-            },
+            pod.spec.containers[0].resources.requests["cpu"], "1900m"
+        )  # 2000 - 100
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["memory"], "1976M"
+        )  # 3000 - 1024
+
+        # Custom overrides for both CPU and memory
+        pod = role_to_pod(
+            "foo", role, service_account="", reserved_millicpu=300, reserved_memmb=1000
         )
-        self.assertFalse(pod.spec.containers[0].security_context.privileged)
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["cpu"], "1700m"
+        )  # 2000 - 300
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["memory"], "2000M"
+        )  # 3000 - 1000
+
+        # Zero reserved: requests equal limits
+        pod = role_to_pod(
+            "foo", role, service_account="", reserved_millicpu=0, reserved_memmb=0
+        )
+        self.assertEqual(pod.spec.containers[0].resources.requests["cpu"], "2000m")
+        self.assertEqual(pod.spec.containers[0].resources.requests["memory"], "3000M")
 
     def test_instance_type(self) -> None:
         scheduler = create_scheduler("test")
@@ -682,8 +741,15 @@ def test_submit_job_name_conflict(
         second_call_kwargs = create_namespaced_custom_object.call_args_list[1][1]
         self.assertNotIn("dry_run", second_call_kwargs)
 
+    @patch("kubernetes.client.CoreV1Api.read_namespaced_pod")
     @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status")
-    def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None:
+    def test_describe(
+        self,
+        get_namespaced_custom_object_status: MagicMock,
+        read_namespaced_pod: MagicMock,
+    ) -> None:
+        from kubernetes.client.rest import ApiException
+
         get_namespaced_custom_object_status.return_value = {
             "status": {
                 "state": {"phase": "Completed"},
@@ -691,6 +757,8 @@ def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None:
                 "taskStatusCount": {"echo-0": {"phase": {"Succeeded": 1}}},
             }
         }
+        # Simulate pod not found to trigger fallback to empty hostname
+        read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found")
         app_id = "testnamespace:testid"
         scheduler = create_scheduler("test")
         info = scheduler.describe(app_id)
@@ -730,6 +798,61 @@ def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None:
             ),
         )
 
+    @patch("kubernetes.client.CoreV1Api.read_namespaced_pod")
+    @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status")
+    def test_describe_with_pod_ip(
+        self,
+        get_namespaced_custom_object_status: MagicMock,
+        read_namespaced_pod: MagicMock,
+    ) -> None:
+        get_namespaced_custom_object_status.return_value = {
+            "status": {
+                "state": {"phase": "Running"},
+                "running": 1,
+                "taskStatusCount": {"worker-0": {"phase": {"Running": 1}}},
+            }
+        }
+        # Mock a pod with a valid IP address
+        mock_pod = MagicMock()
+        mock_pod.status.pod_ip = "10.244.1.5"
+        read_namespaced_pod.return_value = mock_pod
+
+        app_id = "testnamespace:testid"
+        scheduler = create_scheduler("test")
+        info = scheduler.describe(app_id)
+
+        # Verify the pod API was called with correct parameters
+        read_namespaced_pod.assert_called_once_with(
+            name="testid-worker-0-0", namespace="testnamespace"
+        )
+
+        # Verify hostname follows the spec: {pod-ip-dashed}.{namespace}.pod.cluster.local
+        # IP 10.244.1.5 should become 10-244-1-5
+        expected_hostname = "10-244-1-5.testnamespace.pod.cluster.local"
+        self.assertEqual(
+            info,
+            DescribeAppResponse(
+                app_id=app_id,
+                state=specs.AppState.RUNNING,
+                roles_statuses=[
+                    specs.RoleStatus(
+                        "worker",
+                        [
+                            specs.ReplicaStatus(
+                                id=0,
+                                role="worker",
+                                state=specs.AppState.RUNNING,
+                                hostname=expected_hostname,
+                            )
+                        ],
+                    ),
+                ],
+                roles=[
+                    specs.Role(name="worker", image="", num_replicas=1),
+                ],
+            ),
+        )
+
     @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status")
     def test_describe_unknown(
         self, get_namespaced_custom_object_status: MagicMock
@@ -797,6 +920,9 @@ def test_runopts(self) -> None:
                 "service_account",
                 "priority_class",
                 "validate_spec",
+                "reserved_millicpu",
+                "reserved_memmb",
+                "efa_device_count",
             },
         )
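
Usage note (not part of the patch): below is a minimal sketch of the new `role_to_pod` knobs introduced above. The same values can be supplied per run through the new `reserved_millicpu`, `reserved_memmb`, and `efa_device_count` scheduler run options; the role, image, and numbers here are illustrative, and building the pod requires the `kubernetes` client package to be installed.

```python
from torchx import specs
from torchx.schedulers.kubernetes_scheduler import role_to_pod

# Illustrative role; any cpu/gpu/memMB values behave the same way.
role = specs.Role(
    name="trainer",
    image="my-registry/trainer:latest",
    resource=specs.Resource(cpu=8, gpu=4, memMB=64000),
)

# Reserve 250 millicores and 2048 MB for system overhead, and force 32 EFA devices.
pod = role_to_pod(
    "trainer-0",
    role,
    service_account=None,
    reserved_millicpu=250,
    reserved_memmb=2048,
    efa_device_count=32,
)

# Requests are limits minus the reservation, e.g. cpu=7750m and memory=61952M here;
# the EFA override only affects limits ("vpc.amazonaws.com/efa": "32").
print(pod.spec.containers[0].resources.requests)
print(pod.spec.containers[0].resources.limits)
```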