[core] Objects created by num_returns="dynamic" temporarily leak if generator errors #28686

@stephanie-wang

What happened + What you expected to happen

If a dynamic generator stores some objects in shared memory and then errors, we store the error in the outer ObjectRef in place of the ObjectRefGenerator. Currently, the already-created objects do not get cleaned up until the outer ObjectRef goes out of scope, even though their ObjectRefs aren't accessible to any Python worker.
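
To make the leak concrete, here is a toy model in plain Python. It is only a sketch and does not use Ray: the `store` dict stands in for the shared-memory object store, and `run_dynamic_task` and the `obj-N` IDs are hypothetical names invented for illustration.

```python
# Toy model only: a dict stands in for the shared-memory object store.
# None of these names are Ray APIs.
def run_dynamic_task(gen_fn, store):
    """Run a generator task and return ("ok", ids) or ("error", exc)."""
    created = []
    try:
        for i, value in enumerate(gen_fn()):
            object_id = f"obj-{i}"
            store[object_id] = value  # stored as soon as it is yielded
            created.append(object_id)
    except Exception as exc:
        # The caller only receives the error. The list of created IDs is
        # dropped, so nothing references the entries already in the store.
        return ("error", exc)
    return ("ok", created)


def failing_generator():
    yield "a"
    yield "b"
    raise RuntimeError("error")


store = {}
status, payload = run_dynamic_task(failing_generator, store)
```

After this runs, `status` is `"error"`, yet `store` still holds the two values yielded before the exception and no caller has their IDs. That is the situation described above, minus the eventual cleanup when the outer ObjectRef goes out of scope.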

We should instead clean up the objects ASAP. We can do it by either:

  1. Having the owner notify the worker's raylet to unpin the objects if the dynamic objects returned by the worker don't match the objects that the worker's raylet thinks were generated.
  2. When the worker's raylet asks the owner for notifications about the inner ObjectRefs, we can queue the response until the outer ObjectRef has been created.
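
A minimal sketch of the comparison behind option 1, assuming object IDs can be treated as plain strings. The function names and the dict used as a pinned-object table are hypothetical and are not Ray internals; the real fix would live in the owner and raylet code.

```python
# Hypothetical sketch of option 1, not Ray's implementation.
def ids_to_unpin(returned_by_worker, pinned_by_raylet):
    """Return IDs the raylet pinned that the worker never returned to the owner."""
    return set(pinned_by_raylet) - set(returned_by_worker)


def unpin_mismatched(pinned, returned_by_worker):
    """Drop every pinned entry that the owner did not hear about."""
    leaked = ids_to_unpin(returned_by_worker, pinned.keys())
    for object_id in leaked:
        del pinned[object_id]
    return leaked


# The generator errored, so the worker returned no dynamic objects,
# but the raylet still has two of them pinned.
pinned = {"obj-0": b"...", "obj-1": b"..."}
leaked = unpin_mismatched(pinned, returned_by_worker=[])
```

In this model the owner's notification amounts to a set difference: anything the raylet pinned for the task that is missing from the task's reported return values can be unpinned immediately.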

We can split the issue into two sub-issues:

  • Handling application-level errors (e.g., Python exception)
  • Handling system-level errors (e.g., worker died)

Versions / Dependencies

4.0dev

Reproduction script

import pytest
import numpy as np
import sys

import ray
from ray._private.internal_api import memory_summary
from ray._private.test_utils import wait_for_condition


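# String matched against memory_summary() lines to find the driver's entries.
DRIVER_PID = "Driver"


def count(memory_str, substr):
    """Count the lines of memory_str that contain substr (truncated to 42 characters)."""
    substr = substr[:42]
    n = 0
    for line in memory_str.split("\n"):
        if substr in line:
            n += 1
    return n
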

def test_dynamic_generator_exception(ray_start_regular):
    address = ray_start_regular["address"]
    @ray.remote(num_returns="dynamic")
    def dynamic_generator(num_returns):
        for i in range(num_returns):
            yield np.ones(1_000_000, dtype=np.int8) * i
        raise RuntimeError("error")

    @ray.remote
    def read(gen):
        for i, ref in enumerate(gen):
            if ray.get(ref)[0] != i:
                return False
        return True

    gen_ref = dynamic_generator.remote(10)

    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(gen_ref)

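    # The test fails if this line is deleted: the objects yielded before the
    # error stay pinned until gen_ref goes out of scope.
    del gen_ref

    # Wait until at most one line of the memory summary mentions the driver.
    wait_for_condition(lambda: count(memory_summary(address), DRIVER_PID) <= 1)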

Issue Severity

No response

Metadata

Assignees: No one assigned

Labels:

  • P2: Important issue, but not time-critical
  • bug: Something that is supposed to be working; but isn't
  • core: Issues that should be addressed in Ray Core
  • core-correctness: Leak, crash, hang
  • core-object-store