Skip to content

Commit 807e371

Browse files
authored
[core][task-manager/02] consolidate TaskManager interface (#54317)
This PR is smaller than it looks. The `TaskManager` class currently exposes two interfaces: `TaskFinisher` and `TaskResubmission`. While these interfaces are well-intentioned, they are only implemented by `TaskManager` itself, and the methods they define are not fully independent. As a result, it’s unlikely that these interfaces could be meaningfully separated or implemented in isolation. This change consolidates them into a single `TaskManager` interface, which can be reused where needed. The goal is to reduce the number of concepts and components required to reason about the Ray core, and to simplify the overall design. Test: - CI Signed-off-by: Cuong Nguyen <[email protected]>
1 parent edb52ce commit 807e371

18 files changed

+504
-489
lines changed

src/mock/ray/core_worker/task_manager.h renamed to src/mock/ray/core_worker/task_manager_interface.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,20 @@
1414

1515
#pragma once
1616
#include "gmock/gmock.h"
17+
#include "ray/core_worker/task_manager_interface.h"
18+
1719
namespace ray {
1820
namespace core {
1921

20-
class MockTaskFinisherInterface : public TaskFinisherInterface {
22+
class MockTaskManagerInterface : public TaskManagerInterface {
2123
public:
24+
MOCK_METHOD(std::vector<rpc::ObjectReference>,
25+
AddPendingTask,
26+
(const rpc::Address &caller_address,
27+
const TaskSpecification &spec,
28+
const std::string &call_site,
29+
int max_retries),
30+
(override));
2231
MOCK_METHOD(void,
2332
CompletePendingTask,
2433
(const TaskID &task_id,
@@ -42,6 +51,10 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
4251
bool mark_task_object_failed,
4352
bool fail_immediately),
4453
(override));
54+
MOCK_METHOD(std::optional<rpc::ErrorType>,
55+
ResubmitTask,
56+
(const TaskID &task_id, std::vector<ObjectID> *task_deps),
57+
(override));
4558
MOCK_METHOD(void,
4659
OnTaskDependenciesInlined,
4760
(const std::vector<ObjectID> &inlined_dependency_ids,

src/ray/core_worker/BUILD.bazel

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,8 @@ ray_cc_library(
262262
)
263263

264264
ray_cc_library(
265-
name = "task_finisher",
266-
hdrs = ["task_finisher.h"],
265+
name = "task_manager_interface",
266+
hdrs = ["task_manager_interface.h"],
267267
deps = [
268268
"//src/ray/common:id",
269269
"//src/ray/common:status",
@@ -282,7 +282,7 @@ ray_cc_library(
282282
":actor_manager",
283283
":memory_store",
284284
":task_event_buffer",
285-
":task_finisher",
285+
":task_manager_interface",
286286
"//src/ray/gcs:gcs_pb_util",
287287
"//src/ray/common:id",
288288
"//src/ray/common:ray_object",
@@ -306,7 +306,7 @@ ray_cc_library(
306306
deps = [
307307
":actor_creator",
308308
":memory_store",
309-
":task_finisher",
309+
":task_manager_interface",
310310
"//src/ray/common:id",
311311
"//src/ray/common:task_common",
312312
"@com_google_absl//absl/container:flat_hash_map",

src/ray/core_worker/object_recovery_manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
183183
// after ResubmitTask, then it will remain true forever.
184184
// see https://github.com/ray-project/ray/issues/47606 for more details.
185185
reference_counter_.UpdateObjectPendingCreation(object_id, true);
186-
auto error_type_optional = task_resubmitter_.ResubmitTask(task_id, &task_deps);
186+
auto error_type_optional = task_manager_.ResubmitTask(task_id, &task_deps);
187187

188188
if (!error_type_optional.has_value()) {
189189
// Try to recover the task's dependencies.

src/ray/core_worker/object_recovery_manager.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ class ObjectRecoveryManager {
4848
std::shared_ptr<PinObjectsInterface> local_object_pinning_client,
4949
std::function<Status(const ObjectID &object_id,
5050
const ObjectLookupCallback &callback)> object_lookup,
51-
TaskResubmissionInterface &task_resubmitter,
51+
TaskManagerInterface &task_manager,
5252
ReferenceCounter &reference_counter,
5353
CoreWorkerMemoryStore &in_memory_store,
5454
ObjectRecoveryFailureCallback recovery_failure_callback)
55-
: task_resubmitter_(task_resubmitter),
55+
: task_manager_(task_manager),
5656
reference_counter_(reference_counter),
5757
rpc_address_(std::move(rpc_address)),
5858
client_factory_(std::move(client_factory)),
@@ -112,7 +112,7 @@ class ObjectRecoveryManager {
112112
void ReconstructObject(const ObjectID &object_id);
113113

114114
/// Used to resubmit tasks.
115-
TaskResubmissionInterface &task_resubmitter_;
115+
TaskManagerInterface &task_manager_;
116116

117117
/// Used to check whether we own an object.
118118
ReferenceCounter &reference_counter_;

src/ray/core_worker/task_finisher.h

Lines changed: 0 additions & 75 deletions
This file was deleted.

src/ray/core_worker/task_manager.h

Lines changed: 4 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include "ray/common/id.h"
2727
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
2828
#include "ray/core_worker/task_event_buffer.h"
29-
#include "ray/core_worker/task_finisher.h"
29+
#include "ray/core_worker/task_manager_interface.h"
3030
#include "ray/stats/metric_defs.h"
3131
#include "ray/util/counter_map.h"
3232
#include "src/ray/protobuf/common.pb.h"
@@ -38,14 +38,6 @@ namespace core {
3838

3939
class ActorManager;
4040

41-
class TaskResubmissionInterface {
42-
public:
43-
virtual std::optional<rpc::ErrorType> ResubmitTask(
44-
const TaskID &task_id, std::vector<ObjectID> *task_deps) = 0;
45-
46-
virtual ~TaskResubmissionInterface() = default;
47-
};
48-
4941
using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
5042
using PutInLocalPlasmaCallback =
5143
std::function<void(const RayObject &object, const ObjectID &object_id)>;
@@ -173,7 +165,7 @@ class ObjectRefStream {
173165
int64_t total_num_object_consumed_{};
174166
};
175167

176-
class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
168+
class TaskManager : public TaskManagerInterface {
177169
public:
178170
TaskManager(CoreWorkerMemoryStore &in_memory_store,
179171
ReferenceCounter &reference_counter,
@@ -208,36 +200,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
208200
});
209201
}
210202

211-
/// Add a task that is pending execution.
212-
///
213-
/// The local ref count for all return refs (excluding actor creation tasks)
214-
/// will be initialized to 1 so that the ref is considered in scope before
215-
/// returning to the language frontend. The caller is responsible for
216-
/// decrementing the ref count once the frontend ref has gone out of scope.
217-
///
218-
/// \param[in] caller_address The rpc address of the calling task.
219-
/// \param[in] spec The spec of the pending task.
220-
/// \param[in] max_retries Number of times this task may be retried
221-
/// on failure.
222-
/// \return ObjectRefs returned by this task.
223203
std::vector<rpc::ObjectReference> AddPendingTask(const rpc::Address &caller_address,
224204
const TaskSpecification &spec,
225205
const std::string &call_site,
226-
int max_retries = 0);
227-
228-
/// Resubmit a task that has completed execution before. This is used to
229-
/// reconstruct objects stored in Plasma that were lost.
230-
///
231-
/// \param[in] task_id The ID of the task to resubmit.
232-
/// \param[out] task_deps The object dependencies of the resubmitted task,
233-
/// i.e. all arguments that were not inlined in the task spec. The caller is
234-
/// responsible for making sure that these dependencies become available, so
235-
/// that the resubmitted task can run. This is only populated if the task was
236-
/// not already pending and was successfully resubmitted.
237-
/// \return nullopt if the task was successfully resubmitted (task or actor being
238-
/// scheduled, but no guarantee on completion), or was already pending. Return the
239-
/// appopriate error type to propagate for the object if the task was not successfully
240-
/// resubmitted.
206+
int max_retries = 0) override;
207+
241208
std::optional<rpc::ErrorType> ResubmitTask(const TaskID &task_id,
242209
std::vector<ObjectID> *task_deps) override;
243210

@@ -246,13 +213,6 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
246213
/// \param shutdown The shutdown callback to call.
247214
void DrainAndShutdown(std::function<void()> shutdown);
248215

249-
/// Write return objects for a pending task to the memory store.
250-
///
251-
/// \param[in] task_id ID of the pending task.
252-
/// \param[in] reply Proto response to a direct actor or task call.
253-
/// \param[in] worker_addr Address of the worker that executed the task.
254-
/// \param[in] is_application_error Whether this is an Exception return.
255-
/// \return Void.
256216
void CompletePendingTask(const TaskID &task_id,
257217
const rpc::PushTaskReply &reply,
258218
const rpc::Address &worker_addr,
@@ -434,50 +394,18 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
434394
std::pair<ObjectID, bool> PeekObjectRefStream(const ObjectID &generator_id)
435395
ABSL_LOCKS_EXCLUDED(mu_);
436396

437-
/// Called by submitter when a generator task marked for resubmission for intermediate
438-
/// object recovery comes back from the executing worker. We mark the attempt as failed
439-
/// and resubmit it, so we can recover the intermediate return.
440397
void MarkGeneratorFailedAndResubmit(const TaskID &task_id) override;
441398

442-
/// Returns true if task can be retried.
443-
///
444-
/// \param[in] task_id ID of the task to be retried.
445-
/// \return true if task is scheduled to be retried.
446399
bool RetryTaskIfPossible(const TaskID &task_id,
447400
const rpc::RayErrorInfo &error_info) override;
448401

449-
/// A pending task failed. This will either retry the task or mark the task
450-
/// as failed if there are no retries left.
451-
///
452-
/// \param[in] task_id ID of the pending task.
453-
/// \param[in] error_type The type of the specific error.
454-
/// \param[in] status Optional status message.
455-
/// \param[in] ray_error_info The error information of a given error type.
456-
/// Nullptr means that there's no error information.
457-
/// TODO(sang): Remove nullptr case. Every error message should have metadata.
458-
/// \param[in] mark_task_object_failed whether or not it marks the task
459-
/// return object as failed. If this is set to false, then the caller is
460-
/// responsible for later failing or completing the task.
461-
/// \param[in] fail_immediately whether to fail the task and ignore
462-
/// the retries that are available.
463-
/// \return Whether the task will be retried or not.
464402
bool FailOrRetryPendingTask(const TaskID &task_id,
465403
rpc::ErrorType error_type,
466404
const Status *status = nullptr,
467405
const rpc::RayErrorInfo *ray_error_info = nullptr,
468406
bool mark_task_object_failed = true,
469407
bool fail_immediately = false) override;
470408

471-
/// A pending task failed. This will mark the task as failed.
472-
/// This doesn't always mark the return object as failed
473-
/// depending on mark_task_object_failed.
474-
///
475-
/// \param[in] task_id ID of the pending task.
476-
/// \param[in] error_type The type of the specific error.
477-
/// \param[in] status Optional status message.
478-
/// \param[in] ray_error_info The error information of a given error type.
479-
/// \param[in] mark_task_object_failed whether or not it marks the task
480-
/// return object as failed.
481409
void FailPendingTask(const TaskID &task_id,
482410
rpc::ErrorType error_type,
483411
const Status *status = nullptr,
@@ -495,24 +423,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
495423
const rpc::RayErrorInfo *ray_error_info,
496424
const absl::flat_hash_set<ObjectID> &store_in_plasma_ids) ABSL_LOCKS_EXCLUDED(mu_);
497425

498-
/// A task's dependencies were inlined in the task spec. This will decrement
499-
/// the ref count for the dependency IDs. If the dependencies contained other
500-
/// ObjectIDs, then the ref count for these object IDs will be incremented.
501-
///
502-
/// \param[in] inlined_dependency_ids The args that were originally passed by
503-
/// reference into the task, but have now been inlined.
504-
/// \param[in] contained_ids Any ObjectIDs that were newly inlined in the
505-
/// task spec, because a serialized copy of the ID was contained in one of
506-
/// the inlined dependencies.
507426
void OnTaskDependenciesInlined(const std::vector<ObjectID> &inlined_dependency_ids,
508427
const std::vector<ObjectID> &contained_ids) override;
509428

510-
/// Set the task state to be canceled. Set the number of retries to zero.
511-
///
512-
/// \param[in] task_id to cancel.
513429
void MarkTaskCanceled(const TaskID &task_id) override;
514430

515-
/// Return the spec for a pending task.
516431
std::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const override;
517432

518433
/// Return specs for pending children tasks of the given parent task.
@@ -524,10 +439,6 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
524439
/// \return Whether the task can be submitted for execution.
525440
bool IsTaskSubmissible(const TaskID &task_id) const;
526441

527-
/// Return whether the task is pending.
528-
///
529-
/// \param[in] task_id ID of the task to query.
530-
/// \return Whether the task is pending.
531442
bool IsTaskPending(const TaskID &task_id) const override;
532443

533444
/// Return whether the task is scheduled adn waiting for execution.
@@ -549,17 +460,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
549460
return total_lineage_footprint_bytes_;
550461
}
551462

552-
/// Record that the given task's dependencies have been created and the task
553-
/// can now be scheduled for execution.
554-
///
555-
/// \param[in] task_id The task that is now scheduled.
556463
void MarkDependenciesResolved(const TaskID &task_id) override;
557464

558-
/// Record that the given task is scheduled and wait for execution.
559-
///
560-
/// \param[in] task_id The task that is will be running.
561-
/// \param[in] node_id The node id that this task wil be running.
562-
/// \param[in] worker_id The worker id that this task wil be running.
563465
void MarkTaskWaitingForExecution(const TaskID &task_id,
564466
const NodeID &node_id,
565467
const WorkerID &worker_id) override;

0 commit comments

Comments
 (0)