33 changes: 33 additions & 0 deletions python/ray/tests/test.py
@@ -0,0 +1,33 @@
import ray
from ray.cluster_utils import Cluster
import time
import numpy as np

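# Head node has no CPUs, so the actor below is placed on the separate worker
# node and the streamed generator returns cross the node boundary.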
cluster = Cluster()

head = cluster.add_node(num_cpus=0)
ray.init(address=cluster.address)
worker = cluster.add_node(num_cpus=1)


@ray.remote
class A:
def __init__(self):
self.arr = np.zeros(4 * 1024 * 1024, dtype=np.uint8)

def generator(self):
for _ in range(100):
yield self.arr


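# Create the actor and block until its constructor has finished.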
a = A.remote()
ray.get(a.__ray_ready__.remote())

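# Time a full pass over the 100 streamed 4 MiB returns.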
start_time = time.time()

streaming_ref = a.generator.remote()
for ref in streaming_ref:
ray.get(ref)


print("really done in ", time.time() - start_time)
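For context: the 4 MiB arrays yielded above fall under the raised 5 MiB max_direct_call_object_size default from the config change below, so each item can be returned to the caller directly rather than through plasma. Below is a minimal comparison sketch, not part of the PR, that assumes the same cluster setup as the test above; the actor class B and the 6 MiB payload are arbitrary choices made to push each return over the threshold.

```python
import time

import numpy as np
import ray


@ray.remote
class B:
    def __init__(self):
        # 6 MiB payload: larger than max_direct_call_object_size (5 MiB),
        # so each yielded item should be promoted to plasma instead of
        # being returned inline.
        self.arr = np.zeros(6 * 1024 * 1024, dtype=np.uint8)

    def generator(self):
        for _ in range(100):
            yield self.arr


b = B.remote()
ray.get(b.__ray_ready__.remote())

start = time.time()
for ref in b.generator.remote():
    ray.get(ref)
print("plasma-path run done in", time.time() - start)
```

Comparing the two wall-clock times gives a rough per-item sense of what the plasma round-trip costs relative to direct returns.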
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
@@ -192,7 +192,7 @@ RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, true)

// The max allowed size in bytes of a return object from direct actor calls.
// Objects larger than this size will be spilled/promoted to plasma.
RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
RAY_CONFIG(int64_t, max_direct_call_object_size, 5 * 1024 * 1024)

// The max gRPC message size (the gRPC internal default is 4MB). We use a higher
// limit in Ray to avoid crashing with many small inlined task arguments.
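The new 5 MiB default already covers the 4 MiB payload used in the test above. For experiments with other sizes, the threshold can also be overridden without rebuilding. This is a sketch only: both knobs shown are internal, undocumented Ray configuration mechanisms and may change between releases.

```python
import ray

# Sketch: raise the direct-return threshold to 8 MiB for this cluster.
# _system_config is an internal ray.init argument and only takes effect
# on the process that starts the cluster.
ray.init(_system_config={"max_direct_call_object_size": 8 * 1024 * 1024})

# Alternative sketch: export RAY_max_direct_call_object_size=8388608 in the
# environment before starting Ray; RAY_CONFIG entries read RAY_-prefixed
# environment variables.
```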
34 changes: 30 additions & 4 deletions src/ray/common/ray_object.h
@@ -43,8 +43,24 @@ class RayObject {
const std::shared_ptr<Buffer> &metadata,
const std::vector<rpc::ObjectReference> &nested_refs,
bool copy_data = false,
rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE) {
Init(data, metadata, nested_refs, copy_data, tensor_transport);
rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE,
std::optional<rpc::ReturnObject> return_object = std::nullopt) {
RAY_LOG(INFO) << "RayObject constructor";
Init(data,
metadata,
nested_refs,
copy_data,
tensor_transport,
std::move(return_object));
}

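// Copying a RayObject is intentionally disallowed: crash loudly so any
// accidental copy of the object's data is caught immediately.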
RayObject(const RayObject &other) {
RAY_CHECK(false) << "RayObject copy constructor is not allowed";
}

RayObject &operator=(const RayObject &other) {
RAY_CHECK(false) << "RayObject copy assignment is not allowed";
return *this;
}

/// This constructor creates a ray object instance whose data will be generated
@@ -134,18 +150,26 @@ class RayObject {
const std::shared_ptr<Buffer> &metadata,
const std::vector<rpc::ObjectReference> &nested_refs,
bool copy_data = false,
rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE) {
rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE,
std::optional<rpc::ReturnObject> return_object = std::nullopt) {
RAY_LOG(INFO) << "Init RayObject";
data_ = data;
metadata_ = metadata;
nested_refs_ = nested_refs;
has_data_copy_ = copy_data;
tensor_transport_ = tensor_transport;
creation_time_nanos_ = absl::GetCurrentTimeNanos();

if (return_object.has_value()) {
RAY_LOG(INFO) << "Return object has value";
return_object_ = std::move(return_object.value());
return;
}
if (has_data_copy_) {
RAY_LOG(INFO) << "Copying data";
// If this object is required to hold a copy of the data,
// make a copy if the passed in buffers don't already have a copy.
if (data_ && !data_->OwnsData()) {
RAY_LOG(INFO) << "Data does not own data";
data_ = std::make_shared<LocalMemoryBuffer>(data_->Data(),
data_->Size(),
/*copy_data=*/true);
@@ -174,6 +198,8 @@ class RayObject {
int64_t creation_time_nanos_;
/// The tensor transport to use for transferring this object.
rpc::TensorTransport tensor_transport_ = rpc::TensorTransport::OBJECT_STORE;
/// The ReturnObject proto that backs this object's data; holding it here lets the
/// memory store keep the inlined payload alive without copying it.
std::optional<rpc::ReturnObject> return_object_ = std::nullopt;
};

} // namespace ray
36 changes: 35 additions & 1 deletion src/ray/core_worker/core_worker.cc
@@ -3130,16 +3130,20 @@ void CoreWorker::HandleReportGeneratorItemReturns(
rpc::ReportGeneratorItemReturnsRequest request,
rpc::ReportGeneratorItemReturnsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
absl::Time start_time = absl::Now();
static double total_time = 0;
auto generator_id = ObjectID::FromBinary(request.generator_id());
auto worker_id = WorkerID::FromBinary(request.worker_addr().worker_id());
task_manager_->HandleReportGeneratorItemReturns(
request,
std::move(request),
/*execution_signal_callback=*/
[reply,
worker_id = std::move(worker_id),
generator_id = std::move(generator_id),
send_reply_callback = std::move(send_reply_callback)](
const Status &status, int64_t total_num_object_consumed) {
absl::Time callback_start_time = absl::Now();
static double callback_total_time = 0;
RAY_LOG(DEBUG) << "Reply HandleReportGeneratorItemReturns to signal "
"executor to resume tasks. "
<< generator_id << ". Worker ID: " << worker_id
@@ -3150,7 +3154,23 @@ void CoreWorker::HandleReportGeneratorItemReturns(

reply->set_total_num_object_consumed(total_num_object_consumed);
send_reply_callback(status, nullptr, nullptr);
absl::Time callback_end_time = absl::Now();
RAY_LOG(INFO) << "CoreWorker Callback HandleReportGeneratorItemReturns time: "
<< absl::ToDoubleMilliseconds(callback_end_time -
callback_start_time)
<< "ms";
callback_total_time +=
absl::ToDoubleMilliseconds(callback_end_time - callback_start_time);
RAY_LOG(INFO)
<< "CoreWorker Callback Total HandleReportGeneratorItemReturns time: "
<< callback_total_time << "ms";
});
absl::Time end_time = absl::Now();
RAY_LOG(INFO) << "CoreWorker HandleReportGeneratorItemReturns time: "
<< absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
total_time += absl::ToDoubleMilliseconds(end_time - start_time);
RAY_LOG(INFO) << "CoreWorker Total HandleReportGeneratorItemReturns time: "
<< total_time << "ms";
}

std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
@@ -3402,6 +3422,8 @@ void CoreWorker::HandleRayletNotifyGCSRestart(
void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request,
rpc::GetObjectStatusReply *reply,
rpc::SendReplyCallback send_reply_callback) {
static double total_time = 0;
absl::Time start_time = absl::Now();
if (HandleWrongRecipient(WorkerID::FromBinary(request.owner_worker_id()),
send_reply_callback)) {
RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by a previous worker "
@@ -3435,6 +3457,11 @@ void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request,
PopulateObjectStatus(object_id, obj, reply);
send_reply_callback(Status::OK(), nullptr, nullptr);
});
absl::Time end_time = absl::Now();
RAY_LOG(INFO) << "CoreWorker HandleGetObjectStatus time: "
<< absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
total_time += absl::ToDoubleMilliseconds(end_time - start_time);
RAY_LOG(INFO) << "CoreWorker Total HandleGetObjectStatus time: " << total_time << "ms";
}

void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
@@ -3447,6 +3474,8 @@ void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
// in_plasma indicator on the message, and the caller will
// have to facilitate a Plasma object transfer to get the
// object value.
static double total_time = 0;
absl::Time start_time = absl::Now();
auto *object = reply->mutable_object();
if (obj->HasData()) {
const auto &data = obj->GetData();
Expand All @@ -3468,6 +3497,11 @@ void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
}
reply->set_object_size(locality_data.value().object_size);
}
absl::Time end_time = absl::Now();
RAY_LOG(INFO) << "CoreWorker PopulateObjectStatus time: "
<< absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
total_time += absl::ToDoubleMilliseconds(end_time - start_time);
RAY_LOG(INFO) << "CoreWorker Total PopulateObjectStatus time: " << total_time << "ms";
}

void CoreWorker::HandleWaitForActorRefDeleted(
16 changes: 12 additions & 4 deletions src/ray/core_worker/store_provider/memory_store/memory_store.cc
@@ -178,18 +178,26 @@ std::shared_ptr<RayObject> CoreWorkerMemoryStore::GetIfExists(const ObjectID &ob
return ptr;
}

void CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
void CoreWorkerMemoryStore::Put(const RayObject &object,
const ObjectID &object_id,
std::optional<rpc::ReturnObject> return_object) {
std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
RAY_LOG(DEBUG).WithField(object_id) << "Putting object into memory store.";
RAY_LOG(INFO).WithField(object_id) << "Putting object into memory store.";
std::shared_ptr<RayObject> object_entry = nullptr;
if (return_object.has_value()) {
RAY_LOG(INFO) << "NOT COPYING DATA";
} else {
RAY_LOG(INFO) << "COPYING DATA";
}
if (object_allocator_ != nullptr) {
object_entry = object_allocator_(object, object_id);
} else {
object_entry = std::make_shared<RayObject>(object.GetData(),
object.GetMetadata(),
object.GetNestedRefs(),
true,
object.GetTensorTransport());
/*copy_data=*/!return_object.has_value(),
object.GetTensorTransport(),
std::move(return_object));
}

// TODO(edoakes): we should instead return a flag to the caller to put the object in
src/ray/core_worker/store_provider/memory_store/memory_store.h
@@ -67,7 +67,9 @@ class CoreWorkerMemoryStore {
///
/// \param[in] object The ray object.
/// \param[in] object_id Object ID specified by user.
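/// \param[in] return_object If set, the ReturnObject proto that backs the object's
/// data; the store holds it instead of making a copy.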
void Put(const RayObject &object, const ObjectID &object_id);
void Put(const RayObject &object,
const ObjectID &object_id,
std::optional<rpc::ReturnObject> return_object = std::nullopt);

/// Get a list of objects from the object store.
///
32 changes: 24 additions & 8 deletions src/ray/core_worker/task_manager.cc
@@ -534,24 +534,26 @@ size_t TaskManager::NumPendingTasks() const {
}

StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
rpc::ReturnObject return_object,
const NodeID &worker_node_id,
bool store_in_plasma) {
bool direct_return = false;
reference_counter_.UpdateObjectSize(object_id, return_object.size());
RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
<< return_object.size();
RAY_LOG(INFO) << "Task return object " << object_id << " has size "
<< return_object.size();
const auto nested_refs =
VectorFromProtobuf<rpc::ObjectReference>(return_object.nested_inlined_refs());

if (return_object.in_plasma()) {
RAY_LOG(INFO) << "IN PLASMA";
// NOTE(swang): We need to add the location of the object before marking
// it as local in the in-memory store so that the data locality policy
// will choose the right raylet for any queued dependent tasks.
reference_counter_.UpdateObjectPinnedAtRaylet(object_id, worker_node_id);
// Mark it as in plasma with a dummy object.
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
} else {
RAY_LOG(INFO) << "NOT IN PLASMA";
// NOTE(swang): If a direct object was promoted to plasma, then we do not
// record the node ID that it was pinned at, which means that we will not
// be able to reconstruct it if the plasma object copy is lost. However,
@@ -589,8 +591,16 @@ StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
return s;
}
} else {
in_memory_store_.Put(object, object_id);
static double total_time = 0;
RAY_LOG(INFO) << "Put object into memory store";
absl::Time start_time = absl::Now();
in_memory_store_.Put(object, object_id, std::move(return_object));
direct_return = true;
absl::Time end_time = absl::Now();
total_time += absl::ToDoubleMilliseconds(end_time - start_time);
RAY_LOG(INFO) << "Put object into memory store time: "
<< absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
RAY_LOG(INFO) << "Total time: " << total_time << "ms";
}
}

@@ -763,8 +773,9 @@ void TaskManager::MarkEndOfStream(const ObjectID &generator_id,
}

bool TaskManager::HandleReportGeneratorItemReturns(
const rpc::ReportGeneratorItemReturnsRequest &request,
rpc::ReportGeneratorItemReturnsRequest request,
const ExecutionSignalCallback &execution_signal_callback) {
absl::Time start_time = absl::Now();
const auto &generator_id = ObjectID::FromBinary(request.generator_id());
const auto &task_id = generator_id.TaskId();
int64_t item_index = request.item_index();
@@ -804,8 +815,9 @@ bool TaskManager::HandleReportGeneratorItemReturns(
}

// TODO(sang): Support the regular return values as well.
RAY_LOG(INFO) << "HandleReportGeneratorItemReturns";
size_t num_objects_written = 0;
for (const auto &return_object : request.dynamic_return_objects()) {
for (auto &return_object : *request.mutable_dynamic_return_objects()) {
const auto object_id = ObjectID::FromBinary(return_object.object_id());

RAY_LOG(DEBUG) << "Write an object " << object_id
@@ -823,7 +835,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
reference_counter_.UpdateObjectPendingCreation(object_id, false);
StatusOr<bool> put_res =
HandleTaskReturn(object_id,
return_object,
std::move(return_object),
NodeID::FromBinary(request.worker_addr().node_id()),
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
if (!put_res.ok()) {
@@ -861,6 +873,10 @@ bool TaskManager::HandleReportGeneratorItemReturns(
// No need to backpressure.
execution_signal_callback(Status::OK(), total_consumed);
}
absl::Time end_time = absl::Now();
static double total_time = 0;
total_time += absl::ToDoubleMilliseconds(end_time - start_time);
RAY_LOG(INFO) << "HandleReportGeneratorItemReturns time: " << total_time << "ms";
return num_objects_written != 0;
}

@@ -895,7 +911,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply,
const rpc::Address &worker_addr,
bool is_application_error) {
RAY_LOG(DEBUG) << "Completing task " << task_id;
RAY_LOG(INFO) << "Completing task " << task_id;

bool first_execution = false;
const auto store_in_plasma_ids =
4 changes: 2 additions & 2 deletions src/ray/core_worker/task_manager.h
@@ -296,7 +296,7 @@ class TaskManager : public TaskManagerInterface {
///
/// \return True if a task return is registered. False otherwise.
bool HandleReportGeneratorItemReturns(
const rpc::ReportGeneratorItemReturnsRequest &request,
rpc::ReportGeneratorItemReturnsRequest request,
const ExecutionSignalCallback &execution_signal_callback) ABSL_LOCKS_EXCLUDED(mu_);

/// Temporarily register a given generator return reference.
@@ -613,7 +613,7 @@ class TaskManager : public TaskManagerInterface {
/// return object. On success, sets direct_return_out to true if the object's value
/// was returned directly by value (not stored in plasma).
StatusOr<bool> HandleTaskReturn(const ObjectID &object_id,
const rpc::ReturnObject &return_object,
rpc::ReturnObject return_object,
const NodeID &worker_node_id,
bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);

3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -719,9 +719,12 @@ void NodeManager::WarnResourceDeadlock() {

// Check if any progress is being made on this raylet.
if (worker_pool_.IsWorkerAvailableForScheduling()) {
RAY_LOG(INFO) << "Progress is being made in a task, don't warn.";
// Progress is being made in a task, don't warn.
resource_deadlock_warned_ = 0;
return;
} else {
RAY_LOG(INFO) << "No progress is being made in a task, warn.";
}

auto exemplar = cluster_task_manager_.AnyPendingTasksForResourceAcquisition(