14 changes: 10 additions & 4 deletions python/ray/data/_internal/execution/resource_manager.py
@@ -553,7 +553,7 @@ def _update_reservation(self):
self._total_shared = remaining

def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
return self._op_budgets.get(op, None)
return self._op_budgets.get(op)

def _should_unblock_streaming_output_backpressure(
self, op: PhysicalOperator
@@ -698,9 +698,15 @@ def update_usages(self):
to_borrow,
)
self._op_budgets[op] = self._op_budgets[op].add(op_shared)
# We don't limit GPU resources, as not all operators
# use GPU resources.
self._op_budgets[op].gpu = float("inf")
if op.min_max_resource_requirements()[1].gpu > 0:
# If an operator needs GPU, we just allocate all GPUs to it.
# TODO(hchen): allocate resources across multiple GPU operators.
self._op_budgets[op].gpu = (
self._resource_manager.get_global_limits().gpu
- self._resource_manager.get_op_usage(op).gpu
)
else:
self._op_budgets[op].gpu = 0

# A materializing operator like `AllToAllOperator` waits for all its input
# operator's outputs before processing data. This often forces the input
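
The new branch replaces the old unlimited GPU budget with a simple rule: an operator that declares a GPU requirement is budgeted whatever remains of the global GPU limit after subtracting its own usage, while a CPU-only operator gets zero. Below is a minimal sketch of that rule, using an illustrative helper (`gpu_budget`) rather than the actual Ray Data classes:

```python
def gpu_budget(op_max_gpu: float, op_gpu_usage: float, global_gpu_limit: float) -> float:
    """Sketch of the budgeting rule from this hunk (illustrative only, not the real API).

    GPU operators are handed every GPU not already consumed by the operator itself;
    non-GPU operators are capped at zero instead of infinity.
    """
    if op_max_gpu > 0:
        # GPU operator: allocate all GPUs the cluster still has for it.
        return global_gpu_limit - op_gpu_usage
    # Non-GPU operator: it never schedules GPU tasks, so it needs no GPU budget.
    return 0.0


# With a 4-GPU cluster and a GPU operator already holding 1 GPU:
assert gpu_budget(op_max_gpu=1, op_gpu_usage=1, global_gpu_limit=4) == 3
assert gpu_budget(op_max_gpu=0, op_gpu_usage=0, global_gpu_limit=4) == 0
```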
20 changes: 8 additions & 12 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -451,18 +451,14 @@ def process_completed_tasks(

max_bytes_to_read_per_op: Dict[OpState, int] = {}
for op, state in topology.items():
# Check all backpressure policies for max_task_output_bytes_to_read
# Use the minimum limit from all policies (most restrictive)
max_bytes_to_read = None
for policy in backpressure_policies:
policy_limit = policy.max_task_output_bytes_to_read(op)
if policy_limit is not None:
if max_bytes_to_read is None:
max_bytes_to_read = policy_limit
else:
max_bytes_to_read = min(max_bytes_to_read, policy_limit)

# If no policy provides a limit, there's no limit
max_bytes_to_read = min(
(
limit
for policy in backpressure_policies
if (limit := policy.max_task_output_bytes_to_read(op)) is not None
),
default=None,
)
Comment on lines +454 to +461
Member: Nice.

op.notify_in_task_output_backpressure(max_bytes_to_read == 0)
if max_bytes_to_read is not None:
max_bytes_to_read_per_op[state] = max_bytes_to_read
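
The rewritten loop relies on `min()` over a generator expression with `default=None`: each policy contributes a limit only if it returns one, the most restrictive limit wins, and if every policy returns `None` the result is `None` (no limit). A small self-contained illustration of the same pattern, with `StubPolicy` and `combined_limit` standing in for the real backpressure policies and call site:

```python
from typing import List, Optional


class StubPolicy:
    """Stand-in for a backpressure policy; only models the limit lookup."""

    def __init__(self, limit: Optional[int]):
        self._limit = limit

    def max_task_output_bytes_to_read(self, op) -> Optional[int]:
        return self._limit


def combined_limit(policies: List[StubPolicy], op=None) -> Optional[int]:
    # Most restrictive limit across policies; None means "no limit at all".
    return min(
        (
            limit
            for policy in policies
            if (limit := policy.max_task_output_bytes_to_read(op)) is not None
        ),
        default=None,
    )


assert combined_limit([StubPolicy(None), StubPolicy(None)]) is None  # no policy limits
assert combined_limit([StubPolicy(100), StubPolicy(None)]) == 100    # only one policy limits
assert combined_limit([StubPolicy(100), StubPolicy(25)]) == 25       # most restrictive wins
assert combined_limit([StubPolicy(0), StubPolicy(50)]) == 0          # 0 means fully backpressured
```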
99 changes: 93 additions & 6 deletions python/ray/data/tests/test_resource_manager.py
@@ -395,8 +395,8 @@ def can_submit_new_task(allocator, op):
# 50% of the global limits are shared.
assert allocator._total_shared == ExecutionResources(8, 0, 500)
# Test budgets.
assert allocator._op_budgets[o2] == ExecutionResources(8, float("inf"), 375)
assert allocator._op_budgets[o3] == ExecutionResources(8, float("inf"), 375)
assert allocator._op_budgets[o2] == ExecutionResources(8, 0, 375)
assert allocator._op_budgets[o3] == ExecutionResources(8, 0, 375)
# Test can_submit_new_task and max_task_output_bytes_to_read.
assert can_submit_new_task(allocator, o2)
assert can_submit_new_task(allocator, o3)
@@ -425,9 +425,9 @@ def can_submit_new_task(allocator, op):
# remaining shared = 1000/2 - 275 = 225
# Test budgets.
# memory_budget[o2] = 0 + 225/2 = 112.5
assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5)
assert allocator._op_budgets[o2] == ExecutionResources(3, 0, 112.5)
# memory_budget[o3] = 95 + 225/2 = 207.5
assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5)
assert allocator._op_budgets[o3] == ExecutionResources(5, 0, 207.5)
# Test can_submit_new_task and max_task_output_bytes_to_read.
assert can_submit_new_task(allocator, o2)
assert can_submit_new_task(allocator, o3)
@@ -461,9 +461,9 @@ def can_submit_new_task(allocator, op):

# Test budgets.
# memory_budget[o2] = 0 + 100/2 = 50
assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50)
assert allocator._op_budgets[o2] == ExecutionResources(1.5, 0, 50)
# memory_budget[o3] = 70 + 100/2 = 120
assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120)
assert allocator._op_budgets[o3] == ExecutionResources(2.5, 0, 120)
# Test can_submit_new_task and max_task_output_bytes_to_read.
assert can_submit_new_task(allocator, o2)
assert can_submit_new_task(allocator, o3)
@@ -624,6 +624,93 @@ def test_only_handle_eligible_ops(self, restore_data_context):
allocator.update_usages()
assert o2 not in allocator._op_budgets

def test_gpu_allocation(self, restore_data_context):
"""Test GPU allocation for GPU vs non-GPU operators."""
DataContext.get_current().op_resource_reservation_enabled = True
DataContext.get_current().op_resource_reservation_ratio = 0.5
Comment on lines +629 to +630
Member: Here and for the test below -- aren't these the defaults? Are they necessary for this test?
Contributor Author: I'd like to make the test not depend on the defaults, so it doesn't break if we change the behavior. (We just need to test the logic; the defaults don't matter.)


o1 = InputDataBuffer(DataContext.get_current(), [])

# Non-GPU operator
o2 = mock_map_op(o1)
o2.min_max_resource_requirements = MagicMock(
return_value=(ExecutionResources(0, 0, 0), ExecutionResources(0, 0, 0))
)

# GPU operator
o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
o3.min_max_resource_requirements = MagicMock(
return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
)

topo, _ = build_streaming_topology(o3, ExecutionOptions())

global_limits = ExecutionResources(gpu=4)
op_usages = {
o1: ExecutionResources.zero(),
o2: ExecutionResources.zero(),
o3: ExecutionResources(gpu=1), # GPU op using 1 GPU
}

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0)
resource_manager._mem_op_outputs = dict.fromkeys([o1, o2, o3], 0)
Comment on lines +659 to +660
Member: OOC why do we need to configure these?
Contributor Author: Actually not needed; removed.

resource_manager.get_global_limits = MagicMock(return_value=global_limits)

allocator = resource_manager._op_resource_allocator
allocator.update_usages()

# Non-GPU operator should get 0 GPU
assert allocator._op_budgets[o2].gpu == 0

# GPU operator should get remaining GPUs (4 total - 1 used = 3 available)
assert allocator._op_budgets[o3].gpu == 3

def test_multiple_gpu_operators(self, restore_data_context):
"""Test GPU allocation for multiple GPU operators."""
DataContext.get_current().op_resource_reservation_enabled = True
DataContext.get_current().op_resource_reservation_ratio = 0.5

o1 = InputDataBuffer(DataContext.get_current(), [])

# Two GPU operators
o2 = mock_map_op(o1, ray_remote_args={"num_gpus": 1})
o2.min_max_resource_requirements = MagicMock(
return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
)

o3 = mock_map_op(o2, ray_remote_args={"num_gpus": 1})
o3.min_max_resource_requirements = MagicMock(
return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0))
)

topo, _ = build_streaming_topology(o3, ExecutionOptions())

global_limits = ExecutionResources(gpu=4)
op_usages = {
o1: ExecutionResources.zero(),
o2: ExecutionResources(gpu=1), # Using 1 GPU
o3: ExecutionResources(gpu=0), # Not using GPU yet
}

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
resource_manager.get_global_limits = MagicMock(return_value=global_limits)

allocator = resource_manager._op_resource_allocator
allocator.update_usages()

# o2: 4 total - 1 used = 3 available
assert allocator._op_budgets[o2].gpu == 3

# o3: 4 total - 0 used = 4 available
assert allocator._op_budgets[o3].gpu == 4
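
These two assertions also illustrate the limitation flagged by the TODO in resource_manager.py: each GPU operator is budgeted independently against the global limit, so with 4 GPUs total the budgets sum to 3 + 4 = 7, more than the cluster actually has. A tiny illustrative sketch of that arithmetic (not the real allocator code):

```python
# Illustrative arithmetic only; budgets are computed per operator, not jointly.
global_gpus = 4
usage = {"o2": 1, "o3": 0}

budgets = {op: global_gpus - used for op, used in usage.items()}
assert budgets == {"o2": 3, "o3": 4}

# The per-operator budgets can exceed the global limit, which is why the
# resource manager carries a TODO about splitting GPUs across GPU operators.
assert sum(budgets.values()) > global_gpus
```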


if __name__ == "__main__":
import sys
47 changes: 22 additions & 25 deletions python/ray/data/tests/test_streaming_executor.py
@@ -10,7 +10,9 @@
from ray._private.test_utils import run_string_as_driver_nonblocking
from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
from ray.data._internal.execution.backpressure_policy.resource_budget_backpressure_policy import (
ResourceBudgetBackpressurePolicy,
)
from ray.data._internal.execution.execution_callback import (
EXECUTION_CALLBACKS_ENV_VAR,
ExecutionCallback,
@@ -276,35 +278,30 @@ def _get_eligible_ops_to_run(ensure_liveness: bool):
assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2]

# `o2` operator is now back-pressured
class TestBackpressurePolicy(BackpressurePolicy):
def __init__(self, op_to_block):
self._op_to_block = op_to_block

def can_add_input(self, op):
if op is self._op_to_block:
return False
return True

def max_task_output_bytes_to_read(self, op):
return None
with patch.object(
ResourceBudgetBackpressurePolicy, "can_add_input"
) as mock_can_add_input:
mock_can_add_input.side_effect = lambda op: op is not o2

test_policy = TestBackpressurePolicy(o2)

def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
return get_eligible_operators(
topo, [test_policy], ensure_liveness=ensure_liveness
test_policy = ResourceBudgetBackpressurePolicy(
MagicMock(), MagicMock(), MagicMock()
)

assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]
def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
return get_eligible_operators(
topo, [test_policy], ensure_liveness=ensure_liveness
)

# Complete `o3`
with patch.object(o3, "completed") as _mock:
_mock.return_value = True
# Clear up input queue
topo[o3].input_queues[0].clear()
assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]

# Complete `o3`
with patch.object(o3, "completed") as _mock:
_mock.return_value = True
# Clear up input queue
topo[o3].input_queues[0].clear()

# To ensure liveness back-pressure limits will be ignored
assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]
# To ensure liveness back-pressure limits will be ignored
assert _get_eligible_ops_to_run_with_policy(ensure_liveness=True) == [o2]


def test_rank_operators():