Commit 9392c25

[core][gpu-objects] Garbage collection (#53911)
* Add a callback that sends a `FreeActorObject` RPC to the sender actor to remove the GPU object once it goes out of scope.
* Avoid updating the reference count immediately when GPU object references are inlined. Without this change, the following sequence may occur for small objects:
  1. The sender actor stores tensors in the GPUObjectManager.
  2. The reference count is decremented when the argument is passed to the destination actor task, and the driver sends a `FreeActorObject` RPC to the sender actor.
  3. The driver submits `__ray_send__` to the sender actor, but by then, the tensors have already been removed.
* Instead, we update the reference count when the task finishes.

## Related issue number

Closes #51262
Closes #51273

---------

Signed-off-by: Kai-Hsun Chen <[email protected]>
1 parent: 3a2b6d8

18 files changed (+309, −53 lines)
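Before the per-file diffs, here is a minimal, self-contained Python sketch of the deferred-free behavior described in the commit message. The names below (GPUObjectStore, Owner, add_ref, on_task_finished) are illustrative stand-ins, not Ray's internal APIs; the sketch only models the ordering this change establishes: the sender actor's copy of the tensors is dropped via the FreeActorObject path only after the object goes out of scope and the consuming task has finished, never while `__ray_send__` may still need them.

# Illustrative sketch only; these classes and names are not Ray internals.

class GPUObjectStore:
    """Per-actor store that owns the tensors backing a GPU object."""

    def __init__(self):
        self._objects = {}

    def add_gpu_object(self, obj_id, tensors):
        self._objects[obj_id] = tensors

    def remove_gpu_object(self, obj_id):
        # Stands in for the actor-side handling of the FreeActorObject RPC.
        self._objects.pop(obj_id, None)

    def __len__(self):
        return len(self._objects)


class Owner:
    """Models the owner-side reference counting for a GPU object."""

    def __init__(self, sender_store):
        self._refcounts = {}
        self._sender_store = sender_store

    def add_ref(self, obj_id):
        self._refcounts[obj_id] = self._refcounts.get(obj_id, 0) + 1

    def on_task_finished(self, obj_id):
        # Key point of the commit: decrement when the consuming task finishes,
        # not when the inlined reference is passed as an argument, so the
        # tensors still exist while __ray_send__ runs on the sender.
        self._refcounts[obj_id] -= 1
        if self._refcounts[obj_id] == 0:
            # Out of scope: "send" FreeActorObject to the sender actor.
            self._sender_store.remove_gpu_object(obj_id)


sender_store = GPUObjectStore()
sender_store.add_gpu_object("obj1", ["tensor"])
owner = Owner(sender_store)
owner.add_ref("obj1")            # the receiver task takes a reference
assert len(sender_store) == 1    # tensors survive until the task is done
owner.on_task_finished("obj1")   # task finished; the sender's copy is freed
assert len(sender_store) == 0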

python/ray/_private/serialization.py

Lines changed: 0 additions & 2 deletions
@@ -292,8 +292,6 @@ def _deserialize_pickle5_data(
             gpu_object_manager.fetch_gpu_object(object_id)
             tensors = gpu_object_manager.gpu_object_store.get_gpu_object(object_id)
             ctx.reset_out_of_band_tensors(tensors)
-            # TODO(kevin85421): The current garbage collection implementation for the in-actor object store
-            # is naive. We garbage collect each object after it is consumed once.
             gpu_object_manager.gpu_object_store.remove_gpu_object(object_id)

         try:

python/ray/_raylet.pyx

Lines changed: 7 additions & 0 deletions
@@ -2287,6 +2287,12 @@ cdef execute_task_with_cancellation_handler(
             f"Exited because worker reached max_calls={execution_info.max_calls}"
             " for this method.")

+cdef void free_actor_object_callback(const CObjectID &c_object_id) nogil:
+    with gil:
+        object_id = c_object_id.Hex().decode()
+        gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
+        gpu_object_manager.gpu_object_store.remove_gpu_object(object_id)
+
 cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
     cdef bytes py_bytes = ray_error.to_bytes()
     return make_shared[LocalMemoryBuffer](

@@ -2998,6 +3004,7 @@ cdef class CoreWorker:
         options.driver_name = driver_name
         options.initialize_thread_callback = initialize_pygilstate_for_thread
         options.task_execution_callback = task_execution_handler
+        options.free_actor_object_callback = free_actor_object_callback
         options.check_signals = check_signals
         options.gc_collect = gc_collect
         options.spill_objects = spill_objects_handler

python/ray/experimental/gpu_object_manager/gpu_object_manager.py

Lines changed: 0 additions & 4 deletions
@@ -44,10 +44,6 @@ def __ray_fetch_gpu_object__(self, obj_id: str):
         obj_id
     ), f"obj_id={obj_id} not found in GPU object store"
     tensors = gpu_object_store.get_gpu_object(obj_id)
-    # TODO(kevin85421): The current garbage collection implementation for the
-    # in-actor object store is naive. We garbage collect each object after it
-    # is consumed once.
-    gpu_object_store.remove_gpu_object(obj_id)
     return tensors


python/ray/experimental/gpu_object_manager/gpu_object_store.py

Lines changed: 0 additions & 8 deletions
@@ -56,10 +56,6 @@ def __ray_send__(self, communicator_name: str, obj_id: str, dst_rank: int):
            f"tensor device {tensor.device} does not match device {device}"
        )
        collective.send(tensor, dst_rank, group_name=communicator_name)
-    # TODO(kevin85421): The current garbage collection implementation for the
-    # in-actor object store is naive. We garbage collect each object after it
-    # is consumed once.
-    gpu_object_store.remove_gpu_object(obj_id)


 def __ray_recv__(

@@ -94,10 +90,6 @@ def __ray_fetch_gpu_object__(self, obj_id: str):
        obj_id
    ), f"obj_id={obj_id} not found in GPU object store"
    tensors = gpu_object_store.get_gpu_object(obj_id)
-    # TODO(kevin85421): The current garbage collection implementation for the
-    # in-actor object store is naive. We garbage collect each object after it
-    # is consumed once.
-    gpu_object_store.remove_gpu_object(obj_id)
    return tensors


python/ray/includes/libcoreworker.pxd

Lines changed: 1 addition & 0 deletions
@@ -406,6 +406,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
            int64_t generator_backpressure_num_objects,
            CTensorTransport tensor_transport
        ) nogil) task_execution_callback
+       (void(const CObjectID &) nogil) free_actor_object_callback
        (function[void()]() nogil) initialize_thread_callback
        (CRayStatus() nogil) check_signals
        (void(c_bool) nogil) gc_collect

python/ray/tests/test_gpu_objects_gloo.py

Lines changed: 92 additions & 7 deletions
@@ -5,6 +5,7 @@
 import ray
 from ray.experimental.collective import create_collective_group
 from ray._private.custom_types import TensorTransportEnum
+from ray._common.test_utils import wait_for_condition

 # tensordict is not supported on macos ci, so we skip the tests
 support_tensordict = sys.platform != "darwin"

@@ -32,10 +33,97 @@ def get_gpu_object(self, obj_id: str):
         )
         if gpu_object_store.has_gpu_object(obj_id):
             gpu_object = gpu_object_store.get_gpu_object(obj_id)
-            print(f"gpu_object: {gpu_object}")
             return gpu_object
         return None

+    def get_num_gpu_objects(self):
+        gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
+        return len(gpu_object_manager.gpu_object_store.gpu_object_store)
+
+
+@pytest.mark.parametrize("data_size_bytes", [100])
+def test_gc_gpu_object(ray_start_regular, data_size_bytes):
+    """
+    For small data, GPU objects are inlined, but the actual data lives
+    on the remote actor. Therefore, if we decrement the reference count
+    upon inlining, we may cause the tensors on the sender actor to be
+    freed before transferring to the receiver actor.
+
+    # TODO(kevin85421): Add a test for large CPU data that is not inlined
+    # after https://github.com/ray-project/ray/issues/54281 is fixed.
+    """
+    world_size = 2
+    actors = [GPUTestActor.remote() for _ in range(world_size)]
+    create_collective_group(actors, backend="torch_gloo")
+
+    small_tensor = torch.randn((1,))
+    cpu_data = b"1" * data_size_bytes
+    data = [small_tensor, cpu_data]
+    sender = actors[0]
+    receiver = actors[1]
+
+    ref1 = sender.echo.remote(data)
+    ref2 = receiver.double.remote(ref1)
+    ref3 = receiver.double.remote(ref1)
+
+    result = ray.get(ref2)
+    assert result[0] == pytest.approx(small_tensor * 2)
+    assert result[1] == cpu_data * 2
+    result = ray.get(ref3)
+    assert result[0] == pytest.approx(small_tensor * 2)
+    assert result[1] == cpu_data * 2
+
+    wait_for_condition(
+        lambda: ray.get(receiver.get_num_gpu_objects.remote()) == 0,
+        timeout=10,
+        retry_interval_ms=100,
+    )
+
+    del ref1
+
+    wait_for_condition(
+        lambda: ray.get(sender.get_num_gpu_objects.remote()) == 0,
+        timeout=10,
+        retry_interval_ms=100,
+    )
+
+
+@pytest.mark.parametrize("data_size_bytes", [100])
+def test_gc_del_ref_before_recv_finish(ray_start_regular, data_size_bytes):
+    """
+    This test deletes the ObjectRef of the GPU object before calling
+    `ray.get` to ensure the receiver finishes receiving the GPU object.
+    """
+    world_size = 2
+    actors = [GPUTestActor.remote() for _ in range(world_size)]
+    create_collective_group(actors, backend="torch_gloo")
+
+    small_tensor = torch.randn((1,))
+    cpu_data = b"1" * data_size_bytes
+    data = [small_tensor, cpu_data]
+    sender = actors[0]
+    receiver = actors[1]
+
+    ref1 = sender.echo.remote(data)
+    ref2 = receiver.double.remote(ref1)
+
+    del ref1
+
+    result = ray.get(ref2)
+    assert result[0] == pytest.approx(small_tensor * 2)
+    assert result[1] == cpu_data * 2
+
+    wait_for_condition(
+        lambda: ray.get(receiver.get_num_gpu_objects.remote()) == 0,
+        timeout=10,
+        retry_interval_ms=100,
+    )
+    wait_for_condition(
+        lambda: ray.get(sender.get_num_gpu_objects.remote()) == 0,
+        timeout=10,
+        retry_interval_ms=100,
+    )
+

 def test_p2p(ray_start_regular):
     world_size = 2

@@ -149,9 +237,10 @@ def test_trigger_out_of_band_tensor_transfer(ray_start_regular):

     tensor = torch.tensor([1, 2, 3])
     gpu_ref = src_actor.echo.remote(tensor)
+    gpu_obj_id = gpu_ref.hex()

     # Check src_actor has the GPU object
-    ret_val_src = ray.get(src_actor.get_gpu_object.remote(gpu_ref.hex()))
+    ret_val_src = ray.get(src_actor.get_gpu_object.remote(gpu_obj_id))
     assert ret_val_src is not None
     assert len(ret_val_src) == 1
     assert torch.equal(ret_val_src[0], tensor)

@@ -160,15 +249,11 @@ def test_trigger_out_of_band_tensor_transfer(ray_start_regular):
     gpu_object_manager.add_gpu_object_ref(gpu_ref, src_actor, TensorTransportEnum.GLOO)

     # Trigger out-of-band tensor transfer from src_actor to dst_actor.
-    # The GPU object will be removed from src_actor's GPU object store
-    # because the current GC implementation garbage collects GPU objects
-    # whenever they are consumed once.
     task_args = (gpu_ref,)
     gpu_object_manager.trigger_out_of_band_tensor_transfer(dst_actor, task_args)
-    assert ray.get(src_actor.get_gpu_object.remote(gpu_ref.hex())) is None

     # Check dst_actor has the GPU object
-    ret_val_dst = ray.get(dst_actor.get_gpu_object.remote(gpu_ref.hex()))
+    ret_val_dst = ray.get(dst_actor.get_gpu_object.remote(gpu_obj_id))
     assert ret_val_dst is not None
     assert len(ret_val_dst) == 1
     assert torch.equal(ret_val_dst[0], tensor)

src/ray/core_worker/core_worker.cc

Lines changed: 15 additions & 1 deletion
@@ -734,7 +734,13 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
       },
       push_error_callback,
       RayConfig::instance().max_lineage_bytes(),
-      *task_event_buffer_);
+      *task_event_buffer_,
+      /*get_actor_rpc_client_callback=*/
+      [this](const ActorID &actor_id) {
+        auto addr = actor_task_submitter_->GetActorAddress(actor_id);
+        RAY_CHECK(addr.has_value()) << "Actor address not found for actor " << actor_id;
+        return core_worker_client_pool_->GetOrConnect(addr.value());
+      });

   // Create an entry for the driver task in the task table. This task is
   // added immediately with status RUNNING. This allows us to push errors

@@ -4952,6 +4958,14 @@ void CoreWorker::HandlePlasmaObjectReady(rpc::PlasmaObjectReadyRequest request,
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }

+void CoreWorker::HandleFreeActorObject(rpc::FreeActorObjectRequest request,
+                                       rpc::FreeActorObjectReply *reply,
+                                       rpc::SendReplyCallback send_reply_callback) {
+  ObjectID object_id = ObjectID::FromBinary(request.object_id());
+  options_.free_actor_object_callback(object_id);
+  send_reply_callback(Status::OK(), nullptr, nullptr);
+}
+
 void CoreWorker::SetActorId(const ActorID &actor_id) {
   absl::MutexLock lock(&mutex_);
   if (!options_.is_local_mode) {

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 0 deletions
@@ -1268,6 +1268,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   void HandleNumPendingTasks(rpc::NumPendingTasksRequest request,
                              rpc::NumPendingTasksReply *reply,
                              rpc::SendReplyCallback send_reply_callback) override;
+
+  // Free GPU objects from the in-actor GPU object store.
+  void HandleFreeActorObject(rpc::FreeActorObjectRequest request,
+                             rpc::FreeActorObjectReply *reply,
+                             rpc::SendReplyCallback send_reply_callback) override;
+
   ///
   /// Public methods related to async actor call. This should only be used when
   /// the actor is (1) direct actor and (2) using async mode.

src/ray/core_worker/core_worker_options.h

Lines changed: 3 additions & 0 deletions
@@ -87,6 +87,7 @@ struct CoreWorkerOptions {
         raylet_ip_address(""),
         driver_name(""),
         task_execution_callback(nullptr),
+        free_actor_object_callback(nullptr),
         check_signals(nullptr),
         initialize_thread_callback(nullptr),
         gc_collect(nullptr),

@@ -146,6 +147,8 @@ struct CoreWorkerOptions {
   std::string driver_name;
   /// Application-language worker callback to execute tasks.
   TaskExecutionCallback task_execution_callback;
+  /// Callback to free GPU object from the in-actor object store.
+  std::function<void(const ObjectID &)> free_actor_object_callback;
   /// Application-language callback to check for signals that have been received
   /// since calling into C++. This will be called periodically (at least every
   /// 1s) during long-running operations. If the function returns anything but StatusOK,

src/ray/core_worker/reference_count.h

Lines changed: 2 additions & 0 deletions
@@ -771,6 +771,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// counting is enabled, then some raylet must be pinning the object value.
   /// This is the address of that raylet.
   std::optional<NodeID> pinned_at_raylet_id;
+  /// TODO(kevin85421): Make tensor_transport a required field for all constructors.
+  ///
   /// The transport used for the object.
   rpc::TensorTransport tensor_transport = rpc::TensorTransport::OBJECT_STORE;
   /// Whether we own the object. If we own the object, then we are

0 commit comments
