Commit b081a6c

[data] Allocate GPU resources in ResourceManager (#54445)

Allocate GPU resources in ResourceManager. Currently we simply allocate all available GPUs to every operator that needs them, so if there are multiple GPU operators, each of them gets all GPUs. This PR is mainly to make the resource budget reporting correct.

Signed-off-by: Hao Chen <[email protected]>

1 parent: 4ecb03b

4 files changed: +133 −47 lines

python/ray/data/_internal/execution/resource_manager.py (10 additions, 4 deletions)

@@ -553,7 +553,7 @@ def _update_reservation(self):
         self._total_shared = remaining
 
     def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
-        return self._op_budgets.get(op, None)
+        return self._op_budgets.get(op)
 
     def _should_unblock_streaming_output_backpressure(
         self, op: PhysicalOperator
@@ -698,9 +698,15 @@ def update_usages(self):
                 to_borrow,
             )
             self._op_budgets[op] = self._op_budgets[op].add(op_shared)
-            # We don't limit GPU resources, as not all operators
-            # use GPU resources.
-            self._op_budgets[op].gpu = float("inf")
+            if op.min_max_resource_requirements()[1].gpu > 0:
+                # If an operator needs GPU, we just allocate all GPUs to it.
+                # TODO(hchen): allocate resources across multiple GPU operators.
+                self._op_budgets[op].gpu = (
+                    self._resource_manager.get_global_limits().gpu
+                    - self._resource_manager.get_op_usage(op).gpu
+                )
+            else:
+                self._op_budgets[op].gpu = 0
 
         # A materializing operator like `AllToAllOperator` waits for all its input
         # operator's outputs before processing data. This often forces the input
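
To make the new budget rule concrete, here is a minimal standalone sketch of the per-operator GPU budget computed above. The function name gpu_budget and its parameters are hypothetical stand-ins for the ResourceManager lookups in the diff (op.min_max_resource_requirements()[1].gpu > 0, get_global_limits().gpu, and get_op_usage(op).gpu):

def gpu_budget(needs_gpu: bool, global_gpu_limit: float, op_gpu_usage: float) -> float:
    # Mirrors the new branch in update_usages(): a GPU operator is granted
    # every GPU it is not already using; non-GPU operators now get a budget
    # of 0 instead of the old float("inf").
    if needs_gpu:
        return global_gpu_limit - op_gpu_usage
    return 0.0

# E.g. with 4 GPUs in the cluster and an operator already holding 1:
assert gpu_budget(True, 4.0, 1.0) == 3.0
assert gpu_budget(False, 4.0, 0.0) == 0.0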

python/ray/data/_internal/execution/streaming_executor_state.py (8 additions, 12 deletions)

@@ -451,18 +451,14 @@ def process_completed_tasks(
 
     max_bytes_to_read_per_op: Dict[OpState, int] = {}
     for op, state in topology.items():
-        # Check all backpressure policies for max_task_output_bytes_to_read
-        # Use the minimum limit from all policies (most restrictive)
-        max_bytes_to_read = None
-        for policy in backpressure_policies:
-            policy_limit = policy.max_task_output_bytes_to_read(op)
-            if policy_limit is not None:
-                if max_bytes_to_read is None:
-                    max_bytes_to_read = policy_limit
-                else:
-                    max_bytes_to_read = min(max_bytes_to_read, policy_limit)
-
-        # If no policy provides a limit, there's no limit
+        max_bytes_to_read = min(
+            (
+                limit
+                for policy in backpressure_policies
+                if (limit := policy.max_task_output_bytes_to_read(op)) is not None
+            ),
+            default=None,
+        )
         op.notify_in_task_output_backpressure(max_bytes_to_read == 0)
         if max_bytes_to_read is not None:
            max_bytes_to_read_per_op[state] = max_bytes_to_read
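
The refactor above folds the explicit policy loop into a single min() over a generator. A small self-contained sketch of the same pattern, with hypothetical stand-in policies (plain callables returning a byte limit or None for "no opinion"):

from typing import Optional

# Stand-in policies, in place of real backpressure policy objects.
policies = [lambda: None, lambda: 128, lambda: 64]

# The walrus operator binds each policy's result so None can be filtered
# out; min(..., default=None) then picks the most restrictive remaining
# limit, or yields None (no limit) when every policy abstains.
max_bytes_to_read: Optional[int] = min(
    (limit for policy in policies if (limit := policy()) is not None),
    default=None,
)
assert max_bytes_to_read == 64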

python/ray/data/tests/test_resource_manager.py (93 additions, 6 deletions)

@@ -395,8 +395,8 @@ def can_submit_new_task(allocator, op):
         # 50% of the global limits are shared.
         assert allocator._total_shared == ExecutionResources(8, 0, 500)
         # Test budgets.
-        assert allocator._op_budgets[o2] == ExecutionResources(8, float("inf"), 375)
-        assert allocator._op_budgets[o3] == ExecutionResources(8, float("inf"), 375)
+        assert allocator._op_budgets[o2] == ExecutionResources(8, 0, 375)
+        assert allocator._op_budgets[o3] == ExecutionResources(8, 0, 375)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -425,9 +425,9 @@ def can_submit_new_task(allocator, op):
         # remaining shared = 1000/2 - 275 = 225
         # Test budgets.
         # memory_budget[o2] = 0 + 225/2 = 112.5
-        assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5)
+        assert allocator._op_budgets[o2] == ExecutionResources(3, 0, 112.5)
         # memory_budget[o3] = 95 + 225/2 = 207.5
-        assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5)
+        assert allocator._op_budgets[o3] == ExecutionResources(5, 0, 207.5)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -461,9 +461,9 @@ def can_submit_new_task(allocator, op):
 
         # Test budgets.
         # memory_budget[o2] = 0 + 100/2 = 50
-        assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50)
+        assert allocator._op_budgets[o2] == ExecutionResources(1.5, 0, 50)
         # memory_budget[o3] = 70 + 100/2 = 120
-        assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120)
+        assert allocator._op_budgets[o3] == ExecutionResources(2.5, 0, 120)
         # Test can_submit_new_task and max_task_output_bytes_to_read.
         assert can_submit_new_task(allocator, o2)
         assert can_submit_new_task(allocator, o3)
@@ -624,6 +624,93 @@ def test_only_handle_eligible_ops(self, restore_data_context):
         allocator.update_usages()
         assert o2 not in allocator._op_budgets
 
+    def test_gpu_allocation(self, restore_data_context):
+        """Test GPU allocation for GPU vs non-GPU operators."""
+        DataContext.get_current().op_resource_reservation_enabled = True
+        DataContext.get_current().op_resource_reservation_ratio = 0.5
+
+        o1 = InputDataBuffer(DataContext.get_current(), [])
+
+        # Non-GPU operator
+        o2 = mock_map_op(o1)
+        o2.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 0, 0), ExecutionResources(0, 0, 0))
+        )
+
+        # GPU operator
+        o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
+        o3.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        topo, _ = build_streaming_topology(o3, ExecutionOptions())
+
+        global_limits = ExecutionResources(gpu=4)
+        op_usages = {
+            o1: ExecutionResources.zero(),
+            o2: ExecutionResources.zero(),
+            o3: ExecutionResources(gpu=1),  # GPU op using 1 GPU
+        }
+
+        resource_manager = ResourceManager(
+            topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
+        )
+        resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
+        resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0)
+        resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0)
+        resource_manager.get_global_limits = MagicMock(return_value=global_limits)
+
+        allocator = resource_manager._op_resource_allocator
+        allocator.update_usages()
+
+        # Non-GPU operator should get 0 GPU
+        assert allocator._op_budgets[o2].gpu == 0
+
+        # GPU operator should get remaining GPUs (4 total - 1 used = 3 available)
+        assert allocator._op_budgets[o3].gpu == 3
+
+    def test_multiple_gpu_operators(self, restore_data_context):
+        """Test GPU allocation for multiple GPU operators."""
+        DataContext.get_current().op_resource_reservation_enabled = True
+        DataContext.get_current().op_resource_reservation_ratio = 0.5
+
+        o1 = InputDataBuffer(DataContext.get_current(), [])
+
+        # Two GPU operators
+        o2 = mock_map_op(o1, ray_remote_args={"num_gpus": 1})
+        o2.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
+        o3.min_max_resource_requirements = MagicMock(
+            return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
+        )
+
+        topo, _ = build_streaming_topology(o3, ExecutionOptions())
+
+        global_limits = ExecutionResources(gpu=4)
+        op_usages = {
+            o1: ExecutionResources.zero(),
+            o2: ExecutionResources(gpu=1),  # Using 1 GPU
+            o3: ExecutionResources(gpu=0),  # Not using GPU yet
+        }
+
+        resource_manager = ResourceManager(
+            topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
+        )
+        resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
+        resource_manager.get_global_limits = MagicMock(return_value=global_limits)
+
+        allocator = resource_manager._op_resource_allocator
+        allocator.update_usages()
+
+        # o2: 4 total - 1 used = 3 available
+        assert allocator._op_budgets[o2].gpu == 3
+
+        # o3: 4 total - 0 used = 4 available
+        assert allocator._op_budgets[o3].gpu == 4
+
 
 if __name__ == "__main__":
     import sys
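
One property worth noting from test_multiple_gpu_operators: each GPU operator's budget is computed against only its own usage, so the budgets of concurrent GPU operators can sum to more than the cluster total (this is what the TODO in resource_manager.py about allocating across multiple GPU operators is meant to address). A quick arithmetic sketch using the test's numbers:

# 4 GPUs total; o2 already uses 1, o3 uses 0 (numbers from the test above).
global_gpu = 4
usage = {"o2": 1, "o3": 0}
budgets = {op: global_gpu - used for op, used in usage.items()}
assert budgets == {"o2": 3, "o3": 4}
# The budgets overlap (3 + 4 > 4): both operators are told they may use
# the same free GPUs. Reporting is correct per operator, but GPUs are not
# yet partitioned across operators.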

python/ray/data/tests/test_streaming_executor.py (22 additions, 25 deletions)

@@ -10,7 +10,9 @@
 from ray._private.test_utils import run_string_as_driver_nonblocking
 from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
 from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
-from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
+from ray.data._internal.execution.backpressure_policy.resource_budget_backpressure_policy import (
+    ResourceBudgetBackpressurePolicy,
+)
 from ray.data._internal.execution.execution_callback import (
     EXECUTION_CALLBACKS_ENV_VAR,
     ExecutionCallback,
@@ -276,35 +278,30 @@ def _get_eligible_ops_to_run(ensure_liveness: bool):
     assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2]
 
     # `o2` operator is now back-pressured
-    class TestBackpressurePolicy(BackpressurePolicy):
-        def __init__(self, op_to_block):
-            self._op_to_block = op_to_block
-
-        def can_add_input(self, op):
-            if op is self._op_to_block:
-                return False
-            return True
-
-        def max_task_output_bytes_to_read(self, op):
-            return None
+    with patch.object(
+        ResourceBudgetBackpressurePolicy, "can_add_input"
+    ) as mock_can_add_input:
+        mock_can_add_input.side_effect = lambda op: op is not o2
 
-    test_policy = TestBackpressurePolicy(o2)
-
-    def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
-        return get_eligible_operators(
-            topo, [test_policy], ensure_liveness=ensure_liveness
+        test_policy = ResourceBudgetBackpressurePolicy(
+            MagicMock(), MagicMock(), MagicMock()
         )
 
-    assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]
+        def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
+            return get_eligible_operators(
+                topo, [test_policy], ensure_liveness=ensure_liveness
+            )
 
-    # Complete `o3`
-    with patch.object(o3, "completed") as _mock:
-        _mock.return_value = True
-        # Clear up input queue
-        topo[o3].input_queues[0].clear()
+        assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]
+
+        # Complete `o3`
+        with patch.object(o3, "completed") as _mock:
+            _mock.return_value = True
+            # Clear up input queue
+            topo[o3].input_queues[0].clear()
 
-    # To ensure liveness back-pressure limits will be ignored
-    assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]
+        # To ensure liveness back-pressure limits will be ignored
+        assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]
 
 
 def test_rank_operators():
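
The rewritten test patches the concrete policy class rather than defining a throwaway subclass. A minimal sketch of the patch.object + side_effect pattern it relies on, using a hypothetical Greeter class; note that because the mock replaces the method at class level, the lambda receives only the explicit argument, never self:

from unittest.mock import patch

class Greeter:
    def greet(self, name: str) -> str:
        return f"hello {name}"

g = Greeter()
with patch.object(Greeter, "greet") as mock_greet:
    # side_effect computes each call's return value; no `self` is passed
    # because the class attribute is now a plain mock, not a function.
    mock_greet.side_effect = lambda name: name != "blocked"
    assert g.greet("ok") is True
    assert g.greet("blocked") is False
# The original method is restored when the context manager exits.
assert g.greet("world") == "hello world"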
