4 changes: 2 additions & 2 deletions pyproject.toml
@@ -74,9 +74,9 @@ dependencies = [
"pyserial>=3.5",
"wandb>=0.20.0",

"torch>=2.2.1,<2.8.0", # TODO: Bumb dependency
"torch>=2.2.1,<=2.8.0", # TODO: Bumb dependency
"torchcodec>=0.2.1,<0.6.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bumb dependency
"torchvision>=0.21.0,<0.23.0", # TODO: Bumb dependency
"torchvision>=0.21.0,<=0.23.0", # TODO: Bumb dependency

"draccus==0.10.0", # TODO: Remove ==
"gymnasium>=0.29.1,<1.0.0", # TODO: Bumb dependency
61 changes: 61 additions & 0 deletions src/lerobot/async_inference/configs.py
@@ -196,3 +196,64 @@ def to_dict(self) -> dict:
"debug_visualize_queue_size": self.debug_visualize_queue_size,
"aggregate_fn_name": self.aggregate_fn_name,
}


@dataclass
class RobotOpenpiClientConfig:
"""Configuration for RobotClient.

This class defines all configurable parameters for the RobotClient,
including network connection, policy settings, and control behavior.
"""
# Robot configuration (for CLI usage - robot instance will be created from this)
robot: RobotConfig = field(metadata={"help": "Robot configuration"})

    # Policies typically output chunks of up to K actions; only the first `actions_per_chunk`
    # actions of each returned chunk are executed by this client
actions_per_chunk: int = field(metadata={"help": "Number of actions per chunk"})

# Task instruction for the robot to execute (e.g., 'fold my tshirt')
task: str = field(default="", metadata={"help": "Task instruction for the robot to execute"})

# Network configuration
server_address: str = field(default="localhost:8080", metadata={"help": "Server address to connect to"})

# Control behavior configuration
fps: int = field(default=DEFAULT_FPS, metadata={"help": "Frames per second"})

# Device configuration
device: str = field(default="cpu", metadata={"help": "Device for policy inference"})

@property
def environment_dt(self) -> float:
"""Environment time step, in seconds"""
return 1 / self.fps

def __post_init__(self):
"""Validate configuration after initialization."""
if not self.server_address:
raise ValueError("server_address cannot be empty")

if self.fps <= 0:
raise ValueError(f"fps must be positive, got {self.fps}")

if self.actions_per_chunk <= 0:
raise ValueError(f"actions_per_chunk must be positive, got {self.actions_per_chunk}")


@classmethod
    def from_dict(cls, config_dict: dict) -> "RobotOpenpiClientConfig":
        """Create a RobotOpenpiClientConfig from a dictionary."""
return cls(**config_dict)

def to_dict(self) -> dict:
"""Convert the configuration to a dictionary."""
return {
"server_address": self.server_address,
"fps": self.fps,
"actions_per_chunk": self.actions_per_chunk,
"task": self.task,
"device": self.device,
}
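To make the intended usage concrete, here is a minimal sketch of building this config programmatically rather than through the CLI. It is illustrative only: the `SO100FollowerConfig` import path and its `port`/`id` fields come from the existing `so100_follower` package rather than this diff, and the values are placeholders.

```python
# Illustrative sketch: SO100FollowerConfig and its field names are assumptions
# based on the existing so100_follower robot package, not part of this diff.
from lerobot.async_inference.configs import RobotOpenpiClientConfig
from lerobot.robots.so100_follower import SO100FollowerConfig

config = RobotOpenpiClientConfig(
    robot=SO100FollowerConfig(port="/dev/tty.usbmodem58760431541", id="black"),
    actions_per_chunk=10,  # required: how many actions of each returned chunk to execute
    task="dummy",
    server_address="127.0.0.1:8080",
    fps=30,
    device="cpu",
)

# __post_init__ validates server_address, fps and actions_per_chunk
print(config.environment_dt)  # 1 / fps -> ~0.033 s per control step

# to_dict() omits the robot sub-config, so it is a flat summary of the client
# settings rather than a lossless round-trip of the full configuration.
summary = config.to_dict()
```

Note that since `robot` is a required field and is excluded from `to_dict()`, `from_dict(config.to_dict())` would fail unless a robot config is added back to the dictionary first.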
2 changes: 1 addition & 1 deletion src/lerobot/async_inference/constants.py
@@ -26,4 +26,4 @@
SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "pi0", "tdmpc", "vqbet"]

# TODO: Add all other robots
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower"]
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_koch_follower", "koch_follower"]
1 change: 1 addition & 0 deletions src/lerobot/async_inference/robot_client.py
@@ -52,6 +52,7 @@
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_koch_follower,
koch_follower,
make_robot_from_config,
so100_follower,
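A note on why this one-line import matters: the `# noqa: F401` imports in `robot_client.py` exist for their side effect of making each robot's config class importable and registered, so that `--robot.type=bi_koch_follower` can be resolved when CLI arguments are parsed. A rough sketch of the equivalent programmatic path follows; the `BiKochFollowerConfig` field names are hypothetical and shown only to illustrate the mechanism.

```python
# Sketch under assumptions: left_arm_port / right_arm_port are hypothetical
# field names; the point is that the imported config class lets
# make_robot_from_config build the matching robot instance.
from lerobot.robots import make_robot_from_config
from lerobot.robots.bi_koch_follower import BiKochFollowerConfig

cfg = BiKochFollowerConfig(
    id="my_bi_koch",
    left_arm_port="/dev/ttyACM0",   # hypothetical
    right_arm_port="/dev/ttyACM1",  # hypothetical
)
robot = make_robot_from_config(cfg)
robot.connect()
print(robot.action_features)  # joint keys the client maps action vectors onto
robot.disconnect()
```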
209 changes: 209 additions & 0 deletions src/lerobot/async_inference/robot_openpi_client.py
@@ -0,0 +1,209 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Example command:
```shell
python src/lerobot/async_inference/robot_openpi_client.py \
    --robot.type=so100_follower \
    --robot.port=/dev/tty.usbmodem58760431541 \
    --robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}" \
    --robot.id=black \
    --task="dummy" \
    --server_address=127.0.0.1:8080 \
    --actions_per_chunk=10
```
"""

import logging
import time
from dataclasses import asdict
from pprint import pformat

import draccus
import numpy as np
from openpi_client import websocket_client_policy

from lerobot.async_inference.configs import RobotOpenpiClientConfig
from lerobot.async_inference.constants import SUPPORTED_ROBOTS
from lerobot.async_inference.helpers import (
    Action,
    FPSTracker,
    Observation,
    RawObservation,
    TimedObservation,
    get_logger,
    map_robot_keys_to_lerobot_features,
    raw_observation_to_observation,
)
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig  # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig  # noqa: F401
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.robots import (  # noqa: F401
    Robot,
    RobotConfig,
    bi_koch_follower,
    koch_follower,
    make_robot_from_config,
    so100_follower,
    so101_follower,
)
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data


class RobotOpenpiClient:
prefix = "robot_openpi_client"
logger = get_logger(prefix)

def __init__(self, config: RobotOpenpiClientConfig):
"""Initialize RobotOpenpiClient with unified configuration.

Args:
config: RobotOpenpiClientConfig containing all configuration parameters
"""
# Store configuration
self.config = config
self.robot = make_robot_from_config(config.robot)
self.robot.connect()
_init_rerun(session_name="openpi_client")

self.lerobot_features = map_robot_keys_to_lerobot_features(self.robot)
# TODO: this needs to be consistent with the policy config
self.policy_image_features = {
"observation.images.top": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
"observation.images.front": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}

self.server_address = config.server_address
        self.host, self.port = self.server_address.split(":")

# FPS measurement
self.fps_tracker = FPSTracker(target_fps=self.config.fps)
self.client = websocket_client_policy.WebsocketClientPolicy(host=self.host, port=self.port)

@property
def running(self):
return True

def stop(self):
"""Stop the robot client"""
self.robot.disconnect()
self.logger.debug("Robot disconnected")

    def _action_tensor_to_action_dict(self, action_tensor: np.ndarray) -> dict[str, float]:
action = {key: action_tensor[i].item() for i, key in enumerate(self.robot.action_features)}
return action

def control_loop(self, task: str, verbose: bool = False) -> tuple[Observation, Action]:
"""Combined function for executing actions and streaming observations"""
# Wait at barrier for synchronized start
self.logger.info("Control loop thread starting")

_performed_action = None
_captured_observation = None

timestep_count = 0
while self.running:
control_loop_start = time.perf_counter()
"""Control loop: (1) Performing actions, when available"""

"""Control loop: (2) Streaming observations to the remote policy server"""
start_time = time.perf_counter()
raw_observation: RawObservation = self.robot.get_observation()
observation: Observation = raw_observation_to_observation(
raw_observation,
self.lerobot_features,
self.policy_image_features,
self.config.device,
)
            # openpi policies expect a flat dict of numpy arrays without a batch dimension,
            # plus the task instruction under the "prompt" key
            observation = {k: v.numpy().squeeze(0) for k, v in observation.items()}
            observation["prompt"] = task

timed_observation = TimedObservation(
timestamp=time.time(), # need time.time() to compare timestamps across client and server
observation=observation,
timestep=timestep_count,
)
timestep_count += 1

obs_capture_time = time.perf_counter()
action_chunk = self.client.infer(timed_observation.get_observation())["actions"]
self.logger.info(f"Inference time (ms): {(time.perf_counter() - obs_capture_time) * 1000:.2f}")

            # If the server returned fewer actions than requested, warn and use what is available
            if self.config.actions_per_chunk > action_chunk.shape[0]:
                self.logger.warning(
                    f"actions_per_chunk ({self.config.actions_per_chunk}) exceeds the number of actions "
                    f"returned by the server ({action_chunk.shape[0]})"
                )
            action_chunk = action_chunk[: self.config.actions_per_chunk]

for action in action_chunk:
_performed_action = self.robot.send_action(self._action_tensor_to_action_dict(action))
log_rerun_data(raw_observation, _performed_action)
time.sleep(self.config.environment_dt)

if verbose:
# Calculate comprehensive FPS metrics
fps_metrics = self.fps_tracker.calculate_fps_metrics(timed_observation.get_timestamp())

self.logger.info(
f"Obs #{timed_observation.get_timestep()} | "
f"Avg FPS: {fps_metrics['avg_fps']:.2f} | "
f"Target: {fps_metrics['target_fps']:.2f}"
)

                self.logger.info(
                    f"Ts={timed_observation.get_timestamp():.6f} | Capturing observation took {(obs_capture_time - start_time):.6f}s"
                )

# Dynamically adjust sleep time to maintain the desired control frequency
time.sleep(max(0, self.config.environment_dt - (time.perf_counter() - control_loop_start)))

return _captured_observation, _performed_action


@draccus.wrap()
def synchronous_client(cfg: RobotOpenpiClientConfig):
logging.info(pformat(asdict(cfg)))

if cfg.robot.type not in SUPPORTED_ROBOTS:
raise ValueError(f"Robot {cfg.robot.type} not yet supported!")

client = RobotOpenpiClient(cfg)
try:
client.control_loop(task=cfg.task)
finally:
client.stop()
client.logger.info("Client stopped")


if __name__ == "__main__":
synchronous_client() # run the client
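For context on what the control loop exchanges with the server, here is a stripped-down sketch of the same round trip using `WebsocketClientPolicy` directly. The observation keys and shapes are illustrative assumptions; in the real loop they come from `raw_observation_to_observation` and must match what the policy server expects. Only the `prompt` key and the `"actions"` entry of the reply mirror the code above.

```python
# Minimal sketch of the client/server exchange, with assumed keys and shapes.
import numpy as np
from openpi_client import websocket_client_policy

client = websocket_client_policy.WebsocketClientPolicy(host="127.0.0.1", port=8080)

observation = {
    "observation.images.top": np.zeros((3, 224, 224), dtype=np.uint8),    # assumed camera key/shape
    "observation.images.front": np.zeros((3, 224, 224), dtype=np.uint8),  # assumed camera key/shape
    "observation.state": np.zeros(6, dtype=np.float32),                   # assumed proprioceptive state
    "prompt": "dummy",                                                    # task instruction, as in the loop
}

reply = client.infer(observation)  # blocking websocket round trip
action_chunk = reply["actions"]    # array of shape (chunk_len, action_dim)

# The control loop above executes at most `actions_per_chunk` of these actions,
# sleeping `environment_dt` between consecutive ones.
print(action_chunk.shape)
```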
18 changes: 18 additions & 0 deletions src/lerobot/robots/bi_koch_follower/__init__.py
@@ -0,0 +1,18 @@
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .bi_koch_follower import BiKochFollower
from .config_bi_koch_follower import BiKochFollowerConfig