Skip to content

Commit a9bfa75

Browse files
dayshah and ccmao1130
authored and committed
[core] Inject reorder_wait_seconds for scheduling queue test (ray-project#54404)
This scheduling queue unit test previously took more than 30 seconds because the default value of reorder_wait_seconds is 30. The value is now injected one level higher, as a constructor argument of ActorSchedulingQueue, so the test can set it to 1 second while production code keeps passing the configured default. Signed-off-by: dayshah <[email protected]> Signed-off-by: ChanChan Mao <[email protected]>
1 parent c5a7692 commit a9bfa75

File tree

4 files changed

+38
-35
lines changed

4 files changed

+38
-35
lines changed

src/ray/core_worker/test/scheduling_queue_test.cc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ TEST(SchedulingQueueTest, TestTaskEvents) {
8585
auto pool_manager =
8686
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
8787

88-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
88+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
8989
int n_ok = 0;
9090
int n_rej = 0;
9191
auto fn_ok = [&n_ok](const TaskSpecification &task_spec,
@@ -156,7 +156,7 @@ TEST(SchedulingQueueTest, TestInOrder) {
156156
auto pool_manager =
157157
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
158158

159-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
159+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
160160
int n_ok = 0;
161161
int n_rej = 0;
162162
auto fn_ok = [&n_ok](const TaskSpecification &task_spec,
@@ -192,7 +192,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) {
192192
auto pool_manager =
193193
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
194194

195-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
195+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
196196
std::atomic<int> n_ok(0);
197197
std::atomic<int> n_rej(0);
198198

@@ -243,7 +243,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
243243
auto pool_manager =
244244
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
245245

246-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
246+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
247247
std::atomic<int> n_ok(0);
248248
std::atomic<int> n_rej(0);
249249

@@ -286,7 +286,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) {
286286
auto pool_manager =
287287
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
288288

289-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
289+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
290290
int n_ok = 0;
291291
int n_rej = 0;
292292
auto fn_ok = [&n_ok](const TaskSpecification &task_spec,
@@ -321,7 +321,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
321321
auto pool_manager =
322322
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
323323

324-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
324+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
325325
std::atomic<int> n_ok(0);
326326
std::atomic<int> n_rej(0);
327327

@@ -337,7 +337,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
337337
queue.Add(3, -1, fn_ok, fn_rej, nullptr, task_spec);
338338
ASSERT_TRUE(WaitForCondition(CreateEqualsConditionChecker(&n_ok, 1), 1000));
339339
ASSERT_EQ(n_rej, 0);
340-
io_service.run(); // immediately triggers timeout
340+
io_service.run();
341341
ASSERT_TRUE(WaitForCondition(CreateEqualsConditionChecker(&n_ok, 1), 1000));
342342
ASSERT_TRUE(WaitForCondition(CreateEqualsConditionChecker(&n_rej, 2), 1000));
343343
queue.Add(4, -1, fn_ok, fn_rej, nullptr, task_spec);
@@ -362,7 +362,7 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
362362
auto pool_manager =
363363
std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>(concurrency_groups);
364364

365-
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager);
365+
ActorSchedulingQueue queue(io_service, waiter, task_event_buffer, pool_manager, 1);
366366
std::atomic<int> n_ok(0);
367367
std::atomic<int> n_rej(0);
368368
auto fn_ok = [&n_ok](const TaskSpecification &task_spec,

src/ray/core_worker/transport/actor_scheduling_queue.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ ActorSchedulingQueue::ActorSchedulingQueue(
2626
instrumented_io_context &task_execution_service,
2727
DependencyWaiter &waiter,
2828
worker::TaskEventBuffer &task_event_buffer,
29-
std::shared_ptr<ConcurrencyGroupManager<BoundedExecutor>> pool_manager)
30-
: wait_timer_(task_execution_service),
29+
std::shared_ptr<ConcurrencyGroupManager<BoundedExecutor>> pool_manager,
30+
int64_t reorder_wait_seconds)
31+
: reorder_wait_seconds_(reorder_wait_seconds),
32+
wait_timer_(task_execution_service),
3133
main_thread_id_(std::this_thread::get_id()),
3234
waiter_(waiter),
3335
task_event_buffer_(task_event_buffer),

src/ray/core_worker/transport/actor_scheduling_queue.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class ActorSchedulingQueue : public SchedulingQueue {
4545
instrumented_io_context &task_execution_service,
4646
DependencyWaiter &waiter,
4747
worker::TaskEventBuffer &task_event_buffer,
48-
std::shared_ptr<ConcurrencyGroupManager<BoundedExecutor>> pool_manager);
48+
std::shared_ptr<ConcurrencyGroupManager<BoundedExecutor>> pool_manager,
49+
int64_t reorder_wait_seconds);
4950

5051
void Stop() override;
5152

@@ -81,8 +82,7 @@ class ActorSchedulingQueue : public SchedulingQueue {
8182
/// Called when we time out waiting for an earlier task to show up.
8283
void OnSequencingWaitTimeout();
8384
/// Max time in seconds to wait for dependencies to show up.
84-
const int64_t reorder_wait_seconds_ =
85-
::RayConfig::instance().actor_scheduling_queue_max_reorder_wait_seconds();
85+
const int64_t reorder_wait_seconds_;
8686
/// Sorted map of (accept, rej) task callbacks keyed by their sequence number.
8787
std::map<int64_t, InboundRequest> pending_actor_tasks_;
8888
/// The next sequence number we are waiting for to arrive.

src/ray/core_worker/transport/task_receiver.cc

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -196,28 +196,29 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
196196
if (task_spec.IsActorTask()) {
197197
auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
198198
if (it == actor_scheduling_queues_.end()) {
199-
if (execute_out_of_order_) {
200-
it = actor_scheduling_queues_
201-
.emplace(task_spec.CallerWorkerId(),
202-
std::make_unique<OutOfOrderActorSchedulingQueue>(
203-
task_execution_service_,
204-
waiter_,
205-
task_event_buffer_,
206-
pool_manager_,
207-
fiber_state_manager_,
208-
is_asyncio_,
209-
fiber_max_concurrency_,
210-
concurrency_groups_))
211-
.first;
212-
} else {
213-
it = actor_scheduling_queues_
214-
.emplace(task_spec.CallerWorkerId(),
215-
std::make_unique<ActorSchedulingQueue>(task_execution_service_,
216-
waiter_,
217-
task_event_buffer_,
218-
pool_manager_))
219-
.first;
220-
}
199+
it = actor_scheduling_queues_
200+
.emplace(
201+
task_spec.CallerWorkerId(),
202+
execute_out_of_order_
203+
? std::unique_ptr<SchedulingQueue>(
204+
std::make_unique<OutOfOrderActorSchedulingQueue>(
205+
task_execution_service_,
206+
waiter_,
207+
task_event_buffer_,
208+
pool_manager_,
209+
fiber_state_manager_,
210+
is_asyncio_,
211+
fiber_max_concurrency_,
212+
concurrency_groups_))
213+
: std::unique_ptr<SchedulingQueue>(
214+
std::make_unique<ActorSchedulingQueue>(
215+
task_execution_service_,
216+
waiter_,
217+
task_event_buffer_,
218+
pool_manager_,
219+
RayConfig::instance()
220+
.actor_scheduling_queue_max_reorder_wait_seconds())))
221+
.first;
221222
}
222223

223224
it->second->Add(request.sequence_number(),

0 commit comments

Comments
 (0)