From ba4ab092c9bcca4e1450462d05917a692b6ae115 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 7 Jul 2025 17:05:39 -0700 Subject: [PATCH 1/8] [data] allocate GPU resources Signed-off-by: Hao Chen --- .../ray/data/_internal/execution/resource_manager.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index fbe4081926a..f707bb86102 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -695,9 +695,15 @@ def update_usages(self): to_borrow, ) self._op_budgets[op] = self._op_budgets[op].add(op_shared) - # We don't limit GPU resources, as not all operators - # use GPU resources. - self._op_budgets[op].gpu = float("inf") + if op.min_max_resource_requirements()[1].gpu > 0: + # If an operator needs GPU, we just allocate all GPUs to it. + # TODO(hchen): allocate resources across multiple GPU operators. + self._op_budgets[op].gpu = ( + self._resource_manager._global_limits.gpu + - self._resource_manager._op_usages[op].gpu + ) + else: + self._op_budgets[op].gpu = 0 # A materializing operator like `AllToAllOperator` waits for all its input # operator's outputs before processing data. This often forces the input From f1ca9bd7096d838e59fd7e63f1f4ecaf1d55f388 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 8 Jul 2025 14:43:20 -0700 Subject: [PATCH 2/8] fix Signed-off-by: Hao Chen --- .../ray/data/_internal/execution/resource_manager.py | 2 +- python/ray/data/tests/test_resource_manager.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index f707bb86102..4e716770d91 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -700,7 +700,7 @@ def update_usages(self): # TODO(hchen): allocate resources across multiple GPU operators. self._op_budgets[op].gpu = ( self._resource_manager._global_limits.gpu - - self._resource_manager._op_usages[op].gpu + - self._resource_manager.get_op_usage(op).gpu ) else: self._op_budgets[op].gpu = 0 diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 792021b9a92..8488c0c446c 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -388,8 +388,8 @@ def mock_get_global_limits(): # 50% of the global limits are shared. assert allocator._total_shared == ExecutionResources(8, 0, 500) # Test budgets. - assert allocator._op_budgets[o2] == ExecutionResources(8, float("inf"), 375) - assert allocator._op_budgets[o3] == ExecutionResources(8, float("inf"), 375) + assert allocator._op_budgets[o2] == ExecutionResources(8, 0, 375) + assert allocator._op_budgets[o3] == ExecutionResources(8, 0, 375) # Test can_submit_new_task and max_task_output_bytes_to_read. assert allocator.can_submit_new_task(o2) assert allocator.can_submit_new_task(o3) @@ -418,9 +418,9 @@ def mock_get_global_limits(): # remaining shared = 1000/2 - 275 = 225 # Test budgets. 
# memory_budget[o2] = 0 + 225/2 = 112.5 - assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5) + assert allocator._op_budgets[o2] == ExecutionResources(3, 0, 112.5) # memory_budget[o3] = 95 + 225/2 = 207.5 - assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5) + assert allocator._op_budgets[o3] == ExecutionResources(5, 0, 207.5) # Test can_submit_new_task and max_task_output_bytes_to_read. assert allocator.can_submit_new_task(o2) assert allocator.can_submit_new_task(o3) @@ -454,9 +454,9 @@ def mock_get_global_limits(): # Test budgets. # memory_budget[o2] = 0 + 100/2 = 50 - assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50) + assert allocator._op_budgets[o2] == ExecutionResources(1.5, 0, 50) # memory_budget[o3] = 70 + 100/2 = 120 - assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120) + assert allocator._op_budgets[o3] == ExecutionResources(2.5, 0, 120) # Test can_submit_new_task and max_task_output_bytes_to_read. assert allocator.can_submit_new_task(o2) assert allocator.can_submit_new_task(o3) From aee6b6dca0a130cac9937088023e25f18b077f37 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 8 Jul 2025 14:54:41 -0700 Subject: [PATCH 3/8] add test Signed-off-by: Hao Chen --- .../_internal/execution/resource_manager.py | 2 +- .../ray/data/tests/test_resource_manager.py | 147 +++++++++++++++++- 2 files changed, 146 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 4e716770d91..f77ab293471 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -699,7 +699,7 @@ def update_usages(self): # If an operator needs GPU, we just allocate all GPUs to it. # TODO(hchen): allocate resources across multiple GPU operators. 
self._op_budgets[op].gpu = ( - self._resource_manager._global_limits.gpu + self._resource_manager.get_global_limits().gpu - self._resource_manager.get_op_usage(op).gpu ) else: diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 8488c0c446c..7044358aa29 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -337,8 +337,8 @@ def test_basic(self, restore_data_context): o4 = LimitOperator(1, o3, DataContext.get_current()) op_usages = {op: ExecutionResources.zero() for op in [o1, o2, o3, o4]} - op_internal_usage = {op: 0 for op in [o1, o2, o3, o4]} - op_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]} + op_internal_usage = dict.fromkeys([o1, o2, o3, o4], 0) + op_outputs_usages = dict.fromkeys([o1, o2, o3, o4], 0) topo, _ = build_streaming_topology(o4, ExecutionOptions()) @@ -617,6 +617,149 @@ def test_only_handle_eligible_ops(self, restore_data_context): allocator.update_usages() assert o2 not in allocator._op_budgets + def test_gpu_allocation(self, restore_data_context): + """Test GPU allocation logic for operators that need GPU vs those that don't.""" + DataContext.get_current().op_resource_reservation_enabled = True + DataContext.get_current().op_resource_reservation_ratio = 0.5 + + # Create operators with different GPU requirements + o1 = InputDataBuffer(DataContext.get_current(), []) + + # Non-GPU operator + o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 15)) + + # GPU operator + o3 = mock_map_op( + o2, + ray_remote_args={"num_cpus": 1, "num_gpus": 1}, + incremental_resource_usage=ExecutionResources(1, 1, 15), + ) + + # Another non-GPU operator + o4 = mock_map_op(o3, incremental_resource_usage=ExecutionResources(1, 0, 15)) + + # Mock min_max_resource_requirements to return appropriate GPU requirements + o2.min_max_resource_requirements = MagicMock( + return_value=(ExecutionResources(0, 0, 0), ExecutionResources(1, 0, 15)) + ) + o3.min_max_resource_requirements = MagicMock( + return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + ) + o4.min_max_resource_requirements = MagicMock( + return_value=(ExecutionResources(0, 0, 0), ExecutionResources(1, 0, 15)) + ) + + topo, _ = build_streaming_topology(o4, ExecutionOptions()) + + # Set up global limits with GPUs available + global_limits = ExecutionResources(cpu=16, gpu=4, object_store_memory=1000) + + # Set up usage tracking + op_usages = { + o1: ExecutionResources.zero(), + o2: ExecutionResources(1, 0, 10), # Non-GPU op using some resources + o3: ExecutionResources(1, 1, 20), # GPU op using 1 GPU + o4: ExecutionResources(1, 0, 5), # Non-GPU op using some resources + } + + resource_manager = ResourceManager( + topo, ExecutionOptions(), MagicMock(), DataContext.get_current() + ) + resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) + resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3, o4], 0) + resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3, o4], 0) + resource_manager.get_global_limits = MagicMock(return_value=global_limits) + + allocator = resource_manager._op_resource_allocator + assert isinstance(allocator, ReservationOpResourceAllocator) + + allocator.update_usages() + + # Test GPU allocation + # o2 (non-GPU): should get 0 GPU + assert allocator._op_budgets[o2].gpu == 0 + + # o3 (GPU): should get remaining GPUs (4 total - 1 used = 3 available) + assert allocator._op_budgets[o3].gpu == 3 + + # o4 (non-GPU): should get 0 
GPU + assert allocator._op_budgets[o4].gpu == 0 + + # Test scenario with no GPUs available + op_usages[o3] = ExecutionResources(1, 4, 20) # GPU op using all 4 GPUs + allocator.update_usages() + + # o3 should get 0 additional GPUs since all are in use + assert allocator._op_budgets[o3].gpu == 0 + + # Test scenario with more GPUs available + global_limits = ExecutionResources(cpu=16, gpu=8, object_store_memory=1000) + resource_manager.get_global_limits = MagicMock(return_value=global_limits) + op_usages[o3] = ExecutionResources(1, 2, 20) # GPU op using 2 GPUs + allocator.update_usages() + + # o3 should get remaining GPUs (8 total - 2 used = 6 available) + assert allocator._op_budgets[o3].gpu == 6 + + # Non-GPU operators should still get 0 + assert allocator._op_budgets[o2].gpu == 0 + assert allocator._op_budgets[o4].gpu == 0 + + def test_multiple_gpu_operators(self, restore_data_context): + """Test that when multiple operators need GPU, each gets all available GPUs.""" + DataContext.get_current().op_resource_reservation_enabled = True + DataContext.get_current().op_resource_reservation_ratio = 0.5 + + o1 = InputDataBuffer(DataContext.get_current(), []) + + # Two GPU operators + o2 = mock_map_op( + o1, + ray_remote_args={"num_cpus": 1, "num_gpus": 1}, + incremental_resource_usage=ExecutionResources(1, 1, 15), + ) + o3 = mock_map_op( + o2, + ray_remote_args={"num_cpus": 1, "num_gpus": 1}, + incremental_resource_usage=ExecutionResources(1, 1, 15), + ) + + # Mock min_max_resource_requirements for both GPU operators + o2.min_max_resource_requirements = MagicMock( + return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + ) + o3.min_max_resource_requirements = MagicMock( + return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + ) + + topo, _ = build_streaming_topology(o3, ExecutionOptions()) + + global_limits = ExecutionResources(cpu=16, gpu=4, object_store_memory=1000) + + op_usages = { + o1: ExecutionResources.zero(), + o2: ExecutionResources(1, 1, 20), # Using 1 GPU + o3: ExecutionResources(1, 0, 20), # Not using GPU yet + } + + resource_manager = ResourceManager( + topo, ExecutionOptions(), MagicMock(), DataContext.get_current() + ) + resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) + resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) + resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0) + resource_manager.get_global_limits = MagicMock(return_value=global_limits) + + allocator = resource_manager._op_resource_allocator + allocator.update_usages() + + # Both GPU operators should get allocated the remaining available GPUs + # o2: 4 total - 1 used = 3 available + assert allocator._op_budgets[o2].gpu == 3 + + # o3: 4 total - 0 used = 4 available + assert allocator._op_budgets[o3].gpu == 4 + if __name__ == "__main__": import sys From 647e1a5ee38135c7d7c72bc72e18d635faa85c72 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 8 Jul 2025 15:04:15 -0700 Subject: [PATCH 4/8] simplify test Signed-off-by: Hao Chen --- .../ray/data/tests/test_resource_manager.py | 102 +++++------------- 1 file changed, 24 insertions(+), 78 deletions(-) diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 7044358aa29..9fe95ab0fdc 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -618,128 +618,75 @@ def test_only_handle_eligible_ops(self, restore_data_context): assert o2 not in allocator._op_budgets 
def test_gpu_allocation(self, restore_data_context): - """Test GPU allocation logic for operators that need GPU vs those that don't.""" + """Test GPU allocation for GPU vs non-GPU operators.""" DataContext.get_current().op_resource_reservation_enabled = True DataContext.get_current().op_resource_reservation_ratio = 0.5 - # Create operators with different GPU requirements o1 = InputDataBuffer(DataContext.get_current(), []) # Non-GPU operator - o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 15)) - - # GPU operator - o3 = mock_map_op( - o2, - ray_remote_args={"num_cpus": 1, "num_gpus": 1}, - incremental_resource_usage=ExecutionResources(1, 1, 15), - ) - - # Another non-GPU operator - o4 = mock_map_op(o3, incremental_resource_usage=ExecutionResources(1, 0, 15)) - - # Mock min_max_resource_requirements to return appropriate GPU requirements + o2 = mock_map_op(o1) o2.min_max_resource_requirements = MagicMock( - return_value=(ExecutionResources(0, 0, 0), ExecutionResources(1, 0, 15)) + return_value=(ExecutionResources(0, 0, 0), ExecutionResources(0, 0, 0)) ) + + # GPU operator + o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1}) o3.min_max_resource_requirements = MagicMock( - return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0)) ) - o4.min_max_resource_requirements = MagicMock( - return_value=(ExecutionResources(0, 0, 0), ExecutionResources(1, 0, 15)) - ) - - topo, _ = build_streaming_topology(o4, ExecutionOptions()) - # Set up global limits with GPUs available - global_limits = ExecutionResources(cpu=16, gpu=4, object_store_memory=1000) + topo, _ = build_streaming_topology(o3, ExecutionOptions()) - # Set up usage tracking + global_limits = ExecutionResources(gpu=4) op_usages = { o1: ExecutionResources.zero(), - o2: ExecutionResources(1, 0, 10), # Non-GPU op using some resources - o3: ExecutionResources(1, 1, 20), # GPU op using 1 GPU - o4: ExecutionResources(1, 0, 5), # Non-GPU op using some resources + o2: ExecutionResources.zero(), + o3: ExecutionResources(gpu=1), # GPU op using 1 GPU } resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current() ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) - resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3, o4], 0) - resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3, o4], 0) + resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) + resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0) resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator - assert isinstance(allocator, ReservationOpResourceAllocator) - allocator.update_usages() - # Test GPU allocation - # o2 (non-GPU): should get 0 GPU + # Non-GPU operator should get 0 GPU assert allocator._op_budgets[o2].gpu == 0 - # o3 (GPU): should get remaining GPUs (4 total - 1 used = 3 available) + # GPU operator should get remaining GPUs (4 total - 1 used = 3 available) assert allocator._op_budgets[o3].gpu == 3 - # o4 (non-GPU): should get 0 GPU - assert allocator._op_budgets[o4].gpu == 0 - - # Test scenario with no GPUs available - op_usages[o3] = ExecutionResources(1, 4, 20) # GPU op using all 4 GPUs - allocator.update_usages() - - # o3 should get 0 additional GPUs since all are in use - assert allocator._op_budgets[o3].gpu == 0 - - # Test scenario with more GPUs available - global_limits = 
ExecutionResources(cpu=16, gpu=8, object_store_memory=1000) - resource_manager.get_global_limits = MagicMock(return_value=global_limits) - op_usages[o3] = ExecutionResources(1, 2, 20) # GPU op using 2 GPUs - allocator.update_usages() - - # o3 should get remaining GPUs (8 total - 2 used = 6 available) - assert allocator._op_budgets[o3].gpu == 6 - - # Non-GPU operators should still get 0 - assert allocator._op_budgets[o2].gpu == 0 - assert allocator._op_budgets[o4].gpu == 0 - def test_multiple_gpu_operators(self, restore_data_context): - """Test that when multiple operators need GPU, each gets all available GPUs.""" + """Test GPU allocation for multiple GPU operators.""" DataContext.get_current().op_resource_reservation_enabled = True DataContext.get_current().op_resource_reservation_ratio = 0.5 o1 = InputDataBuffer(DataContext.get_current(), []) # Two GPU operators - o2 = mock_map_op( - o1, - ray_remote_args={"num_cpus": 1, "num_gpus": 1}, - incremental_resource_usage=ExecutionResources(1, 1, 15), - ) - o3 = mock_map_op( - o2, - ray_remote_args={"num_cpus": 1, "num_gpus": 1}, - incremental_resource_usage=ExecutionResources(1, 1, 15), - ) - - # Mock min_max_resource_requirements for both GPU operators + o2 = mock_map_op(o1, ray_remote_args={"num_gpus": 1}) o2.min_max_resource_requirements = MagicMock( - return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0)) ) + + o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1}) o3.min_max_resource_requirements = MagicMock( - return_value=(ExecutionResources(1, 1, 15), ExecutionResources(2, 1, 30)) + return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0)) ) topo, _ = build_streaming_topology(o3, ExecutionOptions()) - global_limits = ExecutionResources(cpu=16, gpu=4, object_store_memory=1000) - + global_limits = ExecutionResources(gpu=4) op_usages = { o1: ExecutionResources.zero(), - o2: ExecutionResources(1, 1, 20), # Using 1 GPU - o3: ExecutionResources(1, 0, 20), # Not using GPU yet + o2: ExecutionResources(gpu=1), # Using 1 GPU + o3: ExecutionResources(gpu=0), # Not using GPU yet } resource_manager = ResourceManager( @@ -753,7 +700,6 @@ def test_multiple_gpu_operators(self, restore_data_context): allocator = resource_manager._op_resource_allocator allocator.update_usages() - # Both GPU operators should get allocated the remaining available GPUs # o2: 4 total - 1 used = 3 available assert allocator._op_budgets[o2].gpu == 3 From 2e6c9791d8ea2cf8ecfe8a39ecfcd7cff1117bfa Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 10 Jul 2025 12:55:17 -0700 Subject: [PATCH 5/8] simplify loop Signed-off-by: Hao Chen --- .../execution/streaming_executor_state.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index c075cbfa53d..f658a2ff5eb 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -451,18 +451,14 @@ def process_completed_tasks( max_bytes_to_read_per_op: Dict[OpState, int] = {} for op, state in topology.items(): - # Check all backpressure policies for max_task_output_bytes_to_read - # Use the minimum limit from all policies (most restrictive) - max_bytes_to_read = None - for policy in backpressure_policies: - policy_limit = policy.max_task_output_bytes_to_read(op) - if policy_limit is not 
None: - if max_bytes_to_read is None: - max_bytes_to_read = policy_limit - else: - max_bytes_to_read = min(max_bytes_to_read, policy_limit) - - # If no policy provides a limit, there's no limit + max_bytes_to_read = min( + ( + limit + for policy in backpressure_policies + if (limit := policy.max_task_output_bytes_to_read(op)) is not None + ), + default=None, + ) op.notify_in_task_output_backpressure(max_bytes_to_read == 0) if max_bytes_to_read is not None: max_bytes_to_read_per_op[state] = max_bytes_to_read From 0ca605821cf13c0bed9cdc7a50c763617995ff98 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 10 Jul 2025 14:29:26 -0700 Subject: [PATCH 6/8] refine test Signed-off-by: Hao Chen --- .../ray/data/tests/test_streaming_executor.py | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index e3aea99ab72..ae9f0e4ef28 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -10,7 +10,9 @@ from ray._private.test_utils import run_string_as_driver_nonblocking from ray.data._internal.datasource.parquet_datasink import ParquetDatasink from ray.data._internal.datasource.parquet_datasource import ParquetDatasource -from ray.data._internal.execution.backpressure_policy import BackpressurePolicy +from ray.data._internal.execution.backpressure_policy.resource_budget_backpressure_policy import ( + ResourceBudgetBackpressurePolicy, +) from ray.data._internal.execution.execution_callback import ( EXECUTION_CALLBACKS_ENV_VAR, ExecutionCallback, @@ -276,35 +278,30 @@ def _get_eligible_ops_to_run(ensure_liveness: bool): assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2] # `o2` operator is now back-pressured - class TestBackpressurePolicy(BackpressurePolicy): - def __init__(self, op_to_block): - self._op_to_block = op_to_block - - def can_add_input(self, op): - if op is self._op_to_block: - return False - return True - - def max_task_output_bytes_to_read(self, op): - return None + with patch.object( + ResourceBudgetBackpressurePolicy, "can_add_input" + ) as mock_can_add_input: + mock_can_add_input.side_effect = lambda op: op is not o2 - test_policy = TestBackpressurePolicy(o2) - - def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool): - return get_eligible_operators( - topo, [test_policy], ensure_liveness=ensure_liveness + test_policy = ResourceBudgetBackpressurePolicy( + MagicMock(), MagicMock(), MagicMock() ) - assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3] + def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool): + return get_eligible_operators( + topo, [test_policy], ensure_liveness=ensure_liveness + ) - # Complete `o3` - with patch.object(o3, "completed") as _mock: - _mock.return_value = True - # Clear up input queue - topo[o3].input_queues[0].clear() + assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3] + + # Complete `o3` + with patch.object(o3, "completed") as _mock: + _mock.return_value = True + # Clear up input queue + topo[o3].input_queues[0].clear() - # To ensure liveness back-pressure limits will be ignored - assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2] + # To ensure liveness back-pressure limits will be ignored + assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2] def test_rank_operators(): From 6492f43e0d17a6c46e398e8d39d37a3ce459f5a3 Mon Sep 17 00:00:00 2001 From: Hao 
Chen Date: Thu, 10 Jul 2025 14:43:20 -0700 Subject: [PATCH 7/8] remove None Signed-off-by: Hao Chen --- python/ray/data/_internal/execution/resource_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index e7d0bb66484..ae5c6ffc1d0 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -553,7 +553,7 @@ def _update_reservation(self): self._total_shared = remaining def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: - return self._op_budgets.get(op, None) + return self._op_budgets.get(op) def _should_unblock_streaming_output_backpressure( self, op: PhysicalOperator From 5d1c4fb6b013b5832c15e1195163d9e0c5c2e95d Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 10 Jul 2025 19:24:40 -0700 Subject: [PATCH 8/8] fix Signed-off-by: Hao Chen --- python/ray/data/tests/test_resource_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index a64cd3cd11e..09c4c998263 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -700,8 +700,6 @@ def test_multiple_gpu_operators(self, restore_data_context): topo, ExecutionOptions(), MagicMock(), DataContext.get_current() ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) - resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) - resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0) resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator
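
Notes (not part of the patches above; illustrative sketches only):

The GPU-budget rule the series converges on, restated standalone: an operator whose max resource requirement includes GPUs is budgeted every GPU not already consumed by its own tasks, while non-GPU operators get a zero GPU budget instead of the previous float("inf"). The helper below uses hypothetical stand-in names (allocate_gpu_budget, total_gpus, op_gpu_usage), not Ray Data APIs.

    def allocate_gpu_budget(
        op_needs_gpu: bool, total_gpus: float, op_gpu_usage: float
    ) -> float:
        """Sketch of the rule added to ReservationOpResourceAllocator.update_usages().

        If the operator needs GPUs, budget all GPUs minus the operator's current
        GPU usage; otherwise budget 0. Splitting GPUs across multiple concurrent
        GPU operators is left as a TODO in the patch.
        """
        if op_needs_gpu:
            return total_gpus - op_gpu_usage
        return 0.0

    # Mirrors test_gpu_allocation: 4 cluster GPUs, the GPU operator already
    # holds 1, so its remaining budget is 3; the non-GPU operator gets 0.
    assert allocate_gpu_budget(op_needs_gpu=True, total_gpus=4, op_gpu_usage=1) == 3
    assert allocate_gpu_budget(op_needs_gpu=False, total_gpus=4, op_gpu_usage=0) == 0

Patch 5 replaces the manual "track the minimum non-None limit" loop with a generator fed to min(..., default=None). A minimal reproduction of that pattern follows, with _FakePolicy as a hypothetical stand-in for a backpressure policy exposing max_task_output_bytes_to_read:

    from typing import List, Optional

    class _FakePolicy:
        """Hypothetical stand-in for a backpressure policy."""

        def __init__(self, limit: Optional[int]):
            self._limit = limit

        def max_task_output_bytes_to_read(self, op) -> Optional[int]:
            return self._limit

    def most_restrictive_limit(policies: List[_FakePolicy], op) -> Optional[int]:
        # Take the min over only the policies that return a limit;
        # None means no policy imposes a limit.
        return min(
            (
                limit
                for policy in policies
                if (limit := policy.max_task_output_bytes_to_read(op)) is not None
            ),
            default=None,
        )

    assert most_restrictive_limit(
        [_FakePolicy(None), _FakePolicy(100), _FakePolicy(50)], op=None
    ) == 50
    assert most_restrictive_limit([_FakePolicy(None)], op=None) is None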