Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3001,6 +3001,60 @@ cdef class CoreWorker:
raise Exception(f"Failed to submit task to actor {actor_id} "
f"due to {status.message()}")

def batch_submit_actor_task(self,
                            Language language,
                            actor_ids,
                            FunctionDescriptor function_descriptor,
                            args,
                            c_string name,
                            int num_returns,
                            double num_method_cpus,
                            c_string concurrency_group_name):
    """Submit the same actor task to every actor in ``actor_ids``.

    The arguments are serialized once and the resulting argument vector is
    handed to ``BatchSubmitActorTask`` together with all actor IDs.

    Args:
        language: Language of the target actors.
        actor_ids: Iterable of ``ActorID`` objects to submit the task to.
        function_descriptor: Descriptor of the actor method to invoke.
        args: Flattened task arguments.
        name: Task name forwarded in the task options.
        num_returns: Number of return values forwarded in the task options.
        num_method_cpus: CPUs requested for the method; only added to the
            task resources when greater than zero.
        concurrency_group_name: Concurrency group forwarded in the task
            options.

    Returns:
        The object refs built from the return references filled in by
        ``BatchSubmitActorTask``.

    Raises:
        TypeError: If an element of ``actor_ids`` is not an ``ActorID``.
        Exception: If the core worker reports a non-OK status.
    """
    cdef:
        c_vector[CActorID] c_actor_ids
        unordered_map[c_string, double] c_resources
        CRayFunction ray_function
        c_vector[unique_ptr[CTaskArg]] args_vector
        c_vector[CObjectReference] return_refs
        c_vector[CObjectID] incremented_put_arg_ids

    with self.profile_event(b"submit_task"):
        # Convert the actor IDs before any argument is put into the object
        # store, so that a bad element fails here without leaving behind
        # local references that would never be removed. The checked cast
        # (``<ActorID?>``) raises TypeError for foreign objects instead of
        # reinterpreting their memory.
        for actor_id in actor_ids:
            c_actor_ids.push_back((<ActorID?>actor_id).native())

        if num_method_cpus > 0:
            c_resources[b"CPU"] = num_method_cpus
        ray_function = CRayFunction(
            language.lang, function_descriptor.descriptor)
        prepare_args_and_increment_put_refs(
            self, language, args, &args_vector, function_descriptor,
            &incremented_put_arg_ids)

        with nogil:
            status = CCoreWorkerProcess.GetCoreWorker().BatchSubmitActorTask(
                c_actor_ids,
                ray_function,
                args_vector,
                CTaskOptions(
                    name, num_returns, c_resources, concurrency_group_name),
                return_refs)
        # These arguments were serialized and put into the local object
        # store during task submission. The backend increments their local
        # ref count initially to ensure that they remain in scope until we
        # add to their submitted task ref count. Now that the task has
        # been submitted, it's safe to remove the initial local ref.
        for put_arg_id in incremented_put_arg_ids:
            CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
                put_arg_id)

        if status.ok():
            # The initial local reference is already acquired internally
            # when adding the pending task.
            return VectorToObjectRefs(return_refs,
                                      skip_adding_local_ref=True)
        else:
            raise Exception(f"Failed to submit task to actors {actor_ids} "
                            f"due to {status.message()}")

def kill_actor(self, ActorID actor_id, c_bool no_restart):
cdef:
CActorID c_actor_id = actor_id.native()
Expand Down
141 changes: 141 additions & 0 deletions python/ray/experimental/batch_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import logging
from typing import Any, Dict, List, Optional

import ray
import ray._private.ray_constants as ray_constants
import ray._private.signature as signature
import ray._raylet
from ray._raylet import PythonFunctionDescriptor

logger = logging.getLogger(__name__)


class BatchActorsHandle:
    """Handle that sends one actor method call to a whole list of actors.

    The first actor in the list is used as a template: its method
    signatures, decorators, return counts and descriptors are copied onto
    this handle. The handle does not check that the remaining actors are of
    the same class; callers are expected to pass homogeneous actors.
    """

    def __init__(self, actors: List[ray.actor.ActorHandle]) -> None:
        """Build the handle from a non-empty list of actor handles.

        Args:
            actors: Actor handles to target; ``actors[0]`` is the template.

        Raises:
            ValueError: If ``actors`` is empty.
        """
        if not actors:
            raise ValueError(
                "BatchActorsHandle requires at least one actor handle."
            )
        self.actors = actors
        actor_template = actors[0]
        self._ray_is_cross_language = actor_template._ray_is_cross_language
        self._ray_actor_language = actor_template._ray_actor_language
        self._ray_original_handle = actor_template._ray_original_handle
        self._ray_method_decorators = actor_template._ray_method_decorators
        self._ray_method_signatures = actor_template._ray_method_signatures
        self._ray_method_num_returns = actor_template._ray_method_num_returns
        self._ray_actor_method_cpus = actor_template._ray_actor_method_cpus
        self._ray_session_and_job = actor_template._ray_session_and_job
        self._ray_actor_creation_function_descriptor = (
            actor_template._ray_actor_creation_function_descriptor
        )
        # Work on a private copy: the loop below writes descriptors into this
        # mapping, and those writes must not leak into the template actor.
        self._ray_function_descriptor = dict(
            actor_template._ray_function_descriptor
        )

        if not self._ray_is_cross_language:
            assert isinstance(
                self._ray_actor_creation_function_descriptor, PythonFunctionDescriptor
            )
            module_name = self._ray_actor_creation_function_descriptor.module_name
            class_name = self._ray_actor_creation_function_descriptor.class_name
            # Expose every known method as an ActorMethod bound to this
            # handle, so `handle.method.remote(...)` ends up in
            # `_actor_method_call` below.
            for method_name in self._ray_method_signatures.keys():
                function_descriptor = PythonFunctionDescriptor(
                    module_name, method_name, class_name
                )
                self._ray_function_descriptor[method_name] = function_descriptor
                method = ray.actor.ActorMethod(
                    self,
                    method_name,
                    self._ray_method_num_returns[method_name],
                    decorator=self._ray_method_decorators.get(method_name),
                    hardref=self,
                )
                setattr(self, method_name, method)

    def _actor_method_call(
        self,
        method_name: str,
        args: Optional[List[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
        name: str = "",
        num_returns: Optional[int] = None,
        concurrency_group_name: Optional[str] = None,
    ):
        """Submit `method_name` to every actor held by this handle.

        Args:
            method_name: Name of the actor method to invoke.
            args: Positional arguments for the method.
            kwargs: Keyword arguments for the method.
            name: Task name passed through to the core worker.
            num_returns: Number of return values; the string "dynamic" is
                translated to -1 before submission.
            concurrency_group_name: Concurrency group to run in; an empty
                bytes value is sent when this is None.

        Returns:
            The object refs returned by `batch_submit_actor_task`, or None
            when that list is empty.
        """
        worker = ray._private.worker.global_worker

        args = args or []
        kwargs = kwargs or {}
        if self._ray_is_cross_language:
            list_args = ray.cross_language._format_args(worker, args, kwargs)
            function_descriptor = ray.cross_language._get_function_descriptor_for_actor_method(  # noqa: E501
                self._ray_actor_language,
                self._ray_actor_creation_function_descriptor,
                method_name,
                # The signature for xlang should be "{length_of_arguments}" to handle
                # overloaded methods.
                signature=str(len(args) + len(kwargs)),
            )
        else:
            function_signature = self._ray_method_signatures[method_name]

            if not args and not kwargs and not function_signature:
                list_args = []
            else:
                list_args = signature.flatten_args(function_signature, args, kwargs)
            function_descriptor = self._ray_function_descriptor[method_name]

        if worker.mode == ray.LOCAL_MODE:
            assert (
                not self._ray_is_cross_language
            ), "Cross language remote actor method cannot be executed locally."

        if num_returns == "dynamic":
            num_returns = -1

        object_refs = worker.core_worker.batch_submit_actor_task(
            # Same value as the template actor's language captured in __init__.
            self._ray_actor_language,
            [actor._actor_id for actor in self.actors],
            function_descriptor,
            list_args,
            name,
            num_returns,
            self._ray_actor_method_cpus,
            concurrency_group_name if concurrency_group_name is not None else b"",
        )

        if len(object_refs) == 0:
            object_refs = None

        return object_refs

    def __getattr__(self, item):
        """Resolve unknown attributes as methods for cross-language actors.

        Only called when normal attribute lookup fails. For Python actors
        every method was already attached in `__init__`, so any miss is a
        genuine AttributeError.
        """
        # Read the flag straight from __dict__: on an instance where it was
        # never set (e.g. one created without running __init__), a regular
        # attribute read would re-enter __getattr__ and recurse without bound.
        if not self.__dict__.get("_ray_is_cross_language", False):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{item}'"
            )
        if item == "__ray_terminate__":

            class FakeActorMethod(object):
                def __call__(self, *args, **kwargs):
                    raise TypeError(
                        "Actor methods cannot be called directly. Instead "
                        "of running 'object.{}()', try 'object.{}.remote()'.".format(
                            item, item
                        )
                    )

                def remote(self, *args, **kwargs):
                    logger.warning(
                        f"Actor method {item} is not supported by cross language."
                    )

            return FakeActorMethod()

        return ray.actor.ActorMethod(
            self,
            item,
            # Currently, we use default num returns
            ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS,
            # Currently, cross-lang actor method not support decorator
            decorator=None,
        )


def batch_remote(actors: List[ray.actor.ActorHandle]) -> BatchActorsHandle:
    """Wrap `actors` in a BatchActorsHandle.

    Args:
        actors: Actor handles that should receive the same method calls.

    Returns:
        A BatchActorsHandle constructed from `actors`.
    """
    batch_handle = BatchActorsHandle(actors)
    return batch_handle
5 changes: 5 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options,
c_vector[CObjectReference]&)
# Cython declaration of CoreWorker::BatchSubmitActorTask from
# "ray/core_worker/core_worker.h": takes a vector of actor IDs plus one
# function, argument list and options object. The trailing unnamed
# parameter is a non-const vector of object references that the call
# fills in.
CRayStatus BatchSubmitActorTask(
const c_vector[CActorID] &actor_ids, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options,
c_vector[CObjectReference]&)
# Cython declaration of CoreWorker::KillActor: takes the target actor's ID
# and two boolean flags (force_kill, no_restart) and returns a status.
# NOTE(review): the exact semantics of the flags are defined in
# core_worker.h and are not visible here -- confirm before relying on them.
CRayStatus KillActor(
const CActorID &actor_id, c_bool force_kill,
c_bool no_restart)
Expand Down
39 changes: 39 additions & 0 deletions python/ray/tests/test_batch_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import sys

import pytest

import ray
from ray.experimental.batch_remote import batch_remote


@ray.remote
class Counter:
    """Small stateful actor holding a single mutable value for the tests."""

    def __init__(self, value):
        # Initial counter value supplied by the creator of the actor.
        self.value = value

    def increase(self):
        """Increment the stored value by one and return the new value."""
        self.value += 1
        current = self.value
        return current

    def get_value(self):
        """Return the stored value without modifying it."""
        current = self.value
        return current

    def reset(self):
        """Set the stored value back to zero."""
        self.value = 0


def test_basic_batch_remote(ray_start_regular_shared):
    """A batched call must return one result per actor, in actor order.

    Each Counter is created with a distinct initial value, so the results of
    a batched `get_value` identify which actors were actually reached.
    """
    num_actors = 10
    # Create multiple actors, each seeded with its own index.
    actors = [Counter.remote(i) for i in range(num_actors)]

    object_refs = batch_remote(actors).get_value.remote()

    # Assert instead of printing so the test fails when refs are missing,
    # duplicated or out of order.
    assert ray.get(object_refs) == list(range(num_actors))


if __name__ == "__main__":
    # Pick the pytest arguments first, then run pytest once: parallel and
    # boxed when PARALLEL_CI is set in the environment, plain verbose
    # otherwise.
    if os.environ.get("PARALLEL_CI"):
        pytest_args = ["-n", "auto", "--boxed", "-vs", __file__]
    else:
        pytest_args = ["-sv", __file__]
    sys.exit(pytest.main(pytest_args))
14 changes: 14 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,20 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id,
return Status::OK();
}

/// Submit the same actor task to each actor in `actor_ids`, in order.
///
/// Every actor gets its own SubmitActorTask call with the shared function,
/// arguments and options. Submission stops at the first non-OK status, which
/// is returned as-is; tasks already submitted to earlier actors are not
/// rolled back, and their references remain in `task_returns`.
///
/// \param[in] actor_ids IDs of the actors to submit to.
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of the task, shared by all submissions.
/// \param[in] task_options Options of the task, shared by all submissions.
/// \param[out] task_returns Receives the return references of every
/// successful submission, appended in the order of `actor_ids`.
/// \return OK if all submissions succeeded, otherwise the first failure.
Status CoreWorker::BatchSubmitActorTask(const std::vector<ActorID> &actor_ids,
                                        const RayFunction &function,
                                        const std::vector<std::unique_ptr<TaskArg>> &args,
                                        const TaskOptions &task_options,
                                        std::vector<rpc::ObjectReference> &task_returns) {
  // Bind by reference: iterating by value would copy every ActorID.
  for (const auto &actor_id : actor_ids) {
    // Collect this actor's references in a scratch vector and append them
    // afterwards. Passing `task_returns` straight through would only be
    // correct if SubmitActorTask appends to its output; if it assigns, each
    // call would discard the references of the previous actors.
    std::vector<rpc::ObjectReference> actor_returns;
    Status status =
        SubmitActorTask(actor_id, function, args, task_options, actor_returns);
    if (!status.ok()) {
      return status;
    }
    task_returns.insert(task_returns.end(), actor_returns.begin(), actor_returns.end());
  }
  return Status::OK();
}

Status CoreWorker::CancelTask(const ObjectID &object_id,
bool force_kill,
bool recursive) {
Expand Down
15 changes: 15 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,21 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const TaskOptions &task_options,
std::vector<rpc::ObjectReference> &task_returns);

/// Submit the same actor task to a batch of actors.
///
/// The task is submitted to the actors one at a time, in the order given by
/// `actor_ids`. Submission stops at the first failure and that status is
/// returned; actors later in the list are not attempted.
///
/// \param[in] actor_ids IDs of the actors to submit the task to.
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task, shared by every submission.
/// \param[in] task_options Options for this task, shared by every submission.
/// \param[out] task_returns Receives the object references produced by the
/// submissions.
///
/// \return OK if every submission succeeded, otherwise the status of the
/// first submission that failed.
Status BatchSubmitActorTask(const std::vector<ActorID> &actor_ids,
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::vector<rpc::ObjectReference> &task_returns);

/// Tell an actor to exit immediately, without completing outstanding work.
///
/// \param[in] actor_id ID of the actor to kill.
Expand Down