Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import warnings
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import Future
import threading
from typing import Any
from typing import Dict
from typing import List
Expand All @@ -43,9 +45,94 @@
from vineyard._C import _connect
from vineyard.core.builder import BuilderContext
from vineyard.core.builder import put
from vineyard.core.resolver import get_current_resolvers
from vineyard.core.resolver import ResolverContext
from vineyard.core.resolver import get

class LazyObject:
"""A helper class for lazy fetching of vineyard objects."""

def __init__(self, client, object_id: Optional[ObjectID] = None, name: Optional[str] = None, resolver: Optional[BuilderContext] = None, fetch: bool = False, **kwargs):
"""
Initialize the LazyObject.

Args:
client (Client): The Vineyard client.
object_id (Optional[ObjectID]): The object id to fetch.
name (Optional[str]): The name of the object to fetch.
resolver (Optional[BuilderContext]): The resolver to use for fetching the object.
fetch (bool): Whether to fetch the object immediately.
"""
self.client = client
self.object_id = object_id
self.name = name
self.resolver = resolver
self.kwargs = kwargs
self._result: Optional[Any] = None
self._exception: Optional[Exception] = None
self._ready_event = threading.Event()

# Mark the future as not started yet
self.future: Optional[Future] = None

# Start the asynchronous fetch
self._start_fetch()

def _start_fetch(self):
"""Start the asynchronous fetch operation."""
if self.future is None:
self.future = self.client._lazy_get_thread_pool.submit(self._fetch)
self.future.add_done_callback(self._callback)

def _fetch(self) -> None:
"""Internal method to fetch the object."""
obj = self.client.get(object_id=self.object_id, name=self.name, resolver=self.resolver, fetch=self.fetch, **self.kwargs)
self._result = obj

def _callback(self, fut: Future):
"""Callback executed when the future completes."""
try:
fut.result() # This will raise if the fetch failed
except Exception as e:
self._exception = e
finally:
self._ready_event.set()

def get(self) -> Any:
"""
Retrieve the object. If the object is not ready, raise an exception.

Returns:
Any: The fetched object.

Raises:
Exception: If an error occurred during fetching.
"""
if self.future is None:
raise RuntimeError("Fetch operation was not started.")

if self.future.done():
if self._exception:
raise self._exception
return self._result
else:
self.cancel()
raise Exception("Data not ready, fetch operation has been canceled.")

def is_ready(self) -> bool:
"""
Check if the tensor has been fetched.

Returns:
bool: True if the tensor has been fetched, False otherwise.
"""
return self._ready_event.is_set()

def cancel(self):
"""Cancel the fetch operation"""
if self.future and not self.future.done():
self.future.cancel()


def _apply_docstring(func):
def _apply(fn):
Expand Down Expand Up @@ -168,6 +255,7 @@ def __init__(
session: int = None,
username: str = None,
password: str = None,
max_workers: int = 8,
config: str = None,
):
"""Connects to the vineyard IPC socket and RPC socket.
Expand Down Expand Up @@ -211,6 +299,8 @@ def __init__(
is enabled.
password: Optional, the required password of vineyardd when authentication
is enabled.
max_workers: Optional, the maximum number of threads that can be used to
asynchronously get/put objects from/to vineyard. Default is 8.
config: Optional, can either be a path to a YAML configuration file or
a path to a directory containing the default config file
`vineyard-config.yaml`. Also, the environment variable
Expand Down Expand Up @@ -292,6 +382,12 @@ def __init__(

self._spread = False
self._compression = True

# Initialize thread pool for lazy_get
self._lazy_get_thread_pool = ThreadPoolExecutor(max_workers=max_workers)
self._lazy_get_futures: Dict[Optional[ObjectID, str], Any] = {}
self._lazy_get_lock = threading.Lock()

if self._ipc_client is None and self._rpc_client is None:
raise ConnectionError(
"Failed to connect to vineyard via both IPC and RPC connection. "
Expand Down Expand Up @@ -874,5 +970,36 @@ def with_spread(self, enabled: bool = True):
yield
self.spread = tmp_spread

def lazy_get(self, object_id: Optional[ObjectID] = None, name: Optional[str] = None, resolver: Optional[BuilderContext] = None, fetch: bool = False, **kwargs) -> LazyObject:
"""Asynchronously fetch an object corresponding to the given object name.

Args:
object_id (Optional[ObjectID]): The object id to fetch.
name (Optional[str]): The name of the object to fetch.
resolver (Optional[BuilderContext]): The resolver to use for fetching the object.
fetch (bool): Whether to fetch the object immediately.

Returns:
LazyObject: An object that represents the future result of the fetch operation.
"""
with self._lazy_get_lock:
if object_id is not None and object_id in self._lazy_get_futures:
return self._lazy_get_futures[object_id]
if name is not None and name in self._lazy_get_futures:
return self._lazy_get_futures[name]

resolver = resolver or get_current_resolvers()
lazy_object = LazyObject(self, object_id,, resolver, **kwargs)
self._lazy_get_futures[name] = lazy_object

def _cleanup(fut):
with self._lazy_get_lock:
self._lazy_get_futures.pop(name, None)

# Attach cleanup logic once the tensor is fetched or fails
if lazy_object.future:
lazy_object.future.add_done_callback(_cleanup)

return lazy_object

__all__ = ['Client']
48 changes: 48 additions & 0 deletions python/vineyard/core/tests/test_lazy_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import vineyard
import torch

Check warning on line 2 in python/vineyard/core/tests/test_lazy_get.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/tests/test_lazy_get.py#L2

Unused import torch
from vineyard.contrib.ml.torch import torch_context

client = vineyard.connect('/var/run/vineyard.sock1')

#create a random tensor
#x = torch.rand(100, 100, 100)
#client.delete(name="test_lazy_tensor")
#with torch_context(client):
# client.put(x, name="test_lazy_tensor", persist=True)

test_kwargs = {'test': 'test'}
with torch_context(client):
lazy_tensor = client.lazy_get(name="test_lazy_tensor111", **test_kwargs)
"""

Check warning on line 16 in python/vineyard/core/tests/test_lazy_get.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/tests/test_lazy_get.py#L16

String statement has no effect
try:
for i in range(8):
name = "test_lazy_tensor" + str(i)
print("Creating tensor", name)
lazy_tensor = client.lazy_get("test_lazy_tensor")
except Exception as e:
print(e)
"""

# At a later point, attempt to access the tensor

try:
if lazy_tensor.is_ready():
print("Tensor is ready")
#try:
# tensor = lazy_tensor.get()
# print(tensor)
#except RuntimeError as e:
# pass
else:
print("Tensor is not ready")
#import time
#time.sleep(5)
# print("Waiting for 5 seconds")
#if lazy_tensor.is_ready():
# print("Tensor is ready")
# tensor = lazy_tensor.get()
# print(tensor)
except Exception as e:
print(e)

print("Done")

Check notice on line 48 in python/vineyard/core/tests/test_lazy_get.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/tests/test_lazy_get.py#L48

Final newline missing
Loading