diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py
index 57bc4c7e7cce..ae5c6ffc1d03 100644
--- a/python/ray/data/_internal/execution/resource_manager.py
+++ b/python/ray/data/_internal/execution/resource_manager.py
@@ -553,7 +553,7 @@ def _update_reservation(self):
         self._total_shared = remaining
 
     def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
-        return self._op_budgets.get(op, None)
+        return self._op_budgets.get(op)
 
     def _should_unblock_streaming_output_backpressure(
         self, op: PhysicalOperator
@@ -698,9 +698,15 @@ def update_usages(self):
                 to_borrow,
             )
             self._op_budgets[op] = self._op_budgets[op].add(op_shared)
-            # We don't limit GPU resources, as not all operators
-            # use GPU resources.
-            self._op_budgets[op].gpu = float("inf")
+            if op.min_max_resource_requirements()[1].gpu > 0:
+                # If an operator needs GPU, we just allocate all GPUs to it.
+                # TODO(hchen): allocate resources across multiple GPU operators.
+                self._op_budgets[op].gpu = (
+                    self._resource_manager.get_global_limits().gpu
+                    - self._resource_manager.get_op_usage(op).gpu
+                )
+            else:
+                self._op_budgets[op].gpu = 0
 
         # A materializing operator like `AllToAllOperator` waits for all its input
         # operator's outputs before processing data. This often forces the input
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index c075cbfa53d1..f658a2ff5eb5 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -451,18 +451,14 @@ def process_completed_tasks(
 
     max_bytes_to_read_per_op: Dict[OpState, int] = {}
     for op, state in topology.items():
-        # Check all backpressure policies for max_task_output_bytes_to_read
-        # Use the minimum limit from all policies (most restrictive)
-        max_bytes_to_read = None
-        for policy in backpressure_policies:
-            policy_limit = policy.max_task_output_bytes_to_read(op)
-            if policy_limit is not None:
-                if max_bytes_to_read is None:
-                    max_bytes_to_read = policy_limit
-                else:
-                    max_bytes_to_read = min(max_bytes_to_read, policy_limit)
-
-        # If no policy provides a limit, there's no limit
+        max_bytes_to_read = min(
+            (
+                limit
+                for policy in backpressure_policies
+                if (limit := policy.max_task_output_bytes_to_read(op)) is not None
+            ),
+            default=None,
+        )
         op.notify_in_task_output_backpressure(max_bytes_to_read == 0)
         if max_bytes_to_read is not None:
             max_bytes_to_read_per_op[state] = max_bytes_to_read
diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py
index 7ae70f166efc..09c4c9982633 100644
--- a/python/ray/data/tests/test_resource_manager.py
+++ b/python/ray/data/tests/test_resource_manager.py
@@ -395,8 +395,8 @@ def can_submit_new_task(allocator, op):
         # 50% of the global limits are shared.
         assert allocator._total_shared == ExecutionResources(8, 0, 500)
         # Test budgets.
-        assert allocator._op_budgets[o2] == ExecutionResources(8, float("inf"), 375)
-        assert allocator._op_budgets[o3] == ExecutionResources(8, float("inf"), 375)
+        assert allocator._op_budgets[o2] == ExecutionResources(8, 0, 375)
+        assert allocator._op_budgets[o3] == ExecutionResources(8, 0, 375)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -425,9 +425,9 @@ def can_submit_new_task(allocator, op):
         # remaining shared = 1000/2 - 275 = 225
         # Test budgets.
         # memory_budget[o2] = 0 + 225/2 = 112.5
-        assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5)
+        assert allocator._op_budgets[o2] == ExecutionResources(3, 0, 112.5)
         # memory_budget[o3] = 95 + 225/2 = 207.5
-        assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5)
+        assert allocator._op_budgets[o3] == ExecutionResources(5, 0, 207.5)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -461,9 +461,9 @@ def can_submit_new_task(allocator, op):
 
         # Test budgets.
         # memory_budget[o2] = 0 + 100/2 = 50
-        assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50)
+        assert allocator._op_budgets[o2] == ExecutionResources(1.5, 0, 50)
         # memory_budget[o3] = 70 + 100/2 = 120
-        assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120)
+        assert allocator._op_budgets[o3] == ExecutionResources(2.5, 0, 120)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -624,6 +624,93 @@ def test_only_handle_eligible_ops(self, restore_data_context):
         allocator.update_usages()
         assert o2 not in allocator._op_budgets
 
+    def test_gpu_allocation(self, restore_data_context):
+        """Test GPU allocation for GPU vs non-GPU operators."""
+        DataContext.get_current().op_resource_reservation_enabled = True
+        DataContext.get_current().op_resource_reservation_ratio = 0.5
+
+        o1 = InputDataBuffer(DataContext.get_current(), [])
+
+        # Non-GPU operator
+        o2 = mock_map_op(o1)
+        o2.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 0, 0), ExecutionResources(0, 0, 0))
+        )
+
+        # GPU operator
+        o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
+        o3.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        topo, _ = build_streaming_topology(o3, ExecutionOptions())
+
+        global_limits = ExecutionResources(gpu=4)
+        op_usages = {
+            o1: ExecutionResources.zero(),
+            o2: ExecutionResources.zero(),
+            o3: ExecutionResources(gpu=1),  # GPU op using 1 GPU
+        }
+
+        resource_manager = ResourceManager(
+            topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
+        )
+        resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
+        resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0)
+        resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0)
+        resource_manager.get_global_limits = MagicMock(return_value=global_limits)
+
+        allocator = resource_manager._op_resource_allocator
+        allocator.update_usages()
+
+        # Non-GPU operator should get 0 GPU
+        assert allocator._op_budgets[o2].gpu == 0
+
+        # GPU operator should get remaining GPUs (4 total - 1 used = 3 available)
+        assert allocator._op_budgets[o3].gpu == 3
+
+    def test_multiple_gpu_operators(self, restore_data_context):
+        """Test GPU allocation for multiple GPU operators."""
+        DataContext.get_current().op_resource_reservation_enabled = True
+        DataContext.get_current().op_resource_reservation_ratio = 0.5
+
+        o1 = InputDataBuffer(DataContext.get_current(), [])
+
+        # Two GPU operators
+        o2 = mock_map_op(o1, ray_remote_args={"num_gpus": 1})
+        o2.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
+        o3.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        topo, _ = build_streaming_topology(o3, ExecutionOptions())
+
+        global_limits = ExecutionResources(gpu=4)
+        op_usages = {
+            o1: ExecutionResources.zero(),
+            o2: ExecutionResources(gpu=1),  # Using 1 GPU
+            o3: ExecutionResources(gpu=0),  # Not using GPU yet
+        }
+
+        resource_manager = ResourceManager(
+            topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
+        )
+        resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
+        resource_manager.get_global_limits = MagicMock(return_value=global_limits)
+
+        allocator = resource_manager._op_resource_allocator
+        allocator.update_usages()
+
+        # o2: 4 total - 1 used = 3 available
+        assert allocator._op_budgets[o2].gpu == 3
+
+        # o3: 4 total - 0 used = 4 available
+        assert allocator._op_budgets[o3].gpu == 4
+
 
 if __name__ == "__main__":
     import sys
diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py
index e3aea99ab722..ae9f0e4ef286 100644
--- a/python/ray/data/tests/test_streaming_executor.py
+++ b/python/ray/data/tests/test_streaming_executor.py
@@ -10,7 +10,9 @@
 from ray._private.test_utils import run_string_as_driver_nonblocking
 from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
 from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
-from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
+from ray.data._internal.execution.backpressure_policy.resource_budget_backpressure_policy import (
+    ResourceBudgetBackpressurePolicy,
+)
 from ray.data._internal.execution.execution_callback import (
     EXECUTION_CALLBACKS_ENV_VAR,
     ExecutionCallback,
@@ -276,35 +278,30 @@ def _get_eligible_ops_to_run(ensure_liveness: bool):
     assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2]
 
     # `o2` operator is now back-pressured
-    class TestBackpressurePolicy(BackpressurePolicy):
-        def __init__(self, op_to_block):
-            self._op_to_block = op_to_block
-
-        def can_add_input(self, op):
-            if op is self._op_to_block:
-                return False
-            return True
-
-        def max_task_output_bytes_to_read(self, op):
-            return None
+    with patch.object(
+        ResourceBudgetBackpressurePolicy, "can_add_input"
+    ) as mock_can_add_input:
+        mock_can_add_input.side_effect = lambda op: op is not o2
 
-    test_policy = TestBackpressurePolicy(o2)
-
-    def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
-        return get_eligible_operators(
-            topo, [test_policy], ensure_liveness=ensure_liveness
+        test_policy = ResourceBudgetBackpressurePolicy(
+            MagicMock(), MagicMock(), MagicMock()
         )
 
-    assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]
+        def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
+            return get_eligible_operators(
+                topo, [test_policy], ensure_liveness=ensure_liveness
+            )
 
-    # Complete `o3`
-    with patch.object(o3, "completed") as _mock:
-        _mock.return_value = True
-        # Clear up input queue
-        topo[o3].input_queues[0].clear()
+        assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]
+
+        # Complete `o3`
+        with patch.object(o3, "completed") as _mock:
+            _mock.return_value = True
+            # Clear up input queue
+            topo[o3].input_queues[0].clear()
 
-        # To ensure liveness back-pressure limits will be ignored
-        assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]
+            # To ensure liveness back-pressure limits will be ignored
+            assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]
 
 
 def test_rank_operators():
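
Note on the resource_manager.py change: a GPU operator's budget becomes the global GPU limit minus the GPUs it already holds, while non-GPU operators now get 0 instead of float("inf"). The sketch below is a minimal standalone illustration of that arithmetic; OpInfo and gpu_budget are hypothetical stand-ins, not Ray classes or APIs.

from dataclasses import dataclass


@dataclass
class OpInfo:
    needs_gpu: bool  # True if the op's max resource requirement includes GPUs
    gpu_used: float  # GPUs the operator currently holds


def gpu_budget(op: OpInfo, global_gpu_limit: float) -> float:
    # Mirrors the diff: a GPU operator may claim every GPU it does not
    # already hold; non-GPU operators get no GPU budget at all.
    if op.needs_gpu:
        return global_gpu_limit - op.gpu_used
    return 0.0


# With 4 GPUs total, an op already holding 1 GPU can acquire 3 more,
# matching test_gpu_allocation's expectation (4 - 1 == 3).
assert gpu_budget(OpInfo(needs_gpu=True, gpu_used=1.0), 4.0) == 3.0
assert gpu_budget(OpInfo(needs_gpu=False, gpu_used=0.0), 4.0) == 0.0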
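
Note on the streaming_executor_state.py change: the hand-rolled minimum loop is replaced by min() over a generator expression whose if-clause uses an assignment expression (walrus), with default=None preserving the "no policy imposed a limit" case. A self-contained sketch of the same pattern under simplified assumptions — policies here are plain callables, not Ray's BackpressurePolicy classes:

from typing import Callable, List, Optional


def most_restrictive_limit(
    policies: List[Callable[[str], Optional[int]]], op: str
) -> Optional[int]:
    # Ask every policy for its limit; skip policies that return None
    # (no opinion) and keep the smallest answer. `default=None` means
    # no policy imposed any limit at all.
    return min(
        (
            limit
            for policy in policies
            if (limit := policy(op)) is not None
        ),
        default=None,
    )


# Three stand-in policies: one with no opinion, two with byte limits.
policies = [lambda op: None, lambda op: 1024, lambda op: 512]
assert most_restrictive_limit(policies, "map_batches") == 512
assert most_restrictive_limit([lambda op: None], "map_batches") is None
assert most_restrictive_limit([lambda op: 0], "map_batches") == 0  # fully back-pressured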