Description
What happened + What you expected to happen
See #25177
Sending large binary strings over the Ray object store scales superlinearly. Benchmarks:
Checkpoint ser/de for 0.50 GB took 2.34 seconds.
Checkpoint ser/de for 1.00 GB took 5.18 seconds.
Checkpoint ser/de for 2.00 GB took 28.52 seconds.
Checkpoint ser/de for 4.00 GB took 90.82 seconds.
We would expect this to scale roughly linearly: extrapolating from the 1 GB measurement, 2 GB should take about 10 seconds and 4 GB about 20 seconds, rather than the measured ~28.5 and ~90.8 seconds.
These benchmarks were run on a 2019 16-inch MacBook. This might be related to object spilling; we should rerun on beefier instances to confirm (I can do that in a bit).
In Ray Tune, we currently avoid this by chunking the data and streaming it via an actor: https://github.com/ray-project/ray/blob/master/python/ray/tune/utils/file_transfer.py#L280
We could do the same chunking in Ray AIR, but I think it would be better to resolve this on the core side rather than at the library level (a rough sketch of the chunking approach follows the list of ideas below).
Thus, a few ideas:
1. Ray core support for generators (see @stephanie-wang's proposal) and maybe a utility for chunking data
2. Streaming support out of the box (automatic chunking/resolving of generators)
For 2, we could automatically adjust chunking to the available object store size, so that we could potentially transfer objects larger than the object store.
(This doesn't really work in practice, though, as the data still ends up in the object store.)
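For illustration, here is a minimal sketch of the actor-based chunking/streaming idea. This is not the actual file_transfer.py implementation; the FileReceiver actor, chunk size, and paths are hypothetical.

import ray

@ray.remote
class FileReceiver:
    # Hypothetical actor that writes incoming chunks to a file on the
    # receiving node, one chunk at a time.
    def __init__(self, path: str):
        self._fp = open(path, "wb")

    def write_chunk(self, chunk: bytes) -> int:
        return self._fp.write(chunk)

    def close(self) -> None:
        self._fp.close()

def stream_file(src_path: str, receiver, chunk_size: int = 256 * 1024 * 1024) -> None:
    # Read the source file in fixed-size chunks and send each one to the
    # receiver actor, so only one chunk at a time needs to fit in the object store.
    with open(src_path, "rb") as fp:
        while True:
            chunk = fp.read(chunk_size)
            if not chunk:
                break
            ray.get(receiver.write_chunk.remote(chunk))
    ray.get(receiver.close.remote())

# Usage (hypothetical paths):
# receiver = FileReceiver.remote("/tmp/checkpoint_copy.bin")
# stream_file("/tmp/checkpoint_large_1.00_gb/checkpoint.bin", receiver)

Core-level generator or streaming support (ideas 1 and 2) would essentially automate this pattern.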
Versions / Dependencies
Latest master
Reproduction script
import os
import shutil
import time

import ray
from ray.ml import Checkpoint


def create_large_checkpoint(size_in_gb: float) -> str:
    path = f"/tmp/checkpoint_large_{size_in_gb:.2f}_gb"
    if os.path.exists(path):
        print(f"Returning existing checkpoint in {path}")
        return path

    print(f"Creating checkpoint in {path}")
    os.makedirs(path)
    # Fill the checkpoint with random (incompressible) binary data.
    with open(os.path.join(path, "checkpoint.bin"), "wb") as fp:
        fp.write(os.urandom(int(size_in_gb * 1024 * 1024 * 1024)))
    return path


checkpoint_size = 1.0
checkpoint_path = create_large_checkpoint(size_in_gb=checkpoint_size)

ray.init()

shutil.rmtree("/tmp/checkpoint_benchmark_output", ignore_errors=True)

start_time = time.monotonic()
# Round-trip the checkpoint through the object store and back to disk.
Checkpoint.from_object_ref(
    Checkpoint.from_directory(checkpoint_path).to_object_ref()
).to_directory("/tmp/checkpoint_benchmark_output")
time_taken = time.monotonic() - start_time

print(f"Checkpoint ser/de for {checkpoint_size:.2f} GB took {time_taken:.2f} seconds.")
Issue Severity
Medium: It is a significant difficulty but I can work around it.