From 243857571b8dec193b17c3c6571ea43c16fddbe4 Mon Sep 17 00:00:00 2001 From: LarryLian <554538252@qq.com> Date: Mon, 22 May 2023 20:13:57 +0800 Subject: [PATCH] [Core]Add batch remote api for batch submit actor task Signed-off-by: LarryLian <554538252@qq.com> --- python/ray/_raylet.pyx | 54 +++++++++ python/ray/experimental/batch_remote.py | 141 ++++++++++++++++++++++++ python/ray/includes/libcoreworker.pxd | 5 + python/ray/tests/test_batch_remote.py | 39 +++++++ src/ray/core_worker/core_worker.cc | 14 +++ src/ray/core_worker/core_worker.h | 15 +++ 6 files changed, 268 insertions(+) create mode 100644 python/ray/experimental/batch_remote.py create mode 100644 python/ray/tests/test_batch_remote.py diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 071792162439..491efaa86947 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3001,6 +3001,60 @@ cdef class CoreWorker: raise Exception(f"Failed to submit task to actor {actor_id} " f"due to {status.message()}") + def batch_submit_actor_task(self, + Language language, + actor_ids, + FunctionDescriptor function_descriptor, + args, + c_string name, + int num_returns, + double num_method_cpus, + c_string concurrency_group_name): + cdef: + c_vector[CActorID] c_actor_ids + unordered_map[c_string, double] c_resources + CRayFunction ray_function + c_vector[unique_ptr[CTaskArg]] args_vector + c_vector[CObjectReference] return_refs + c_vector[CObjectID] incremented_put_arg_ids + + with self.profile_event(b"submit_task"): + if num_method_cpus > 0: + c_resources[b"CPU"] = num_method_cpus + ray_function = CRayFunction( + language.lang, function_descriptor.descriptor) + prepare_args_and_increment_put_refs( + self, language, args, &args_vector, function_descriptor, + &incremented_put_arg_ids) + + for actor_id in actor_ids: + c_actor_ids.push_back((actor_id).native()) + with nogil: + status = CCoreWorkerProcess.GetCoreWorker().BatchSubmitActorTask( + c_actor_ids, + ray_function, + args_vector, + CTaskOptions( + name, num_returns, c_resources, concurrency_group_name), + return_refs) + # These arguments were serialized and put into the local object + # store during task submission. The backend increments their local + # ref count initially to ensure that they remain in scope until we + # add to their submitted task ref count. Now that the task has + # been submitted, it's safe to remove the initial local ref. + for put_arg_id in incremented_put_arg_ids: + CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( + put_arg_id) + + if status.ok(): + # The initial local reference is already acquired internally + # when adding the pending task. + return VectorToObjectRefs(return_refs, + skip_adding_local_ref=True) + else: + raise Exception(f"Failed to submit task to actors {actor_ids} " + f"due to {status.message()}") + def kill_actor(self, ActorID actor_id, c_bool no_restart): cdef: CActorID c_actor_id = actor_id.native() diff --git a/python/ray/experimental/batch_remote.py b/python/ray/experimental/batch_remote.py new file mode 100644 index 000000000000..8d0c107d6e09 --- /dev/null +++ b/python/ray/experimental/batch_remote.py @@ -0,0 +1,141 @@ +import logging +from typing import Any, Dict, List, Optional + +import ray +import ray._private.ray_constants as ray_constants +import ray._private.signature as signature +import ray._raylet +from ray._raylet import PythonFunctionDescriptor + +logger = logging.getLogger(__name__) + + +class BatchActorsHandle: + def __init__(self, actors: List[ray.actor.ActorHandle]) -> None: + self.actors = actors + actor_template = actors[0] + self._ray_is_cross_language = actor_template._ray_is_cross_language + self._ray_actor_language = actor_template._ray_actor_language + self._ray_original_handle = actor_template._ray_original_handle + self._ray_method_decorators = actor_template._ray_method_decorators + self._ray_method_signatures = actor_template._ray_method_signatures + self._ray_method_num_returns = actor_template._ray_method_num_returns + self._ray_actor_method_cpus = actor_template._ray_actor_method_cpus + self._ray_session_and_job = actor_template._ray_session_and_job + self._ray_actor_creation_function_descriptor = ( + actor_template._ray_actor_creation_function_descriptor + ) + self._ray_function_descriptor = actor_template._ray_function_descriptor + + if not self._ray_is_cross_language: + assert isinstance( + self._ray_actor_creation_function_descriptor, PythonFunctionDescriptor + ) + module_name = self._ray_actor_creation_function_descriptor.module_name + class_name = self._ray_actor_creation_function_descriptor.class_name + for method_name in self._ray_method_signatures.keys(): + function_descriptor = PythonFunctionDescriptor( + module_name, method_name, class_name + ) + self._ray_function_descriptor[method_name] = function_descriptor + method = ray.actor.ActorMethod( + self, + method_name, + self._ray_method_num_returns[method_name], + decorator=self._ray_method_decorators.get(method_name), + hardref=self, + ) + setattr(self, method_name, method) + + def _actor_method_call( + self, + method_name: str, + args: List[Any] = None, + kwargs: Dict[str, Any] = None, + name: str = "", + num_returns: Optional[int] = None, + concurrency_group_name: Optional[str] = None, + ): + worker = ray._private.worker.global_worker + + args = args or [] + kwargs = kwargs or {} + if self._ray_is_cross_language: + list_args = ray.cross_language._format_args(worker, args, kwargs) + function_descriptor = ray.cross_language._get_function_descriptor_for_actor_method( # noqa: E501 + self._ray_actor_language, + self._ray_actor_creation_function_descriptor, + method_name, + # The signature for xlang should be "{length_of_arguments}" to handle + # overloaded methods. + signature=str(len(args) + len(kwargs)), + ) + else: + function_signature = self._ray_method_signatures[method_name] + + if not args and not kwargs and not function_signature: + list_args = [] + else: + list_args = signature.flatten_args(function_signature, args, kwargs) + function_descriptor = self._ray_function_descriptor[method_name] + + if worker.mode == ray.LOCAL_MODE: + assert ( + not self._ray_is_cross_language + ), "Cross language remote actor method cannot be executed locally." + + if num_returns == "dynamic": + num_returns = -1 + + object_refs = worker.core_worker.batch_submit_actor_task( + self.actors[0]._ray_actor_language, + [actor._actor_id for actor in self.actors], + function_descriptor, + list_args, + name, + num_returns, + self._ray_actor_method_cpus, + concurrency_group_name if concurrency_group_name is not None else b"", + ) + + if len(object_refs) == 0: + object_refs = None + + return object_refs + + def __getattr__(self, item): + if not self._ray_is_cross_language: + raise AttributeError( + f"'{type(self).__name__}' object has " f"no attribute '{item}'" + ) + if item in ["__ray_terminate__"]: + + class FakeActorMethod(object): + def __call__(self, *args, **kwargs): + raise TypeError( + "Actor methods cannot be called directly. Instead " + "of running 'object.{}()', try 'object.{}.remote()'.".format( + item, item + ) + ) + + def remote(self, *args, **kwargs): + logger.warning( + f"Actor method {item} is not supported by cross language." + ) + + return FakeActorMethod() + + return ray.actor.ActorMethod( + self, + item, + ray_constants. + # Currently, we use default num returns + DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS, + # Currently, cross-lang actor method not support decorator + decorator=None, + ) + + +def batch_remote(actors: List[ray.actor.ActorHandle]) -> BatchActorsHandle: + return BatchActorsHandle(actors) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 3998de724433..444ef015f3a0 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -123,6 +123,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[unique_ptr[CTaskArg]] &args, const CTaskOptions &options, c_vector[CObjectReference]&) + CRayStatus BatchSubmitActorTask( + const c_vector[CActorID] &actor_ids, const CRayFunction &function, + const c_vector[unique_ptr[CTaskArg]] &args, + const CTaskOptions &options, + c_vector[CObjectReference]&) CRayStatus KillActor( const CActorID &actor_id, c_bool force_kill, c_bool no_restart) diff --git a/python/ray/tests/test_batch_remote.py b/python/ray/tests/test_batch_remote.py new file mode 100644 index 000000000000..c741c600711a --- /dev/null +++ b/python/ray/tests/test_batch_remote.py @@ -0,0 +1,39 @@ +import os +import sys + +import pytest + +import ray +from ray.experimental.batch_remote import batch_remote + + +@ray.remote +class Counter: + def __init__(self, value): + self.value = value + + def increase(self): + self.value += 1 + return self.value + + def get_value(self): + return self.value + + def reset(self): + self.value = 0 + + +def test_basic_batch_remote(ray_start_regular_shared): + num_actors = 10 + # Create multiple actors. + actors = [Counter.remote(i) for i in range(num_actors)] + # print(actors[0].get_value.remote()) + object_refs = batch_remote(actors).get_value.remote() + print(ray.get(object_refs)) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 093163105226..72d521c05cbf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2278,6 +2278,20 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, return Status::OK(); } +Status CoreWorker::BatchSubmitActorTask(const std::vector &actor_ids, + const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, + std::vector &task_returns) { + for (const auto actor_id : actor_ids) { + auto status = SubmitActorTask(actor_id, function, args, task_options, task_returns); + if (!status.ok()) { + return status; + } + } + return Status::OK(); +} + Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill, bool recursive) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e4793944d425..1be45882e4e0 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -897,6 +897,21 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const TaskOptions &task_options, std::vector &task_returns); + /// Batch submit actor tasks. + /// + /// \param[in] actor_ids Handle to the actors. + /// \param[in] function The remote function to execute. + /// \param[in] args Arguments of this task. + /// \param[in] task_options Options for this task. + /// \param[out] task_returns The object returned by this task + /// + /// \return Status of this submission + Status BatchSubmitActorTask(const std::vector &actor_ids, + const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, + std::vector &task_returns); + /// Tell an actor to exit immediately, without completing outstanding work. /// /// \param[in] actor_id ID of the actor to kill.