Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,41 @@ test/e2e/v1/*.log
test/e2e/*.log
# Exclude build artifacts
bin/

# ---------------- Python SDK & tooling ----------------
# Virtual environments
.venv/
venv/
env/

# Bytecode caches
__pycache__/
*.py[cod]

# Build artifacts / metadata
python/dfcache_sdk/.pytest_cache/
.pytest_cache/
*.egg-info/
*.egg
dist/
build/

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Tool caches
.mypy_cache/
.ruff_cache/
.coverage
coverage.xml
htmlcov/

# Temporary files
*.swp
*.bak
*.tmp

# Jupyter (if added later)
.ipynb_checkpoints/

60 changes: 60 additions & 0 deletions python/dfcache_sdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Dragonfly Dfcache Python SDK

This is an initial, lightweight Python interface to Dragonfly's `dfcache` operations aimed at AI and data workflows.

## Features
- Import a local file into Dragonfly P2P cache
- Export a cached file by content ID (CID) to a local path
- Stat (existence check) for a CID (optionally local-only)
- Delete a cached CID
- Health check convenience

## Design
MVP implementation shells out to the existing `dfcache` CLI binary rather than using gRPC. This avoids proto generation overhead. A future iteration can switch to direct gRPC calls and piece streaming for advanced scenarios.

## Install
```bash
pip install -e python/dfcache_sdk
```
Ensure the Dragonfly `dfcache` binary is built:
```bash
make build-dfcache
```

## Usage
```python
from dragonfly_dfcache import DfCacheClient, NotFoundError

client = DfCacheClient()

cid = "sha256:abcdef1234567890" # Example digest
source_file = "/data/model.bin"
export_path = "/tmp/model.bin"

# Import
client.import_cache(cid=cid, path=source_file)

# Stat
exists = client.stat(cid)
print("Exists?", exists)

# Export
client.export(cid=cid, output=export_path)

# Delete
client.delete(cid)
```

## Health
```python
client.check_health() # True if dfcache CLI and dfdaemon responsive
```

## Roadmap
- Replace CLI subprocess calls with gRPC client (requires Python stubs)
- Add streaming progress callbacks for export/import
- Optional async API using asyncio subprocess
- Support rate limiting and advanced flags

## License
Apache 2.0
38 changes: 38 additions & 0 deletions python/dfcache_sdk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "dragonfly-dfcache-sdk"
version = "0.1.0"
description = "Unofficial Python SDK for Dragonfly dfcache/dfdaemon operations (stat, import, export, delete)."
readme = "README.md"
authors = [{name = "Dragonfly Authors"}]
license = {text = "Apache-2.0"}
keywords = ["dragonfly", "p2p", "cache", "dfcache", "dfdaemon"]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent"
]
requires-python = ">=3.9"
dependencies = [
"typing-extensions>=4.0.0"
]

[project.optional-dependencies]
dev = ["pytest", "mypy", "ruff"]

[tool.pytest.ini_options]
testpaths = ["python/dfcache_sdk/tests"]
pythonpath = ["python/dfcache_sdk/src"]

[tool.setuptools.packages.find]
where = ["src"]

[tool.mypy]
python_version = "3.11"
strict = true

[tool.ruff]
line-length = 100
7 changes: 7 additions & 0 deletions python/dfcache_sdk/src/dragonfly_dfcache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .client import DfCacheClient, DfCacheError, NotFoundError

__all__ = [
"DfCacheClient",
"DfCacheError",
"NotFoundError",
]
114 changes: 114 additions & 0 deletions python/dfcache_sdk/src/dragonfly_dfcache/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

import os
import subprocess
import json
from typing import Optional

from .errors import DfCacheError, NotFoundError, DaemonUnavailableError

_D7Y_SCHEME = "d7y:/" # prefix used to build internal URL from cid


def _cid_to_url(cid: str) -> str:
from urllib.parse import quote
return f"{_D7Y_SCHEME}{quote(cid, safe='') }"


def _run_cmd(args: list[str], timeout: float) -> subprocess.CompletedProcess:
return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=timeout)


class DfCacheClient:
"""Thin wrapper invoking existing dfcache CLI for stat/import/export/delete.

This avoids needing Python gRPC stubs initially; later we can switch to gRPC.
"""

def __init__(self, binary: Optional[str] = None, timeout: float = 10.0) -> None:
self._binary = binary or self._detect_binary()
self._timeout = timeout

def _detect_binary(self) -> str:
# Allow explicit override via env
env_bin = os.getenv("DRAGONFLY_DFCACHE_BINARY")
if env_bin and os.path.isfile(env_bin):
return env_bin

# Search common build output directories (linux/darwin, amd64/arm64)
bin_dir = os.path.join(os.getcwd(), "bin")
candidates: list[str] = []
if os.path.isdir(bin_dir):
for root, dirs, files in os.walk(bin_dir):
if "dfcache" in files:
candidates.append(os.path.join(root, "dfcache"))
# Fallback to PATH lookup
candidates.append("dfcache")
for c in candidates:
if os.path.isfile(c) and os.access(c, os.X_OK):
return c
raise DaemonUnavailableError(
"dfcache binary not found. Build it via 'make build-dfcache' or set DRAGONFLY_DFCACHE_BINARY."
)

def stat(self, cid: str, tag: str = "", local_only: bool = False, timeout: Optional[float] = None) -> bool:
args = [self._binary, "stat", "-i", cid]
if tag:
args += ["-t", tag]
if local_only:
args += ["-l"]
cp = _run_cmd(args, timeout or self._timeout)
if cp.returncode == 0:
return True
if "not exist" in cp.stderr.lower() or "not exist" in cp.stdout.lower():
return False
if cp.returncode != 0:
raise DfCacheError(f"stat failed: {cp.stderr.strip() or cp.stdout.strip()}")
return False

def import_cache(self, cid: str, path: str, tag: str = "", timeout: Optional[float] = None) -> None:
if not os.path.isfile(path):
raise FileNotFoundError(path)
args = [self._binary, "import", "-i", cid, "-I", path]
if tag:
args += ["-t", tag]
cp = _run_cmd(args, timeout or self._timeout)
if cp.returncode != 0:
raise DfCacheError(f"import failed: {cp.stderr.strip() or cp.stdout.strip()}")

def export(self, cid: str, output: str, tag: str = "", local_only: bool = False, timeout: Optional[float] = None) -> None:
parent = os.path.dirname(os.path.abspath(output))
os.makedirs(parent, exist_ok=True)
args = [self._binary, "export", "-i", cid, "-O", output]
if tag:
args += ["-t", tag]
if local_only:
args += ["-l"]
cp = _run_cmd(args, timeout or self._timeout)
if cp.returncode != 0:
if "not exist" in cp.stderr.lower() or "not exist" in cp.stdout.lower():
raise NotFoundError(f"cache {cid} not found")
raise DfCacheError(f"export failed: {cp.stderr.strip() or cp.stdout.strip()}")

def delete(self, cid: str, tag: str = "", timeout: Optional[float] = None) -> None:
args = [self._binary, "delete", "-i", cid]
if tag:
args += ["-t", tag]
cp = _run_cmd(args, timeout or self._timeout)
if cp.returncode != 0 and "not exist" not in cp.stderr.lower():
raise DfCacheError(f"delete failed: {cp.stderr.strip() or cp.stdout.strip()}")

def check_health(self) -> bool:
try:
self.stat("health-check-cid")
return True
except DfCacheError:
return False

def info_json(self) -> str:
data = {
"binary": self._binary,
"timeout": self._timeout,
}
return json.dumps(data)

10 changes: 10 additions & 0 deletions python/dfcache_sdk/src/dragonfly_dfcache/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class DfCacheError(Exception):
"""Base exception for dfcache SDK."""


class NotFoundError(DfCacheError):
"""Raised when a cache entry does not exist (maps to os.ErrNotExist)."""


class DaemonUnavailableError(DfCacheError):
"""Raised when dfdaemon is not reachable or unhealthy."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
syntax = "proto3";
package dfdaemon.v1;

// Minimal subset of Dragonfly dfdaemon v1 API required for dfcache operations.
// NOTE: This is a reduced, compatibility proto. For full capability use upstream d7y.io/api.

message UrlMeta {
string tag = 1;
}

message StatTaskRequest {
string url = 1; // d7y:/<cid>
UrlMeta url_meta = 2; // tag differentiates tasks
bool local_only = 3; // only check local cache
}

message ImportTaskRequest {
string type = 1; // Task type, "DfCache"
string url = 2; // d7y:/<cid>
string path = 3; // source file path
UrlMeta url_meta = 4; // tag
}

message ExportTaskRequest {
string url = 1; // d7y:/<cid>
string output = 2; // destination file path
uint64 timeout = 3; // seconds
double limit = 4; // rate limit (bytes per second) - optional
UrlMeta url_meta = 5; // tag
int64 uid = 6;
int64 gid = 7;
bool local_only = 8; // only export from local cache
}

message DeleteTaskRequest {
string url = 1; // d7y:/<cid>
UrlMeta url_meta = 2; // tag
}

message Empty {}

// Error reporting simplified: success = empty response, not found -> gRPC status NOT_FOUND

service Daemon {
rpc StatTask(StatTaskRequest) returns (Empty);
rpc ImportTask(ImportTaskRequest) returns (Empty);
rpc ExportTask(ExportTaskRequest) returns (Empty);
rpc DeleteTask(DeleteTaskRequest) returns (Empty);
// Health check: upstream exposes CheckHealth; we model it as StatTask on a reserved cid, or rely on channel connectivity.
}
Loading
Loading