Skip to content

[data] Extract backpressure-related code from ResourceManager as a policy #54376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 8, 2025
1 change: 0 additions & 1 deletion ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,6 @@ python/ray/data/_internal/execution/streaming_executor_state.py
DOC201: Method `OpBufferQueue.has_next` does not have a return section in docstring
DOC101: Method `OpState.get_output_blocking`: Docstring contains fewer arguments than in function signature.
DOC103: Method `OpState.get_output_blocking`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [output_split_idx: Optional[int]].
DOC103: Function `process_completed_tasks`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [resource_manager: ResourceManager]. Arguments in the docstring but not in the function signature: [backpressure_policies: ].
--------------------
python/ray/data/_internal/iterator/stream_split_iterator.py
DOC101: Method `SplitCoordinator.start_epoch`: Docstring contains fewer arguments than in function signature.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

import ray
from .backpressure_policy import BackpressurePolicy
from .concurrency_cap_backpressure_policy import ConcurrencyCapBackpressurePolicy
from .resource_budget_backpressure_policy import ResourceBudgetBackpressurePolicy
from ray.data.context import DataContext

if TYPE_CHECKING:
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import Topology

# Default enabled backpressure policies and its config key.
# Use `DataContext.set_config` to config it.
ENABLED_BACKPRESSURE_POLICIES = [
ConcurrencyCapBackpressurePolicy,
ResourceBudgetBackpressurePolicy,
]
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"


def get_backpressure_policies(topology: "Topology"):
data_context = ray.data.DataContext.get_current()
def get_backpressure_policies(
data_context: DataContext,
topology: "Topology",
resource_manager: "ResourceManager",
) -> List[BackpressurePolicy]:
policies = data_context.get_config(
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, ENABLED_BACKPRESSURE_POLICIES
)

return [policy(topology) for policy in policies]
return [policy(data_context, topology, resource_manager) for policy in policies]


__all__ = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from abc import ABC
from typing import TYPE_CHECKING, Optional

from ray.data.context import DataContext

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import Topology


class BackpressurePolicy(ABC):
"""Interface for back pressure policies."""

@abstractmethod
def __init__(self, topology: "Topology"):
...
def __init__(
self,
data_context: DataContext,
topology: "Topology",
resource_manager: "ResourceManager",
):
"""Initialize the backpressure policy.

Args:
data_context: The data context.
topology: The execution topology.
resource_manager: The resource manager.
"""
self._data_context = data_context
self._topology = topology
self._resource_manager = resource_manager

def can_add_input(self, op: "PhysicalOperator") -> bool:
"""Determine if we can add a new input to the operator. If returns False, the
Expand All @@ -26,3 +42,21 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
backpressured if any of the policies returns False.
"""
return True

def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
"""Return the maximum bytes of pending task outputs can be read for
the given operator. None means no limit.

This is used for output backpressure to limit how much data an operator
can read from its running tasks.

Note, if multiple backpressure policies return non-None values for an operator,
the minimum of those values will be used as the limit.

Args:
op: The operator to get the limit for.

Returns:
The maximum bytes that can be read, or None if no limit.
"""
return None
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)
from ray.data._internal.execution.streaming_executor_state import Topology

logger = logging.getLogger(__name__)

Expand All @@ -25,10 +24,11 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
TODO(chengsu): Consolidate with actor scaling logic of `ActorPoolMapOperator`.
"""

def __init__(self, topology: "Topology"):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._concurrency_caps: dict["PhysicalOperator", float] = {}

for op, _ in topology.items():
for op, _ in self._topology.items():
if isinstance(op, TaskPoolMapOperator) and op.get_concurrency() is not None:
self._concurrency_caps[op] = op.get_concurrency()
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging
from typing import TYPE_CHECKING, Optional

from .backpressure_policy import BackpressurePolicy

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)

logger = logging.getLogger(__name__)


class ResourceBudgetBackpressurePolicy(BackpressurePolicy):
    """Backpressure policy that throttles operators according to the
    per-operator resource budgets tracked by the ``ResourceManager``."""

    def can_add_input(self, op: "PhysicalOperator") -> bool:
        """Whether ``op`` may be given a new input.

        Args:
            op: The operator to check.

        Returns:
            True when the operator has no budget (unlimited), or when its
            incremental resource usage fits within the remaining budget.
        """
        budget = self._resource_manager.get_budget(op)
        # No budget means the operator is not resource-constrained.
        return budget is None or op.incremental_resource_usage().satisfies_limit(
            budget
        )

    def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
        """Determine maximum bytes to read based on the resource budgets.

        Args:
            op: The operator to get the limit for.

        Returns:
            The maximum bytes that can be read, or None if no limit.
        """
        limit = self._resource_manager.max_task_output_bytes_to_read(op)
        return limit
Comment on lines +17 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure we keep existing coverage of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's already tested in test_resource_manager.

Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Time spent in task submission backpressure.",
metrics_group=MetricsGroup.TASKS,
)
task_output_backpressure_time: float = metric_field(
default=0,
description="Time spent in task output backpressure.",
metrics_group=MetricsGroup.TASKS,
)
Comment on lines +361 to +365
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we track this as per-task metric? I don't think cumulative one is very useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It keeps track of the total time when the op is backpressured.
An op-level metric would be more useful, as we can show it in the Grafana dashboard.
A task-level metric would result in too many lines and make the chart difficult to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can the cumulative time be useful?

task-level metric will result in too many lines

Well, you don't show every line you just show the distribution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can handle that later. Currently, we already report a cumulative metric for task submission backpressure time.

histogram_buckets_s = [
0.1,
0.25,
Expand Down Expand Up @@ -446,6 +451,8 @@ def __init__(self, op: "PhysicalOperator"):
self._extra_metrics: Dict[str, Any] = {}
# Start time of current pause due to task submission backpressure
self._task_submission_backpressure_start_time = -1
# Start time of current pause due to task output backpressure
self._task_output_backpressure_start_time = -1

self._internal_inqueue = create_bundle_queue()
self._internal_outqueue = create_bundle_queue()
Expand Down Expand Up @@ -667,6 +674,17 @@ def on_toggle_task_submission_backpressure(self, in_backpressure):
)
self._task_submission_backpressure_start_time = -1

def on_toggle_task_output_backpressure(self, in_backpressure: bool) -> None:
    """Start or stop the task-output-backpressure timer.

    Accumulates the elapsed time of each backpressure episode into
    ``task_output_backpressure_time``. ``-1`` is the sentinel for "no
    episode currently in progress".

    Args:
        in_backpressure: Whether the operator is now under task output
            backpressure.
    """
    if in_backpressure and self._task_output_backpressure_start_time == -1:
        # backpressure starting, start timer
        self._task_output_backpressure_start_time = time.perf_counter()
    elif self._task_output_backpressure_start_time != -1:
        # backpressure stopping, stop timer and add this episode's
        # duration to the cumulative metric
        self.task_output_backpressure_time += (
            time.perf_counter() - self._task_output_backpressure_start_time
        )
        self._task_output_backpressure_start_time = -1

def on_output_taken(self, output: RefBundle):
"""Callback when an output is taken from the operator."""
self.num_outputs_taken += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,18 @@ def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
self._in_task_submission_backpressure = in_backpressure

def notify_in_task_output_backpressure(self, in_backpressure: bool) -> None:
    """Called periodically from the executor to update internal output backpressure
    status for stats collection purposes.

    Args:
        in_backpressure: Value this operator's output backpressure should be set to.
    """
    # Only forward state *transitions* to the metrics object, so the
    # backpressure timer is toggled once per change rather than once per call.
    if self._in_task_output_backpressure != in_backpressure:
        self._metrics.on_toggle_task_output_backpressure(in_backpressure)
        self._in_task_output_backpressure = in_backpressure

def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
"""Return a list of `AutoscalingActorPool`s managed by this operator."""
return []
Expand Down
35 changes: 19 additions & 16 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,20 @@ def op_resource_allocator(self) -> "OpResourceAllocator":
assert self._op_resource_allocator is not None
return self._op_resource_allocator

def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
    """Return the maximum bytes of pending task outputs that can be read for
    the given operator. None means no limit.

    Delegates to the op resource allocator; when no allocator is
    configured there is no output limit.
    """
    if self._op_resource_allocator is None:
        return None
    return self._op_resource_allocator.max_task_output_bytes_to_read(op)

def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
    """Return the budget for the given operator, or None if the operator
    has an unlimited budget (including when no allocator is configured).
    """
    if self._op_resource_allocator is None:
        return None
    return self._op_resource_allocator.get_budget(op)


class OpResourceAllocator(ABC):
"""An interface for dynamic operator resource allocation.
Expand All @@ -323,20 +337,16 @@ def update_usages(self):
"""Callback to update resource usages."""
...

@abstractmethod
def can_submit_new_task(self, op: PhysicalOperator) -> bool:
"""Return whether the given operator can submit a new task."""
...

@abstractmethod
def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
"""Return the maximum bytes of pending task outputs can be read for
the given operator. None means no limit."""
...

@abstractmethod
def get_budget(self, op: PhysicalOperator) -> ExecutionResources:
"""Return the budget for the given operator."""
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
"""Return the budget for the given operator, or None if the operator
has unlimited budget."""
...


Expand Down Expand Up @@ -542,15 +552,8 @@ def _update_reservation(self):

self._total_shared = remaining

def can_submit_new_task(self, op: PhysicalOperator) -> bool:
if op not in self._op_budgets:
return True
budget = self._op_budgets[op]
res = op.incremental_resource_usage().satisfies_limit(budget)
return res

def get_budget(self, op: PhysicalOperator) -> ExecutionResources:
return self._op_budgets[op]
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
return self._op_budgets.get(op, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self._op_budgets.get(op, None)
return self._op_budgets.get(op)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally changed this to return an optional. Because ineligible ops don't have budgets.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i understand that but what's the point of specifying default? It will return none anyways

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. somehow I thought it raises key error by default.


def _should_unblock_streaming_output_backpressure(
self, op: PhysicalOperator
Expand Down
8 changes: 5 additions & 3 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,17 @@ def execute(
lambda: self._autoscaler.get_total_resources(),
self._data_context,
)
self._backpressure_policies = get_backpressure_policies(self._topology)
self._backpressure_policies = get_backpressure_policies(
self._data_context, self._topology, self._resource_manager
)
self._autoscaler = create_autoscaler(
self._topology,
self._resource_manager,
config=self._data_context.autoscaling_config,
execution_id=self._dataset_id,
)

self._has_op_completed = {op: False for op in self._topology}
self._has_op_completed = dict.fromkeys(self._topology, False)

self._output_node = dag, self._topology[dag]

Expand Down Expand Up @@ -337,7 +339,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
# greater parallelism.
num_errored_blocks = process_completed_tasks(
topology,
self._resource_manager,
self._backpressure_policies,
self._max_errored_blocks,
)
if self._max_errored_blocks > 0:
Expand Down
Loading