diff --git a/python/ray/tests/test.py b/python/ray/tests/test.py
new file mode 100644
index 000000000000..bda3c5df98dd
--- /dev/null
+++ b/python/ray/tests/test.py
@@ -0,0 +1,33 @@
+import ray
+from ray.cluster_utils import Cluster
+import time
+import numpy as np
+
+cluster = Cluster()
+
+head = cluster.add_node(num_cpus=0)
+ray.init(address=cluster.address)
+worker = cluster.add_node(num_cpus=1)
+
+
+@ray.remote
+class A:
+    def __init__(self):
+        self.arr = np.zeros(4 * 1024 * 1024, dtype=np.uint8)
+
+    def generator(self):
+        for _ in range(100):
+            yield self.arr
+
+
+a = A.remote()
+ray.get(a.__ray_ready__.remote())
+
+start_time = time.time()
+
+streaming_ref = a.generator.remote()
+for ref in streaming_ref:
+    ray.get(ref)
+
+
+print("really done in ", time.time() - start_time)
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 74a932c136a5..0a52c514da93 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -192,7 +192,7 @@ RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, true)
 
 // The max allowed size in bytes of a return object from direct actor calls.
 // Objects larger than this size will be spilled/promoted to plasma.
-RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
+RAY_CONFIG(int64_t, max_direct_call_object_size, 5 * 1024 * 1024)
 
 // The max gRPC message size (the gRPC internal default is 4MB). We use a higher
 // limit in Ray to avoid crashing with many small inlined task arguments.
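The benchmark above streams one hundred 4 MiB numpy chunks, and the config change that follows raises the direct-call return threshold from 100 KiB to 5 MiB so each chunk fits under it. A minimal sketch of the decision this value controls; the helper name and signature are hypothetical, not Ray's actual call site:

#include <cstdint>

// Hypothetical helper, not Ray's real code path: returns true when a task
// return must be promoted to plasma instead of being inlined in the reply.
bool ShouldPromoteToPlasma(int64_t object_size_bytes) {
  // Mirrors RAY_CONFIG(int64_t, max_direct_call_object_size, 5 * 1024 * 1024).
  const int64_t max_direct_call_object_size = 5 * 1024 * 1024;
  return object_size_bytes > max_direct_call_object_size;
}

Under the old 100 KiB default every 4 MiB generator item took the plasma path; at 5 MiB each item is returned directly in the gRPC reply, which is the path the rest of this patch optimizes.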
diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h
index b34588978ca2..224ebc9a64ba 100644
--- a/src/ray/common/ray_object.h
+++ b/src/ray/common/ray_object.h
@@ -43,8 +43,24 @@ class RayObject {
             const std::shared_ptr<Buffer> &metadata,
             const std::vector<rpc::ObjectReference> &nested_refs,
             bool copy_data = false,
-            rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE) {
-    Init(data, metadata, nested_refs, copy_data, tensor_transport);
+            rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE,
+            std::optional<rpc::ReturnObject> return_object = std::nullopt) {
+    RAY_LOG(INFO) << "RayObject constructor";
+    Init(data,
+         metadata,
+         nested_refs,
+         copy_data,
+         tensor_transport,
+         std::move(return_object));
+  }
+
+  RayObject(const RayObject &other) {
+    RAY_CHECK(false) << "RayObject copy constructor is not allowed";
+  }
+
+  RayObject &operator=(const RayObject &other) {
+    RAY_CHECK(false) << "RayObject copy assignment is not allowed";
+    return *this;
   }
 
   /// This constructor creates a ray object instance whose data will be generated
@@ -134,18 +150,26 @@ class RayObject {
             const std::shared_ptr<Buffer> &metadata,
             const std::vector<rpc::ObjectReference> &nested_refs,
             bool copy_data = false,
-            rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE) {
+            rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE,
+            std::optional<rpc::ReturnObject> return_object = std::nullopt) {
+    RAY_LOG(INFO) << "Init RayObject";
     data_ = data;
     metadata_ = metadata;
     nested_refs_ = nested_refs;
     has_data_copy_ = copy_data;
     tensor_transport_ = tensor_transport;
    creation_time_nanos_ = absl::GetCurrentTimeNanos();
-
+    if (return_object.has_value()) {
+      RAY_LOG(INFO) << "Return object has value";
+      return_object_ = std::move(return_object.value());
+      return;
+    }
     if (has_data_copy_) {
+      RAY_LOG(INFO) << "Copying data";
       // If this object is required to hold a copy of the data,
       // make a copy if the passed in buffers don't already have a copy.
       if (data_ && !data_->OwnsData()) {
+        RAY_LOG(INFO) << "Data does not own data";
         data_ = std::make_shared<LocalMemoryBuffer>(data_->Data(),
                                                     data_->Size(),
                                                     /*copy_data=*/true);
@@ -174,6 +198,8 @@ class RayObject {
   int64_t creation_time_nanos_;
   /// The tensor transport to use for transferring this object.
   rpc::TensorTransport tensor_transport_ = rpc::TensorTransport::OBJECT_STORE;
+  /// If set, the raw rpc::ReturnObject whose message keeps this object's data alive.
+  std::optional<rpc::ReturnObject> return_object_ = std::nullopt;
 };
 
 }  // namespace ray
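The poisoned copy constructor and copy assignment above make any accidental RayObject copy fail loudly at run time, which matters now that a RayObject may own a multi-megabyte rpc::ReturnObject. A standalone analogue of the pattern (Payload is illustrative, not the real class; assert stands in for RAY_CHECK):

#include <cassert>
#include <optional>
#include <string>
#include <utility>

struct Payload {
  std::optional<std::string> adopted;  // stands in for rpc::ReturnObject
  Payload() = default;
  // Copying is poisoned the same way as in the patch: compiles, aborts at run time.
  Payload(const Payload &) { assert(false && "copy not allowed"); }
  Payload &operator=(const Payload &) {
    assert(false && "copy not allowed");
    return *this;
  }
  // Explicitly defaulted moves keep ownership transfer cheap and legal.
  Payload(Payload &&) = default;
  Payload &operator=(Payload &&) = default;
};

int main() {
  Payload a;
  a.adopted.emplace(4u * 1024 * 1024, 'x');
  Payload b = std::move(a);  // fine: steals the buffer, no copy
  // Payload c = b;          // would compile, then abort at run time
  return 0;
}

One caveat with the patch as written: user-declaring a copy constructor suppresses RayObject's implicit move operations, so a std::move of a RayObject would silently fall back to the aborting copy constructor. Declaring the copies `= delete` and defaulting the moves, as in the sketch, would turn both mistakes into compile-time errors.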
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 8efe6a7878bf..f431934a8086 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -3130,16 +3130,20 @@ void CoreWorker::HandleReportGeneratorItemReturns(
     rpc::ReportGeneratorItemReturnsRequest request,
     rpc::ReportGeneratorItemReturnsReply *reply,
     rpc::SendReplyCallback send_reply_callback) {
+  absl::Time start_time = absl::Now();
+  static double total_time = 0;
   auto generator_id = ObjectID::FromBinary(request.generator_id());
   auto worker_id = WorkerID::FromBinary(request.worker_addr().worker_id());
   task_manager_->HandleReportGeneratorItemReturns(
-      request,
+      std::move(request),
       /*execution_signal_callback=*/
       [reply,
        worker_id = std::move(worker_id),
        generator_id = std::move(generator_id),
        send_reply_callback = std::move(send_reply_callback)](
           const Status &status, int64_t total_num_object_consumed) {
+        absl::Time callback_start_time = absl::Now();
+        static double callback_total_time = 0;
         RAY_LOG(DEBUG) << "Reply HandleReportGeneratorItemReturns to signal "
                           "executor to resume tasks. "
                        << generator_id << ". Worker ID: " << worker_id
@@ -3150,7 +3154,23 @@ void CoreWorker::HandleReportGeneratorItemReturns(
         reply->set_total_num_object_consumed(total_num_object_consumed);
         send_reply_callback(status, nullptr, nullptr);
+        absl::Time callback_end_time = absl::Now();
+        RAY_LOG(INFO) << "CoreWorker Callback HandleReportGeneratorItemReturns time: "
+                      << absl::ToDoubleMilliseconds(callback_end_time -
+                                                    callback_start_time)
+                      << "ms";
+        callback_total_time +=
+            absl::ToDoubleMilliseconds(callback_end_time - callback_start_time);
+        RAY_LOG(INFO)
+            << "CoreWorker Callback Total HandleReportGeneratorItemReturns time: "
+            << callback_total_time << "ms";
       });
+  absl::Time end_time = absl::Now();
+  RAY_LOG(INFO) << "CoreWorker HandleReportGeneratorItemReturns time: "
+                << absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
+  total_time += absl::ToDoubleMilliseconds(end_time - start_time);
+  RAY_LOG(INFO) << "CoreWorker Total HandleReportGeneratorItemReturns time: "
+                << total_time << "ms";
 }
 
 std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
@@ -3402,6 +3422,8 @@ void CoreWorker::HandleRayletNotifyGCSRestart(
 void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request,
                                        rpc::GetObjectStatusReply *reply,
                                        rpc::SendReplyCallback send_reply_callback) {
+  static double total_time = 0;
+  absl::Time start_time = absl::Now();
   if (HandleWrongRecipient(WorkerID::FromBinary(request.owner_worker_id()),
                            send_reply_callback)) {
     RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by a previous worker "
@@ -3435,6 +3457,11 @@ void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request,
         PopulateObjectStatus(object_id, obj, reply);
         send_reply_callback(Status::OK(), nullptr, nullptr);
       });
+  absl::Time end_time = absl::Now();
+  RAY_LOG(INFO) << "CoreWorker HandleGetObjectStatus time: "
+                << absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
+  total_time += absl::ToDoubleMilliseconds(end_time - start_time);
+  RAY_LOG(INFO) << "CoreWorker Total HandleGetObjectStatus time: " << total_time << "ms";
 }
 
 void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
@@ -3447,6 +3474,8 @@ void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
   // in_plasma indicator on the message, and the caller will
   // have to facilitate a Plasma object transfer to get the
   // object value.
+  static double total_time = 0;
+  absl::Time start_time = absl::Now();
   auto *object = reply->mutable_object();
   if (obj->HasData()) {
     const auto &data = obj->GetData();
@@ -3468,6 +3497,11 @@ void CoreWorker::PopulateObjectStatus(const ObjectID &object_id,
     }
     reply->set_object_size(locality_data.value().object_size);
   }
+  absl::Time end_time = absl::Now();
+  RAY_LOG(INFO) << "CoreWorker PopulateObjectStatus time: "
+                << absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
+  total_time += absl::ToDoubleMilliseconds(end_time - start_time);
+  RAY_LOG(INFO) << "CoreWorker Total PopulateObjectStatus time: " << total_time << "ms";
 }
 
 void CoreWorker::HandleWaitForActorRefDeleted(
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
index 3898e048354e..eb0b6fbb423e 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
@@ -178,18 +178,26 @@ std::shared_ptr<RayObject> CoreWorkerMemoryStore::GetIfExists(const ObjectID &ob
   return ptr;
 }
 
-void CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
+void CoreWorkerMemoryStore::Put(const RayObject &object,
+                                const ObjectID &object_id,
+                                std::optional<rpc::ReturnObject> return_object) {
   std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
-  RAY_LOG(DEBUG).WithField(object_id) << "Putting object into memory store.";
+  RAY_LOG(INFO).WithField(object_id) << "Putting object into memory store.";
   std::shared_ptr<RayObject> object_entry = nullptr;
+  if (return_object.has_value()) {
+    RAY_LOG(INFO) << "NOT COPYING DATA";
+  } else {
+    RAY_LOG(INFO) << "COPYING DATA";
+  }
   if (object_allocator_ != nullptr) {
     object_entry = object_allocator_(object, object_id);
   } else {
     object_entry = std::make_shared<RayObject>(object.GetData(),
                                                object.GetMetadata(),
                                                object.GetNestedRefs(),
-                                               true,
-                                               object.GetTensorTransport());
+                                               /*copy_data=*/!return_object.has_value(),
+                                               object.GetTensorTransport(),
+                                               std::move(return_object));
   }
 
   // TODO(edoakes): we should instead return a flag to the caller to put the object in
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h
index 17d1d4df1555..a0139abd5136 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.h
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h
@@ -67,7 +67,11 @@ class CoreWorkerMemoryStore {
   ///
   /// \param[in] object The ray object.
   /// \param[in] object_id Object ID specified by user.
-  void Put(const RayObject &object, const ObjectID &object_id);
+  /// \param[in] return_object If set, the raw rpc::ReturnObject is adopted so the
+  ///            stored entry can reference its payload without copying it.
+  void Put(const RayObject &object,
+           const ObjectID &object_id,
+           std::optional<rpc::ReturnObject> return_object = std::nullopt);
 
   /// Get a list of objects from the object store.
   ///
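The new Put overload threads the raw rpc::ReturnObject through to the stored RayObject, so when it is present the store adopts the reply's payload instead of deep-copying it (copy_data becomes false). The zero-copy idea in miniature, with illustrative types rather than Ray's (std::string stands in for the proto):

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

struct Entry {
  std::shared_ptr<std::string> keepalive;  // stands in for rpc::ReturnObject
  const uint8_t *data = nullptr;           // view into keepalive's bytes
  size_t size = 0;
};

Entry Adopt(std::string reply_payload) {
  Entry e;
  e.keepalive = std::make_shared<std::string>(std::move(reply_payload));
  e.data = reinterpret_cast<const uint8_t *>(e.keepalive->data());
  e.size = e.keepalive->size();
  return e;  // no byte copy; the view stays valid while keepalive lives
}

The entry owning the message is what makes the borrowed pointer safe: RayObject's new return_object_ member plays the keepalive role here.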
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 75f84cd21ad4..bb4b5e899de4 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -534,17 +534,18 @@ size_t TaskManager::NumPendingTasks() const {
 }
 
 StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
-                                             const rpc::ReturnObject &return_object,
+                                             rpc::ReturnObject return_object,
                                              const NodeID &worker_node_id,
                                              bool store_in_plasma) {
   bool direct_return = false;
   reference_counter_.UpdateObjectSize(object_id, return_object.size());
-  RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
-                 << return_object.size();
+  RAY_LOG(INFO) << "Task return object " << object_id << " has size "
+                << return_object.size();
   const auto nested_refs =
       VectorFromProtobuf<rpc::ObjectReference>(return_object.nested_inlined_refs());
 
   if (return_object.in_plasma()) {
+    RAY_LOG(INFO) << "IN PLASMA";
     // NOTE(swang): We need to add the location of the object before marking
     // it as local in the in-memory store so that the data locality policy
     // will choose the right raylet for any queued dependent tasks.
@@ -552,6 +553,7 @@ StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
     // Mark it as in plasma with a dummy object.
     in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
   } else {
+    RAY_LOG(INFO) << "NOT IN PLASMA";
     // NOTE(swang): If a direct object was promoted to plasma, then we do not
     // record the node ID that it was pinned at, which means that we will not
     // be able to reconstruct it if the plasma object copy is lost. However,
@@ -589,8 +591,16 @@ StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
         return s;
       }
     } else {
-      in_memory_store_.Put(object, object_id);
+      static double total_time = 0;
+      RAY_LOG(INFO) << "Put object into memory store";
+      absl::Time start_time = absl::Now();
+      in_memory_store_.Put(object, object_id, std::move(return_object));
       direct_return = true;
+      absl::Time end_time = absl::Now();
+      total_time += absl::ToDoubleMilliseconds(end_time - start_time);
+      RAY_LOG(INFO) << "Put object into memory store time: "
+                    << absl::ToDoubleMilliseconds(end_time - start_time) << "ms";
+      RAY_LOG(INFO) << "Total memory store Put time: " << total_time << "ms";
     }
   }
 
@@ -763,8 +773,9 @@ void TaskManager::MarkEndOfStream(const ObjectID &generator_id,
 }
 
 bool TaskManager::HandleReportGeneratorItemReturns(
-    const rpc::ReportGeneratorItemReturnsRequest &request,
+    rpc::ReportGeneratorItemReturnsRequest request,
     const ExecutionSignalCallback &execution_signal_callback) {
+  absl::Time start_time = absl::Now();
   const auto &generator_id = ObjectID::FromBinary(request.generator_id());
   const auto &task_id = generator_id.TaskId();
   int64_t item_index = request.item_index();
@@ -804,8 +815,9 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   }
 
   // TODO(sang): Support the regular return values as well.
+  RAY_LOG(INFO) << "HandleReportGeneratorItemReturns";
   size_t num_objects_written = 0;
-  for (const auto &return_object : request.dynamic_return_objects()) {
+  for (auto &return_object : *request.mutable_dynamic_return_objects()) {
     const auto object_id = ObjectID::FromBinary(return_object.object_id());
 
     RAY_LOG(DEBUG) << "Write an object " << object_id
@@ -823,7 +835,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
     reference_counter_.UpdateObjectPendingCreation(object_id, false);
     StatusOr<bool> put_res =
         HandleTaskReturn(object_id,
-                         return_object,
+                         std::move(return_object),
                          NodeID::FromBinary(request.worker_addr().node_id()),
                          /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
     if (!put_res.ok()) {
@@ -861,6 +873,10 @@ bool TaskManager::HandleReportGeneratorItemReturns(
     // No need to backpressure.
     execution_signal_callback(Status::OK(), total_consumed);
   }
+  absl::Time end_time = absl::Now();
+  static double total_time = 0;
+  total_time += absl::ToDoubleMilliseconds(end_time - start_time);
+  RAY_LOG(INFO) << "Total HandleReportGeneratorItemReturns time: " << total_time << "ms";
   return num_objects_written != 0;
 }
 
@@ -895,7 +911,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
                                       const rpc::PushTaskReply &reply,
                                       const rpc::Address &worker_addr,
                                       bool is_application_error) {
-  RAY_LOG(DEBUG) << "Completing task " << task_id;
+  RAY_LOG(INFO) << "Completing task " << task_id;
   bool first_execution = false;
   const auto store_in_plasma_ids =
       GetTaskReturnObjectsToStoreInPlasma(task_id, &first_execution);
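The signature changes in this file all follow one rule: anything that used to be taken as `const T &` and is now plain `T` gets moved, not copied, on its way from the RPC handler down to the memory store. An illustrative reduction of that hand-off chain (std::string stands in for the multi-megabyte request and return protos):

#include <string>
#include <utility>
#include <vector>

std::vector<std::string> g_store;  // stands in for the in-memory object store

void StoreLayer(std::string payload) {   // final owner
  g_store.push_back(std::move(payload));
}

void MiddleLayer(std::string payload) {  // was: const std::string &payload
  StoreLayer(std::move(payload));        // hand off, don't duplicate
}

void RpcLayer() {
  std::string payload(4 * 1024 * 1024, 'x');  // stands in for the request proto
  MiddleLayer(std::move(payload));            // one buffer end to end
}

Pass-by-value plus std::move keeps the API safe for callers who do want to keep their argument (they simply omit the move), while the hot path pays for a single allocation.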
diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h
index 4b1c32e1e873..39ba9ce13753 100644
--- a/src/ray/core_worker/task_manager.h
+++ b/src/ray/core_worker/task_manager.h
@@ -296,7 +296,7 @@ class TaskManager : public TaskManagerInterface {
   ///
   /// \return True if a task return is registered. False otherwise.
   bool HandleReportGeneratorItemReturns(
-      const rpc::ReportGeneratorItemReturnsRequest &request,
+      rpc::ReportGeneratorItemReturnsRequest request,
       const ExecutionSignalCallback &execution_signal_callback)
       ABSL_LOCKS_EXCLUDED(mu_);
 
   /// Temporarily register a given generator return reference.
@@ -613,7 +613,7 @@ class TaskManager : public TaskManagerInterface {
   /// return object. On success, sets direct_return_out to true if the object's value
   /// was returned directly by value (not stored in plasma).
   StatusOr<bool> HandleTaskReturn(const ObjectID &object_id,
-                                  const rpc::ReturnObject &return_object,
+                                  rpc::ReturnObject return_object,
                                   const NodeID &worker_node_id,
                                   bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);
 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 684eb7c93daf..6cf063dedd64 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -719,9 +719,12 @@ void NodeManager::WarnResourceDeadlock() {
 
   // Check if any progress is being made on this raylet.
   if (worker_pool_.IsWorkerAvailableForScheduling()) {
+    RAY_LOG(INFO) << "Progress is being made in a task, don't warn.";
     // Progress is being made in a task, don't warn.
     resource_deadlock_warned_ = 0;
     return;
+  } else {
+    RAY_LOG(INFO) << "No progress is being made in a task, warn.";
   }
 
   auto exemplar = cluster_task_manager_.AnyPendingTasksForResourceAcquisition(
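A note on the profiling instrumentation scattered through this patch: each `static double total_time = 0;` is a single function-local variable shared by every thread that runs the handler, and the unsynchronized `+=` on it is a data race wherever the handler can run on more than one thread. That is usually tolerable for throwaway measurements like these, but an atomic accumulator is about as cheap and race-free. A minimal sketch, with a hypothetical helper name:

#include <atomic>
#include <cstdint>

void RecordHandlerMicros(int64_t micros) {
  // One atomic per call site; relaxed ordering is enough for a plain counter.
  static std::atomic<int64_t> total_micros{0};
  total_micros.fetch_add(micros, std::memory_order_relaxed);
}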