|
14 | 14 |
|
15 | 15 | import importlib
|
16 | 16 | import torch
|
| 17 | +import numpy as np |
17 | 18 |
|
18 | 19 | import pytest
|
19 | 20 |
|
|
27 | 28 | Se2KeyboardCfg,
|
28 | 29 | Se2SpaceMouse,
|
29 | 30 | Se2SpaceMouseCfg,
|
| 31 | + Se2Phone, |
| 32 | + Se2PhoneCfg, |
| 33 | + Se3Phone, |
| 34 | + Se3PhoneCfg, |
30 | 35 | Se3Gamepad,
|
31 | 36 | Se3GamepadCfg,
|
32 | 37 | Se3Keyboard,
|
@@ -260,6 +265,68 @@ def test_se3spacemouse_constructors(mock_environment, mocker):
|
260 | 265 | assert isinstance(result, torch.Tensor)
|
261 | 266 | assert result.shape == (7,) # (pos_x, pos_y, pos_z, rot_x, rot_y, rot_z, gripper)
|
262 | 267 |
|
| 268 | +""" |
| 269 | +Test phone devices. |
| 270 | +""" |
| 271 | + |
| 272 | + |
| 273 | +def test_se2phone_constructor(mock_environment, mocker): |
| 274 | + """Test constructor and delta-output behavior for Se2Phone.""" |
| 275 | + |
| 276 | + # --- Fake Teleop that captures the callback and allows us to emit messages --- |
| 277 | + class _FakeTeleop: |
| 278 | + def __init__(self, **kwargs): |
| 279 | + self._cb = None |
| 280 | + |
| 281 | + def subscribe(self, cb): |
| 282 | + self._cb = cb |
| 283 | + |
| 284 | + def run(self): |
| 285 | + # No-op: don't start any network/server loop in tests |
| 286 | + return |
| 287 | + |
| 288 | + def emit(self, msg: dict): |
| 289 | + assert self._cb is not None, "Callback not registered" |
| 290 | + # The device ignores the pose argument; pass a dummy np.array |
| 291 | + self._cb(np.zeros(7), msg) |
| 292 | + |
| 293 | + # Import the device module and patch Teleop with our fake |
| 294 | + device_mod = importlib.import_module("isaaclab.devices.phone.se2_phone") |
| 295 | + mocker.patch.object(device_mod, "Teleop", _FakeTeleop) |
| 296 | + |
| 297 | + # Build config with custom sensitivities (we'll verify via output) |
| 298 | + cfg = Se2PhoneCfg(v_x_sensitivity=0.9, v_y_sensitivity=0.5, omega_z_sensitivity=1.2) |
| 299 | + |
| 300 | + # Create the device |
| 301 | + phone = Se2Phone(cfg) |
| 302 | + assert isinstance(phone, Se2Phone) |
| 303 | + |
| 304 | + # Grab the fake teleop instance to push messages |
| 305 | + fake = phone._teleop |
| 306 | + assert isinstance(fake, _FakeTeleop) |
| 307 | + |
| 308 | + # Helper: orientation kept constant so omega_z delta is zero (easier assertion) |
| 309 | + orient = {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0} |
| 310 | + |
| 311 | + # 1) First message initializes internal reference; advance() should return zeros. |
| 312 | + fake.emit({"move": True, "position": {"x": 0.0, "z": 0.0}, "orientation": orient}) |
| 313 | + out1 = phone.advance() |
| 314 | + assert isinstance(out1, torch.Tensor) |
| 315 | + assert out1.shape == (3,) |
| 316 | + assert torch.allclose(out1, torch.zeros(3, dtype=torch.float32, device=out1.device)) |
| 317 | + |
| 318 | + # 2) Second message changes position (x, z). Se2Phone maps latest_v_x=-z, latest_v_y=-x. |
| 319 | + # Deltas: dvx = -(2.0 - 0.0) = -2.0, dvy = -(1.0 - 0.0) = -1.0 |
| 320 | + fake.emit({"move": True, "position": {"x": 1.0, "z": 2.0}, "orientation": orient}) |
| 321 | + out2 = phone.advance() |
| 322 | + |
| 323 | + # Expected scaled command: |
| 324 | + # vx = dvx * 0.9 = -1.8 |
| 325 | + # vy = dvy * 0.5 = -0.5 |
| 326 | + # omega_z = 0.0 (orientation unchanged) |
| 327 | + expected = torch.tensor([-1.8, -0.5, 0.0], dtype=torch.float32, device=out2.device) |
| 328 | + assert out2.shape == (3,) |
| 329 | + assert torch.allclose(out2, expected, atol=1e-5) |
263 | 330 |
|
264 | 331 | """
|
265 | 332 | Test OpenXR devices.
|
@@ -320,6 +387,97 @@ def test_openxr_constructors(mock_environment, mocker):
|
320 | 387 | # Test reset functionality
|
321 | 388 | device.reset()
|
322 | 389 |
|
| 390 | +def test_se3phone_constructor(mocker): |
| 391 | + """Test constructor and delta-output behavior for Se3Phone.""" |
| 392 | + # --- Fake Teleop that captures the callback and lets us emit messages --- |
| 393 | + class _FakeTeleop: |
| 394 | + def __init__(self, **kwargs): |
| 395 | + self._cb = None |
| 396 | + |
| 397 | + def subscribe(self, cb): |
| 398 | + self._cb = cb |
| 399 | + |
| 400 | + def run(self): |
| 401 | + return # no thread loop in tests |
| 402 | + |
| 403 | + def emit(self, msg: dict): |
| 404 | + assert self._cb is not None, "Callback not registered" |
| 405 | + self._cb(np.zeros(7), msg) # pose is unused by device |
| 406 | + |
| 407 | + # Import the device module and patch Teleop with our fake |
| 408 | + device_mod = importlib.import_module("isaaclab.devices.phone.se3_phone") |
| 409 | + mocker.patch.object(device_mod, "Teleop", _FakeTeleop) |
| 410 | + |
| 411 | + # Build config with custom sensitivities to verify scaling |
| 412 | + cfg = Se3PhoneCfg(pos_sensitivity=0.5, rot_sensitivity=0.4, gripper_term=True) |
| 413 | + |
| 414 | + # Create the device |
| 415 | + phone = Se3Phone(cfg) |
| 416 | + assert isinstance(phone, Se3Phone) |
| 417 | + |
| 418 | + fake = phone._teleop |
| 419 | + assert isinstance(fake, _FakeTeleop) |
| 420 | + |
| 421 | + # Keep gripper explicit; start with OPEN |
| 422 | + # Keep orientation constant on first emit to initialize state |
| 423 | + orient0 = {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0} |
| 424 | + |
| 425 | + # 1) First message initializes internal reference; advance() -> zeros (with gripper) |
| 426 | + fake.emit({ |
| 427 | + "move": True, |
| 428 | + "position": {"x": 0.0, "y": 0.0, "z": 0.0}, |
| 429 | + "orientation": orient0, |
| 430 | + "gripper": "open", |
| 431 | + }) |
| 432 | + out1 = phone.advance() |
| 433 | + assert isinstance(out1, torch.Tensor) |
| 434 | + assert out1.shape == (7,) |
| 435 | + # first call yields zeros for deltas; gripper=+1 |
| 436 | + expected1 = torch.zeros(7, dtype=torch.float32, device=out1.device) |
| 437 | + expected1[6] = 1.0 |
| 438 | + assert torch.allclose(out1, expected1, atol=1e-6) |
| 439 | + |
| 440 | + # 2) Second message changes position & orientation. |
| 441 | + # Position mapping inside device: latest_pos = [-z, -x, y] |
| 442 | + # Use x=1, y=2, z=3 -> latest_pos = [-3, -1, 2], prev was [0,0,0] |
| 443 | + # Orientation: apply yaw rotation by theta about Z: |
| 444 | + # quat (w,x,y,z) = (cos(theta/2), 0, 0, sin(theta/2)) |
| 445 | + # axis_angle_from_quat -> [0,0,theta] |
| 446 | + # device remaps: rot[[0,1,2]] = rot[[2,0,1]] * [-1,-1,1] |
| 447 | + # so -> [theta, 0, 0] * [-1,-1,1] = [-theta, 0, 0] |
| 448 | + theta = 0.2 # radians |
| 449 | + w = float(np.cos(theta / 2.0)) |
| 450 | + z = float(np.sin(theta / 2.0)) |
| 451 | + orient1 = {"x": 0.0, "y": 0.0, "z": z, "w": w} |
| 452 | + |
| 453 | + fake.emit({ |
| 454 | + "move": True, |
| 455 | + "position": {"x": 1.0, "y": 2.0, "z": 3.0}, |
| 456 | + "orientation": orient1, |
| 457 | + "gripper": "open", |
| 458 | + }) |
| 459 | + out2 = phone.advance() |
| 460 | + |
| 461 | + # Expected scaled deltas |
| 462 | + dpos = torch.tensor([-3.0, -1.0, 2.0], dtype=torch.float32, device=out2.device) * 0.5 |
| 463 | + drot = torch.tensor([-theta, 0.0, 0.0], dtype=torch.float32, device=out2.device) * 0.4 |
| 464 | + expected2 = torch.cat([dpos, drot, torch.tensor([1.0], dtype=torch.float32, device=out2.device)], dim=0) |
| 465 | + |
| 466 | + assert out2.shape == (7,) |
| 467 | + assert torch.allclose(out2, expected2, atol=1e-5) |
| 468 | + |
| 469 | + # 3) Gate OFF should zero deltas and resync reference |
| 470 | + fake.emit({ |
| 471 | + "move": False, # gate off |
| 472 | + "position": {"x": 4.0, "y": 5.0, "z": 6.0}, |
| 473 | + "orientation": orient1, # unchanged quaternion |
| 474 | + "gripper": "close", |
| 475 | + }) |
| 476 | + out3 = phone.advance() |
| 477 | + # deltas should be zero; gripper should now be -1 |
| 478 | + expected3 = torch.zeros(7, dtype=torch.float32, device=out3.device) |
| 479 | + expected3[6] = -1.0 |
| 480 | + assert torch.allclose(out3, expected3, atol=1e-6) |
323 | 481 |
|
324 | 482 | """
|
325 | 483 | Test teleop device factory.
|
|
0 commit comments