
Commit 46ac298

sampan-s-nayak and sampan authored
[core] Reapply aggregator refactoring changes + improvements to match existing memory consumption (#57078)
Signed-off-by: sampan <[email protected]>
Co-authored-by: sampan <[email protected]>
1 parent f975a67 commit 46ac298

12 files changed, +1303 −344 lines changed

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 91 additions & 327 deletions
Large diffs are not rendered by default.
python/ray/dashboard/modules/aggregator/constants.py

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
AGGREGATOR_AGENT_METRIC_PREFIX = "aggregator_agent"
CONSUMER_TAG_KEY = "consumer"
Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional

from ray._private.telemetry.open_telemetry_metric_recorder import (
    OpenTelemetryMetricRecorder,
)
from ray.core.generated import (
    events_base_event_pb2,
)
from ray.core.generated.events_base_event_pb2 import RayEvent
from ray.dashboard.modules.aggregator.constants import (
    AGGREGATOR_AGENT_METRIC_PREFIX,
    CONSUMER_TAG_KEY,
)


@dataclass
class _ConsumerState:
    # Index of the next event to be consumed by this consumer
    cursor_index: int


class MultiConsumerEventBuffer:
    """A buffer that accepts one event at a time and serves events in batches.
    Supports multiple consumers, each with its own cursor index, and tracks the number of events evicted for each consumer.

    The buffer is not thread-safe but is asyncio-friendly. All operations must be called from within the same event loop.

    Arguments:
        max_size: Maximum number of events to store in the buffer.
        max_batch_size: Maximum number of events to return in a batch when calling wait_for_batch.
        common_metric_tags: Tags to add to all metrics.
    """

    def __init__(
        self,
        max_size: int,
        max_batch_size: int,
        common_metric_tags: Optional[Dict[str, str]] = None,
    ):
        self._buffer = deque(maxlen=max_size)
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._has_new_events_to_consume = asyncio.Condition(self._lock)
        self._consumers: Dict[str, _ConsumerState] = {}

        self._max_batch_size = max_batch_size

        self._common_metrics_tags = common_metric_tags or {}
        self._metric_recorder = OpenTelemetryMetricRecorder()
        self.evicted_events_metric_name = (
            f"{AGGREGATOR_AGENT_METRIC_PREFIX}_queue_dropped_events"
        )
        self._metric_recorder.register_counter_metric(
            self.evicted_events_metric_name,
            "Total number of events dropped because the publish/buffer queue was full.",
        )

    async def add_event(self, event: events_base_event_pb2.RayEvent) -> None:
        """Add an event to the buffer.

        If the buffer is full, the oldest event is dropped.
        """
        async with self._lock:
            dropped_event = None
            if len(self._buffer) >= self._max_size:
                dropped_event = self._buffer.popleft()
            self._buffer.append(event)

            if dropped_event is not None:
                for consumer_name, consumer_state in self._consumers.items():
                    # Update the consumer cursor index and the evicted-events metric since an event was dropped
                    if consumer_state.cursor_index == 0:
                        # The dropped event was the next event this consumer would have consumed; publish the eviction metric
                        self._metric_recorder.set_metric_value(
                            self.evicted_events_metric_name,
                            {
                                **self._common_metrics_tags,
                                CONSUMER_TAG_KEY: consumer_name,
                                "event_type": RayEvent.EventType.Name(
                                    dropped_event.event_type
                                ),
                            },
                            1,
                        )
                    else:
                        # The dropped event was already consumed by this consumer, so adjust the cursor
                        consumer_state.cursor_index -= 1

            # Signal the consumers that there are new events to consume
            self._has_new_events_to_consume.notify_all()

    def _evict_old_events(self) -> None:
        """Clean the buffer by removing events whose index is lower than the
        cursor indexes of all consumers, then update the cursor index of each
        consumer accordingly.
        """
        if not self._consumers:
            return

        min_cursor_index = min(
            consumer_state.cursor_index for consumer_state in self._consumers.values()
        )
        for _ in range(min_cursor_index):
            self._buffer.popleft()

        # Update the cursor index of all consumers
        for consumer_state in self._consumers.values():
            consumer_state.cursor_index -= min_cursor_index

    async def wait_for_batch(
        self, consumer_name: str, timeout_seconds: float = 1.0
    ) -> List[events_base_event_pb2.RayEvent]:
        """Wait for a batch, respecting the configured max_batch_size and timeout_seconds.

        Returns a batch of up to max_batch_size items. Waits for up to
        timeout_seconds after receiving the first event that will be in
        the next batch. After the timeout, returns as many items as are ready.

        Always returns a batch with at least one item: blocks
        indefinitely until an item comes in.

        Arguments:
            consumer_name: Name of the consumer consuming the batch.
            timeout_seconds: Maximum time to wait for a batch.

        Returns:
            A list of up to max_batch_size events ready for consumption.
            The list always contains at least one event.
        """
        max_batch = self._max_batch_size
        batch = []
        async with self._has_new_events_to_consume:
            consumer_state = self._consumers.get(consumer_name)
            if consumer_state is None:
                raise KeyError(f"unknown consumer '{consumer_name}'")

            # Phase 1: read the first event, waiting indefinitely until there is at least one event to consume
            while consumer_state.cursor_index >= len(self._buffer):
                await self._has_new_events_to_consume.wait()

            # Add the first event to the batch
            event = self._buffer[consumer_state.cursor_index]
            consumer_state.cursor_index += 1
            batch.append(event)

            # Phase 2: add items to the batch up to the timeout or until the batch is full
            deadline = time.monotonic() + max(0.0, float(timeout_seconds))
            while len(batch) < max_batch:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break

                # Drain whatever is available
                while len(batch) < max_batch and consumer_state.cursor_index < len(
                    self._buffer
                ):
                    batch.append(self._buffer[consumer_state.cursor_index])
                    consumer_state.cursor_index += 1

                if len(batch) >= max_batch:
                    break

                # There is still room in the batch but no new events to consume; wait until notified or until the timeout expires
                try:
                    await asyncio.wait_for(
                        self._has_new_events_to_consume.wait(), remaining
                    )
                except asyncio.TimeoutError:
                    # Timed out; return the current batch
                    break

            self._evict_old_events()
            return batch

    async def register_consumer(self, consumer_name: str) -> None:
        """Register a new consumer with a name.

        Arguments:
            consumer_name: A unique name for the consumer.
        """
        async with self._lock:
            if self._consumers.get(consumer_name) is not None:
                raise ValueError(f"consumer '{consumer_name}' already registered")

            self._consumers[consumer_name] = _ConsumerState(cursor_index=0)

    async def size(self) -> int:
        """Get the total number of events in the buffer. Does not take consumer cursors into account."""
        return len(self._buffer)
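
For reference, the new buffer is meant to be driven from a single asyncio event loop: producers call add_event, and each named consumer registers once and then pulls batches with wait_for_batch. The snippet below is a minimal, illustrative sketch of that flow, not code from this commit; the consumer name ("http_publisher"), the buffer sizes, and the use of empty RayEvent messages are assumptions, and it presumes MultiConsumerEventBuffer is importable from the module added here.

import asyncio

from ray.core.generated import events_base_event_pb2


async def demo() -> None:
    # Hypothetical sizes; real values come from the aggregator agent's configuration.
    buffer = MultiConsumerEventBuffer(max_size=1000, max_batch_size=100)
    await buffer.register_consumer("http_publisher")

    # Producer side: events are added one at a time; once max_size is reached,
    # the oldest event is evicted and the per-consumer drop metric is updated.
    for _ in range(3):
        await buffer.add_event(events_base_event_pb2.RayEvent())

    # Consumer side: blocks until at least one event is available, then keeps
    # filling the batch for up to timeout_seconds or until max_batch_size is hit.
    batch = await buffer.wait_for_batch("http_publisher", timeout_seconds=0.5)
    print(f"got {len(batch)} events")


asyncio.run(demo())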

python/ray/dashboard/modules/aggregator/publisher/__init__.py

Whitespace-only changes.
Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
import json
import logging
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable

import aiohttp

from ray._common.utils import get_or_create_event_loop
from ray._private.protobuf_compat import message_to_json
from ray.core.generated import events_base_event_pb2
from ray.dashboard.modules.aggregator.publisher.configs import PUBLISHER_TIMEOUT_SECONDS

logger = logging.getLogger(__name__)


@dataclass
class PublishStats:
    """Data class that represents the stats of publishing a batch of events."""

    # Whether the publish was successful
    is_publish_successful: bool
    # Number of events published
    num_events_published: int
    # Number of events filtered out
    num_events_filtered_out: int


@dataclass
class PublishBatch:
    """Data class that represents a batch of events to publish."""

    # The list of events to publish
    events: list[events_base_event_pb2.RayEvent]


class PublisherClientInterface(ABC):
    """Abstract interface for publishing Ray event batches to external destinations.

    Implementations should handle the actual publishing logic, filtering,
    and format conversion appropriate for their specific destination type.
    """

    def count_num_events_in_batch(self, batch: PublishBatch) -> int:
        """Count the number of events in a given batch."""
        return len(batch.events)

    @abstractmethod
    async def publish(self, batch: PublishBatch) -> PublishStats:
        """Publish a batch of events to the destination."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Clean up any resources used by this client. Should be called when the publisher client is no longer required."""
        pass


class AsyncHttpPublisherClient(PublisherClientInterface):
    """Client for publishing Ray event batches to an external HTTP service."""

    def __init__(
        self,
        endpoint: str,
        executor: ThreadPoolExecutor,
        events_filter_fn: Callable[[object], bool],
        timeout: float = PUBLISHER_TIMEOUT_SECONDS,
    ) -> None:
        self._endpoint = endpoint
        self._executor = executor
        self._events_filter_fn = events_filter_fn
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session = None

    async def publish(self, batch: PublishBatch) -> PublishStats:
        events_batch: list[events_base_event_pb2.RayEvent] = batch.events
        if not events_batch:
            # Nothing to publish -> success but nothing published
            return PublishStats(True, 0, 0)
        filtered = [e for e in events_batch if self._events_filter_fn(e)]
        num_filtered_out = len(events_batch) - len(filtered)
        if not filtered:
            # All filtered out -> success but nothing published
            return PublishStats(True, 0, num_filtered_out)

        # Convert protobuf objects to Python dictionaries for the HTTP POST. Run in the executor to avoid blocking the event loop.
        filtered_json = await get_or_create_event_loop().run_in_executor(
            self._executor,
            lambda: [
                json.loads(
                    message_to_json(e, always_print_fields_with_no_presence=True)
                )
                for e in filtered
            ],
        )

        try:
            # Create the session on first use (lazy initialization)
            if not self._session:
                self._session = aiohttp.ClientSession(timeout=self._timeout)

            return await self._send_http_request(filtered_json, num_filtered_out)
        except Exception as e:
            logger.error("Failed to send events to external service. Error: %s", e)
            return PublishStats(False, 0, 0)

    async def _send_http_request(self, json_data, num_filtered_out) -> PublishStats:
        async with self._session.post(
            self._endpoint,
            json=json_data,
        ) as resp:
            resp.raise_for_status()
            return PublishStats(True, len(json_data), num_filtered_out)

    async def close(self) -> None:
        """Close the HTTP session if one was created. Should be called when the publisher client is no longer required."""
        if self._session:
            await self._session.close()
            self._session = None

    def set_session(self, session) -> None:
        """Inject an HTTP client session.

        If a session is set explicitly, it will be used and managed by close().
        """
        self._session = session
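
As a usage note, AsyncHttpPublisherClient creates its aiohttp session lazily on the first publish call and reports failures through PublishStats rather than raising. The sketch below is illustrative only; the endpoint URL, single-worker executor, and accept-everything filter are placeholders rather than values used by the aggregator agent, and with nothing listening at the placeholder endpoint the call simply returns is_publish_successful=False.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from ray.core.generated import events_base_event_pb2


async def demo() -> None:
    client = AsyncHttpPublisherClient(
        endpoint="http://localhost:8080/events",  # placeholder destination
        executor=ThreadPoolExecutor(max_workers=1),
        events_filter_fn=lambda event: True,  # keep every event
    )
    try:
        batch = PublishBatch(events=[events_base_event_pb2.RayEvent()])
        stats = await client.publish(batch)
        print(stats.is_publish_successful, stats.num_events_published)
    finally:
        await client.close()


asyncio.run(demo())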
python/ray/dashboard/modules/aggregator/publisher/configs.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
# Environment variables for the aggregator agent publisher component.
from ray._private import ray_constants

env_var_prefix = "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER"
# Timeout for the publisher to publish events to the destination
PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
    f"{env_var_prefix}_TIMEOUT_SECONDS", 3
)
# Maximum number of retries for publishing events to the destination; if less than 0, retry indefinitely
PUBLISHER_MAX_RETRIES = ray_constants.env_integer(f"{env_var_prefix}_MAX_RETRIES", -1)
# Initial backoff time for publishing events to the destination
PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_INITIAL_BACKOFF_SECONDS", 0.01
)
# Maximum backoff time for publishing events to the destination
PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BACKOFF_SECONDS", 5.0
)
# Jitter ratio for publishing events to the destination
PUBLISHER_JITTER_RATIO = ray_constants.env_float(f"{env_var_prefix}_JITTER_RATIO", 0.1)
# Maximum sleep time between sending batches of events to the destination; should be greater than 0.0 to avoid busy looping
PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BUFFER_SEND_INTERVAL_SECONDS", 0.1
)
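
The backoff-related constants are the usual ingredients of capped exponential backoff with jitter. The sketch below is only a hypothetical illustration of how they could combine into a per-attempt delay, assuming the constants above are in scope; the actual retry loop lives in the publisher code, which is not rendered in this diff, so this formula should not be read as Ray's implementation.

import random


def backoff_delay(attempt: int) -> float:
    # Exponential growth from the initial backoff, capped at the maximum backoff.
    delay = min(
        PUBLISHER_INITIAL_BACKOFF_SECONDS * (2 ** attempt),
        PUBLISHER_MAX_BACKOFF_SECONDS,
    )
    # Apply up to +/- PUBLISHER_JITTER_RATIO of random jitter.
    jitter = delay * PUBLISHER_JITTER_RATIO * random.uniform(-1.0, 1.0)
    return max(0.0, delay + jitter)


# PUBLISHER_MAX_RETRIES < 0 is documented above as "retry indefinitely";
# a non-negative value would bound the number of attempts.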

0 commit comments
