Skip to content

[data] Extract backpressure-related code from ResourceManager as a policy #54376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 8, 2025

Conversation

raulchen
Copy link
Contributor

@raulchen raulchen commented Jul 7, 2025

  • Extract backpressure related methods (can_submit_new_tasks and max_task_output_bytes_to_read) from ResourceManager, as a standalone policy ResourceBudgetBackpressurePolicy)
  • Report task_output_backpressure_time metric.

@Copilot Copilot AI review requested due to automatic review settings July 7, 2025 18:37
@raulchen raulchen requested a review from a team as a code owner July 7, 2025 18:37
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR extracts backpressure logic from ResourceManager into a standalone ResourceBudgetBackpressurePolicy, integrates configurable backpressure policies into the streaming executor for both task submission and task output, and adds a new task_output_backpressure_time metric.

  • Introduce a BackpressurePolicy interface with methods for input (can_add_input) and output (max_task_output_bytes_to_read) backpressure and extract a resource-based policy.
  • Refactor process_completed_tasks and get_eligible_operators to use a list of backpressure policies instead of tightly coupling to ResourceManager.
  • Add task_output_backpressure_time metric in OpRuntimeMetrics and notify operators of output backpressure.

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
python/ray/data/tests/test_streaming_executor.py Updated tests to call process_completed_tasks and get_eligible_operators with policy lists and introduced a TestBackpressurePolicy.
python/ray/data/tests/test_resource_manager.py Replaced direct allocator.can_submit_new_task calls with a helper that uses get_budget.
python/ray/data/_internal/execution/streaming_executor_state.py Refactored process_completed_tasks and get_eligible_operators to use backpressure policy lists.
python/ray/data/_internal/execution/streaming_executor.py Updated executor initialization to pass data_context, topology, and resource_manager into policy factory and use a list of policies.
python/ray/data/_internal/execution/resource_manager.py Exposed max_task_output_bytes_to_read and get_budget wrappers and removed can_submit_new_task from the allocator interface.
python/ray/data/_internal/execution/interfaces/physical_operator.py Added notify_in_task_output_backpressure for output backpressure metrics.
python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py Added task_output_backpressure_time metric and timing logic.
python/ray/data/_internal/execution/backpressure_policy/backpressure_policy.py Defined BackpressurePolicy base class with new constructor and default output method.
python/ray/data/_internal/execution/backpressure_policy/resource_budget_backpressure_policy.py New ResourceBudgetBackpressurePolicy implementation.
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py Adapted constructor to accept common parameters via super().
python/ray/data/_internal/execution/backpressure_policy/init.py Updated get_backpressure_policies to pass data_context, topology, and resource_manager.
Comments suppressed due to low confidence (1)

python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py:361

  • Tests for the new task_output_backpressure_time metric are missing. Add unit tests to verify that on_toggle_task_output_backpressure correctly starts, stops, and accumulates the backpressure timer.
    task_output_backpressure_time: float = metric_field(

raulchen added 2 commits July 7, 2025 11:40
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Copy link
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nits but lgtm!

if max_bytes_to_read is not None:
max_bytes_to_read_per_op[state] = max_bytes_to_read
for op, state in topology.items():
# Check all backpressure policies for max_task_output_bytes_to_read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For future debugging purposes, do we want to log the backpressure policy that is being used? Wondering about the case where one might be overly restrictive but it becomes hard to tell which one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I thought of that. but not sure what's the best way to surface this info (too long to progress bars). I'll leave a TODO here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I wonder if just logging at the debug level would be sufficient? If that would cause it to only appear in the ray_data.log file then maybe that would work? Fine to punt on that though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even logging might be too verbose.
given that we don't have that many policies right now. Logging that doesn't add much value.
So I'll leave it for now

in_backpressure = not under_resource_limits or not all(
p.can_add_input(op) for p in backpressure_policies
)
# Operator is considered being in task-submission back-pressure any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think missing an if: "Operator is considered being in task-submission back-pressure any" -> "Operator is considered being in task-submission back-pressure if any"

Signed-off-by: Hao Chen <[email protected]>
Comment on lines 23 to 24
"""Initialize the backpressure policy.
Args:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Initialize the backpressure policy.
Args:
"""Initialize the backpressure policy.
Args:

raulchen added 3 commits July 7, 2025 17:07
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
@raulchen raulchen enabled auto-merge (squash) July 8, 2025 00:31
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Jul 8, 2025
raulchen added 4 commits July 8, 2025 13:09
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
@github-actions github-actions bot disabled auto-merge July 8, 2025 20:26
@raulchen raulchen enabled auto-merge (squash) July 8, 2025 20:48
Comment on lines +361 to +365
task_output_backpressure_time: float = metric_field(
default=0,
description="Time spent in task output backpressure.",
metrics_group=MetricsGroup.TASKS,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we track this as per-task metric? I don't think cumulative one is very useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It keeps track of the total time when the op is backpressured.
Op-level metric would be more useful as we can show them in the Grafana dashboard.
task-level metric will result in too many lines and make the chart difficult to read

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can be the cumulative time be useful?

task-level metric will result in too many lines

Well, you don't show every line you just show the distribution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can handle that later. currently we already report cumulative metric for task submission backpressure time.

def get_budget(self, op: PhysicalOperator) -> ExecutionResources:
return self._op_budgets[op]
def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
return self._op_budgets.get(op, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self._op_budgets.get(op, None)
return self._op_budgets.get(op)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally changed this to return an optional. Because ineligible ops don't have budgets.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i understand that but what's the point of specifying default? It will return none anyways

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. somehow I thought it raises key error by default.

Comment on lines +456 to +463
max_bytes_to_read = None
for policy in backpressure_policies:
policy_limit = policy.max_task_output_bytes_to_read(op)
if policy_limit is not None:
if max_bytes_to_read is None:
max_bytes_to_read = policy_limit
else:
max_bytes_to_read = min(max_bytes_to_read, policy_limit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
max_bytes_to_read = None
for policy in backpressure_policies:
policy_limit = policy.max_task_output_bytes_to_read(op)
if policy_limit is not None:
if max_bytes_to_read is None:
max_bytes_to_read = policy_limit
else:
max_bytes_to_read = min(max_bytes_to_read, policy_limit)
max_bytes = min([p.max_task_output_bytes_to_read(op) or float("inf") for p in policies])

Comment on lines +600 to +602
# Operator is considered being in task-submission back-pressure if any
# back-pressure policy is violated
in_backpressure = any(not p.can_add_input(op) for p in backpressure_policies)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @omatthew98 pointed out above -- let's make back-pressuring traceable (by logging when the op becomes throttled first time)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The op will be flipping between backpressure and non-backpressure status pretty frequently. I don't think logging the first time would be useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be every-time it goes from no-throttling to being throttled

) as _mock:
_mock.side_effect = lambda op: False if op is o2 else True
assert _get_eligible_ops_to_run(ensure_liveness=False) == [o3]
class TestBackpressurePolicy(BackpressurePolicy):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's actually test actual back-pressure policies that we have

Comment on lines +17 to +32
def can_add_input(self, op: "PhysicalOperator") -> bool:
budget = self._resource_manager.get_budget(op)
if budget is None:
return True
return op.incremental_resource_usage().satisfies_limit(budget)

def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
"""Determine maximum bytes to read based on the resource budgets.

Args:
op: The operator to get the limit for.

Returns:
The maximum bytes that can be read, or None if no limit.
"""
return self._resource_manager.max_task_output_bytes_to_read(op)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure we keep existing coverage of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's already tested in test_resource_manager.

@alexeykudinkin alexeykudinkin disabled auto-merge July 8, 2025 21:07
@raulchen raulchen merged commit bbf024d into ray-project:master Jul 8, 2025
6 checks passed
@raulchen raulchen deleted the refactor-resource-manager branch July 8, 2025 22:51
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Jul 9, 2025
…licy (ray-project#54376)

* Extract backpressure related methods (`can_submit_new_tasks` and
`max_task_output_bytes_to_read`) from ResourceManager, as a standalone
policy `ResourceBudgetBackpressurePolicy`)
* Report task_output_backpressure_time metric.

---------

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: doyoung <[email protected]>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Jul 9, 2025
…licy (ray-project#54376)

* Extract backpressure related methods (`can_submit_new_tasks` and
`max_task_output_bytes_to_read`) from ResourceManager, as a standalone
policy `ResourceBudgetBackpressurePolicy`)
* Report task_output_backpressure_time metric.

---------

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: doyoung <[email protected]>
ccmao1130 pushed a commit to ccmao1130/ray that referenced this pull request Jul 29, 2025
…licy (ray-project#54376)

* Extract backpressure related methods (`can_submit_new_tasks` and
`max_task_output_bytes_to_read`) from ResourceManager, as a standalone
policy `ResourceBudgetBackpressurePolicy`)
* Report task_output_backpressure_time metric.

---------

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: ChanChan Mao <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants