reachy2_sdk.reachy_sdk
ReachySDK package.
This package provides remote access (via socket) to a Reachy robot. It automatically handles the synchronization with the robot. In particular, you can easily get an always up-to-date robot state (joint positions, sensors value). You can also send joint commands, compute forward or inverse kinematics.
1"""ReachySDK package. 2 3This package provides remote access (via socket) to a Reachy robot. 4It automatically handles the synchronization with the robot. 5In particular, you can easily get an always up-to-date robot state (joint positions, sensors value). 6You can also send joint commands, compute forward or inverse kinematics. 7""" 8 9# from reachy2_sdk_api.dynamixel_motor_pb2_grpc import DynamixelMotorServiceStub 10# from .dynamixel_motor import DynamixelMotor 11 12from __future__ import annotations 13 14import threading 15import time 16from collections import namedtuple 17from logging import getLogger 18from typing import Any, Dict, Optional, Type 19 20import grpc 21from google.protobuf.empty_pb2 import Empty 22from google.protobuf.timestamp_pb2 import Timestamp 23from grpc._channel import _InactiveRpcError 24from reachy2_sdk_api import reachy_pb2, reachy_pb2_grpc 25from reachy2_sdk_api.arm_pb2 import ArmComponentsCommands 26from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToGoalStatus, GoToId 27from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub 28from reachy2_sdk_api.hand_pb2 import HandPositionRequest 29from reachy2_sdk_api.head_pb2 import HeadComponentsCommands 30from reachy2_sdk_api.reachy_pb2 import ReachyComponentsCommands, ReachyState 31 32from .config.reachy_info import ReachyInfo 33from .media.audio import Audio 34from .media.camera_manager import CameraManager 35from .orbita.orbita2d import Orbita2d 36from .orbita.orbita3d import Orbita3d 37from .orbita.orbita_joint import OrbitaJoint 38from .parts.arm import Arm 39from .parts.hand import Hand 40from .parts.head import Head 41from .parts.joints_based_part import JointsBasedPart 42from .parts.mobile_base import MobileBase 43from .parts.tripod import Tripod 44from .utils.custom_dict import CustomDict 45from .utils.goto_based_element import process_goto_request 46from .utils.utils import SimplifiedRequest 47 48GoToHomeId = namedtuple("GoToHomeId", ["head", "r_arm", "l_arm"]) 49"""Named tuple for easy access to goto request on full body""" 50 51 52class ReachySDK: 53 """The ReachySDK class manages the connection and interaction with a Reachy robot. 54 55 This class handles: 56 - Establishing and maintaining a connection with the robot via gRPC. 57 - Accessing and controlling various parts of the robot, such as the arms, head, and mobile base. 58 - Managing robot components including actuators, joints, cameras, and audio. 59 - Synchronizing robot state with the server in the background to keep data up-to-date. 60 - Providing utility functions for common tasks such as turning on/off motors, sending goal positions, 61 and performing movements. 62 """ 63 64 _instances_by_host: Dict[str, "ReachySDK"] = {} 65 66 def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: 67 """Ensure only one connected instance per IP is created.""" 68 if host in cls._instances_by_host: 69 if cls._instances_by_host[host]._grpc_connected: 70 return cls._instances_by_host[host] 71 else: 72 del cls._instances_by_host[host] 73 74 instance = super().__new__(cls) 75 cls._instances_by_host[host] = instance 76 return instance 77 78 def __init__( 79 self, 80 host: str, 81 sdk_port: int = 50051, 82 audio_port: int = 50063, 83 video_port: int = 50065, 84 ) -> None: 85 """Initialize a connection to the robot. 86 87 Args: 88 host: The IP address or hostname of the robot. 89 sdk_port: The gRPC port for the SDK. Default is 50051. 90 audio_port: The gRPC port for audio services. Default is 50063. 91 video_port: The gRPC port for video services. Default is 50065. 92 """ 93 self._logger = getLogger(__name__) 94 95 if hasattr(self, "_initialized"): 96 self._logger.warning("An instance already exists.") 97 return 98 99 self._host = host 100 self._sdk_port = sdk_port 101 self._audio_port = audio_port 102 self._video_port = video_port 103 104 self._grpc_connected = False 105 self._initialized = True 106 107 self._r_arm: Optional[Arm] = None 108 self._l_arm: Optional[Arm] = None 109 self._head: Optional[Head] = None 110 self._cameras: Optional[CameraManager] = None 111 self._mobile_base: Optional[MobileBase] = None 112 self._info: Optional[ReachyInfo] = None 113 self._tripod: Optional[Tripod] = None 114 115 self._update_timestamp: Timestamp = Timestamp(seconds=0) 116 117 self.connect() 118 119 def connect(self) -> None: 120 """Connects the SDK to the robot.""" 121 if self._grpc_connected: 122 self._logger.warning("Already connected to Reachy.") 123 return 124 125 self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") 126 127 self._stop_flag = threading.Event() 128 129 try: 130 self._get_info() 131 except ConnectionError: 132 self._logger.error( 133 f"Could not connect to Reachy with on IP address {self._host}, " 134 "check that the sdk server is running and that the IP is correct." 135 ) 136 self._grpc_connected = False 137 return 138 139 self._setup_parts() 140 self._setup_audio() 141 self._cameras = self._setup_video() 142 143 self._sync_thread = threading.Thread(target=self._start_sync_in_bg) 144 self._sync_thread.daemon = True 145 self._sync_thread.start() 146 147 self._audit_thread = threading.Thread(target=self._audit) 148 self._audit_thread.daemon = True 149 self._audit_thread.start() 150 151 self._grpc_connected = True 152 self._logger.info("Connected to Reachy.") 153 154 def disconnect(self, lost_connection: bool = False) -> None: 155 """Disconnect the SDK from the robot's server. 156 157 Args: 158 lost_connection: If `True`, indicates that the connection was lost unexpectedly. 159 """ 160 if self._host in self._instances_by_host: 161 del self._instances_by_host[self._host] 162 163 if not self._grpc_connected: 164 self._logger.warning("Already disconnected from Reachy.") 165 return 166 167 self._grpc_connected = False 168 self._grpc_channel.close() 169 self._grpc_channel = None 170 171 self._head = None 172 self._r_arm = None 173 self._l_arm = None 174 self._mobile_base = None 175 176 self._logger.info("Disconnected from Reachy.") 177 178 def __repr__(self) -> str: 179 """Clean representation of a Reachy.""" 180 if not self._grpc_connected or self.info is None: 181 return "Reachy is not connected" 182 183 s = "\n\t".join([part_name + ": " + str(part) for part_name, part in self.info._enabled_parts.items()]) 184 repr_template = ( 185 '<Reachy host="{host}" connected={connected} on={on} \n' 186 " battery_voltage={battery_voltage} \n" 187 " parts=\n\t{parts} \n>" 188 ) 189 return repr_template.format( 190 host=self._host, 191 connected=self._grpc_connected, 192 on=self.is_on(), 193 battery_voltage=self.info.battery_voltage, 194 parts=s, 195 ) 196 197 @property 198 def info(self) -> Optional[ReachyInfo]: 199 """Get ReachyInfo if connected.""" 200 if not self._grpc_connected: 201 self._logger.error("Cannot get info, not connected to Reachy") 202 return None 203 return self._info 204 205 @property 206 def head(self) -> Optional[Head]: 207 """Get Reachy's head.""" 208 if not self._grpc_connected: 209 self._logger.error("Cannot get head, not connected to Reachy") 210 return None 211 if self._head is None: 212 self._logger.error("head does not exist with this configuration") 213 return None 214 return self._head 215 216 @property 217 def r_arm(self) -> Optional[Arm]: 218 """Get Reachy's right arm.""" 219 if not self._grpc_connected: 220 self._logger.error("Cannot get r_arm, not connected to Reachy") 221 return None 222 if self._r_arm is None: 223 self._logger.error("r_arm does not exist with this configuration") 224 return None 225 return self._r_arm 226 227 @property 228 def l_arm(self) -> Optional[Arm]: 229 """Get Reachy's left arm.""" 230 if not self._grpc_connected: 231 self._logger.error("Cannot get l_arm, not connected to Reachy") 232 return None 233 if self._l_arm is None: 234 self._logger.error("l_arm does not exist with this configuration") 235 return None 236 return self._l_arm 237 238 @property 239 def mobile_base(self) -> Optional[MobileBase]: 240 """Get Reachy's mobile base.""" 241 if not self._grpc_connected: 242 self._logger.error("Cannot get mobile_base, not connected to Reachy") 243 return None 244 if self._mobile_base is None: 245 self._logger.error("mobile_base does not exist with this configuration") 246 return None 247 return self._mobile_base 248 249 @property 250 def tripod(self) -> Optional[Tripod]: 251 """Get Reachy's fixed tripod.""" 252 if not self._grpc_connected: 253 self._logger.error("Cannot get tripod, not connected to Reachy") 254 return None 255 if self._tripod is None: 256 self._logger.error("tripod does not exist with this configuration") 257 return None 258 return self._tripod 259 260 @property 261 def joints(self) -> CustomDict[str, OrbitaJoint]: 262 """Return a dictionary of all joints of the robot. 263 264 The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects. 265 """ 266 if not self._grpc_connected or not self.info: 267 self._logger.warning("Cannot get joints, not connected to Reachy.") 268 return CustomDict({}) 269 _joints: CustomDict[str, OrbitaJoint] = CustomDict({}) 270 for part_name in self.info._enabled_parts: 271 part = getattr(self, part_name) 272 for joint_name, joint in part.joints.items(): 273 _joints[part_name + "." + joint_name] = joint 274 return _joints 275 276 @property 277 def _actuators(self) -> Dict[str, Orbita2d | Orbita3d]: 278 """Return a dictionary of all actuators of the robot. 279 280 The dictionary keys are the actuator names, and the values are the corresponding actuator objects. 281 """ 282 if not self._grpc_connected or not self.info: 283 self._logger.warning("Cannot get actuators, not connected to Reachy.") 284 return {} 285 _actuators: Dict[str, Orbita2d | Orbita3d] = {} 286 for part_name in self.info._enabled_parts: 287 part = getattr(self, part_name) 288 for actuator_name, actuator in part._actuators.items(): 289 _actuators[part_name + "." + actuator_name] = actuator 290 return _actuators 291 292 def is_connected(self) -> bool: 293 """Check if the SDK is connected to the robot. 294 295 Returns: 296 `True` if connected, `False` otherwise. 297 """ 298 return self._grpc_connected 299 300 @property 301 def _grpc_status(self) -> str: 302 """Get the current connection status of the robot's gRPC server. 303 304 Returns: 305 "connected" if connected, "disconnected" otherwise. 306 """ 307 if self._grpc_connected: 308 return "connected" 309 else: 310 return "disconnected" 311 312 @_grpc_status.setter 313 def _grpc_status(self, status: str) -> None: 314 """Set the connection status of the robot's gRPC server. 315 316 Args: 317 status: The connection status to set, must be either "connected" or "disconnected". 318 319 Raises: 320 ValueError: If the status is not "connected" or "disconnected". 321 """ 322 if status == "connected": 323 self._grpc_connected = True 324 elif status == "disconnected": 325 self._grpc_connected = False 326 else: 327 raise ValueError("_grpc_status can only be set to 'connected' or 'disconnected'") 328 329 @property 330 def cameras(self) -> Optional[CameraManager]: 331 """Get the camera manager if available and connected.""" 332 if not self._grpc_connected: 333 self._logger.error("Cannot get cameras, not connected to Reachy") 334 return None 335 return self._cameras 336 337 def _get_info(self) -> None: 338 """Retrieve basic information about the robot. 339 340 Gathers data on the robot's parts, hardware and software versions, and serial number. 341 """ 342 self._stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 343 try: 344 self._robot = self._stub.GetReachy(Empty()) 345 except _InactiveRpcError: 346 raise ConnectionError() 347 348 self._info = ReachyInfo(self._robot) 349 self._grpc_connected = True 350 351 def _setup_audio(self) -> None: 352 """Initializes the audio grpc client.""" 353 self.audio = Audio(self._host, self._audio_port) 354 355 def _setup_video(self) -> Optional[CameraManager]: 356 """Set up the video server for the robot. 357 358 Returns: 359 A CameraManager instance if the video server connection is successful, otherwise None. 360 """ 361 try: 362 return CameraManager(self._host, self._video_port) 363 364 except Exception as e: 365 self._logger.error(f"Failed to connect to video server with error: {e}.\nReachySDK.video will not be available.") 366 return None 367 368 def _setup_part_r_arm(self, initial_state: ReachyState) -> None: 369 """Set up the robot's right arm based on the initial state.""" 370 if not self.info: 371 self._logger.warning("Reachy is not connected") 372 return None 373 374 if self._robot.HasField("r_arm"): 375 if initial_state.r_arm_state.activated: 376 r_arm = Arm(self._robot.r_arm, initial_state.r_arm_state, self._grpc_channel, self._goto_stub) 377 self._r_arm = r_arm 378 self.info._enabled_parts["r_arm"] = self._r_arm 379 if self._robot.HasField("r_hand"): 380 self._r_arm._init_hand(self._robot.r_hand, initial_state.r_hand_state) 381 else: 382 self.info._disabled_parts.append("r_arm") 383 384 def _setup_part_l_arm(self, initial_state: ReachyState) -> None: 385 """Set up the robot's left arm based on the initial state.""" 386 if not self.info: 387 self._logger.warning("Reachy is not connected") 388 return None 389 390 if self._robot.HasField("l_arm"): 391 if initial_state.l_arm_state.activated: 392 l_arm = Arm(self._robot.l_arm, initial_state.l_arm_state, self._grpc_channel, self._goto_stub) 393 self._l_arm = l_arm 394 self.info._enabled_parts["l_arm"] = self._l_arm 395 if self._robot.HasField("l_hand"): 396 self._l_arm._init_hand(self._robot.l_hand, initial_state.l_hand_state) 397 else: 398 self.info._disabled_parts.append("l_arm") 399 400 def _setup_part_mobile_base(self, initial_state: ReachyState) -> None: 401 """Set up the robot's mobile base based on the initial state.""" 402 if not self.info: 403 self._logger.warning("Reachy is not connected") 404 return None 405 406 if self._robot.HasField("mobile_base"): 407 self._mobile_base = MobileBase( 408 self._robot.mobile_base, initial_state.mobile_base_state, self._grpc_channel, self._goto_stub 409 ) 410 self.info._set_mobile_base(self._mobile_base) 411 412 def _setup_part_head(self, initial_state: ReachyState) -> None: 413 """Set up the robot's head based on the initial state.""" 414 if not self.info: 415 self._logger.warning("Reachy is not connected") 416 return None 417 418 if self._robot.HasField("head"): 419 if initial_state.head_state.activated: 420 head = Head(self._robot.head, initial_state.head_state, self._grpc_channel, self._goto_stub) 421 self._head = head 422 self.info._enabled_parts["head"] = self._head 423 else: 424 self.info._disabled_parts.append("head") 425 426 def _setup_part_tripod(self, initial_state: ReachyState) -> None: 427 """Set up the robot's tripod based on the initial state.""" 428 if not self.info: 429 self._logger.warning("Reachy is not connected") 430 return None 431 432 if self._robot.HasField("tripod"): 433 tripod = Tripod(self._robot.tripod, initial_state.tripod_state, self._grpc_channel) 434 self._tripod = tripod 435 436 def _setup_parts(self) -> None: 437 """Initialize all parts of the robot. 438 439 Retrieves the state of each part, creates instances, and adds them to the ReachySDK instance. 440 """ 441 setup_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 442 self._goto_stub = GoToServiceStub(self._grpc_channel) 443 initial_state = setup_stub.GetReachyState(self._robot.id) 444 445 self._setup_part_r_arm(initial_state) 446 self._setup_part_l_arm(initial_state) 447 self._setup_part_head(initial_state) 448 self._setup_part_mobile_base(initial_state) 449 self._setup_part_tripod(initial_state) 450 451 def get_update_timestamp(self) -> int: 452 """Returns the timestamp (ns) of the last update. 453 454 The timestamp is generated by ROS running on Reachy. 455 456 Returns: 457 timestamp (int) in nanoseconds. 458 """ 459 return self._update_timestamp.ToNanoseconds() 460 461 def _start_sync_in_bg(self) -> None: 462 """Start background synchronization with the robot.""" 463 reachy_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 464 self._get_stream_update_loop(reachy_stub, freq=100) 465 466 def _get_stream_update_loop(self, reachy_stub: reachy_pb2_grpc.ReachyServiceStub, freq: float) -> None: 467 """Update the robot's state at a specified frequency. 468 469 Args: 470 reachy_stub: The gRPC stub for communication with the robot. 471 freq: The frequency (in Hz) at which to update the robot's state. 472 """ 473 stream_req = reachy_pb2.ReachyStreamStateRequest(id=self._robot.id, publish_frequency=freq) 474 try: 475 for state_update in reachy_stub.StreamReachyState(stream_req): 476 self._update_timestamp = state_update.timestamp 477 478 self._update_part(self._l_arm, state_update.l_arm_state) 479 self._update_part(self._r_arm, state_update.r_arm_state) 480 self._update_part(self._head, state_update.head_state) 481 self._update_part(self._mobile_base, state_update.mobile_base_state) 482 self._update_part(self._tripod, state_update.tripod_state) 483 484 if self._l_arm and self._l_arm.gripper: 485 self._l_arm.gripper._update_with(state_update.l_hand_state) 486 if self._r_arm and self._r_arm.gripper: 487 self._r_arm.gripper._update_with(state_update.r_hand_state) 488 489 except grpc.RpcError as e: 490 if e.code() == grpc.StatusCode.CANCELLED: 491 self._logger.warning("Reachy gRPC stream is shutting down.") 492 else: 493 self._grpc_connected = False 494 raise ConnectionError(f"Connection with Reachy ip:{self._host} lost, check the SDK server status.") 495 496 def _update_part(self, part: Optional[Any], state: Any) -> None: 497 """Helper function to update a robot part if it exists.""" 498 if part is not None: 499 part._update_with(state) 500 501 def _audit(self) -> None: 502 """Periodically perform an audit of the robot's components.""" 503 while self._grpc_connected: 504 audit_status = self._stub.Audit(self._robot.id) 505 if self._l_arm is not None and audit_status.HasField("l_arm_status"): 506 self._l_arm._update_audit_status(audit_status.l_arm_status) 507 if self._l_arm.gripper is not None and audit_status.HasField("l_hand_status"): 508 self._l_arm.gripper._update_audit_status(audit_status.l_hand_status) 509 if self._r_arm is not None and audit_status.HasField("r_arm_status"): 510 self._r_arm._update_audit_status(audit_status.r_arm_status) 511 if self._r_arm.gripper is not None and audit_status.HasField("r_hand_status"): 512 self._r_arm.gripper._update_audit_status(audit_status.r_hand_status) 513 if self._head is not None and audit_status.HasField("head_status"): 514 self._head._update_audit_status(audit_status.head_status) 515 if self._mobile_base is not None and audit_status.HasField("mobile_base_status"): 516 self._mobile_base._update_audit_status(audit_status.mobile_base_status) 517 time.sleep(1) 518 519 @property 520 def audit(self) -> Dict[str, Dict[str, str]]: 521 """Return the audit status of all enabled parts of the robot.""" 522 audit_dict: Dict[str, Dict[str, str]] = {} 523 if not self._grpc_connected or not self.info: 524 self._logger.warning("Reachy is not connected!") 525 if self.info is not None: 526 for part in self.info._enabled_parts.values(): 527 audit_dict[part._part_id.name] = part.audit 528 return audit_dict 529 530 def turn_on(self) -> bool: 531 """Activate all motors of the robot's parts if all of them are not already turned on. 532 533 Returns: 534 `True` if successful, `False` otherwise. 535 """ 536 if not self._grpc_connected or not self.info: 537 self._logger.warning("Cannot turn on Reachy, not connected.") 538 return False 539 540 speed_limit_high = 25 541 max_iterations = 10 542 ite = 0 543 544 while not self._is_fully_on() and ite < max_iterations: 545 for part in self.info._enabled_parts.values(): 546 part.set_speed_limits(1) 547 time.sleep(0.05) 548 for part in self.info._enabled_parts.values(): 549 part._turn_on() 550 if self._mobile_base is not None: 551 self._mobile_base._turn_on() 552 time.sleep(0.05) 553 for part in self.info._enabled_parts.values(): 554 part.set_speed_limits(speed_limit_high) 555 time.sleep(0.4) 556 ite += 1 557 558 if ite == max_iterations: 559 self._logger.warning("Failed to turn on Reachy,") 560 return False 561 562 return True 563 564 def turn_off(self) -> bool: 565 """Turn all motors of enabled parts off. 566 567 All enabled parts' motors will then be compliant. 568 """ 569 if not self._grpc_connected or not self.info: 570 self._logger.warning("Cannot turn off Reachy, not connected.") 571 return False 572 for part in self.info._enabled_parts.values(): 573 part._turn_off() 574 if self._mobile_base is not None: 575 self._mobile_base._turn_off() 576 time.sleep(0.5) 577 578 return True 579 580 def turn_off_smoothly(self) -> bool: 581 """Turn all motors of robot parts off. 582 583 Arm torques are reduced during 3 seconds, then all parts' motors will be compliant. 584 """ 585 if not self._grpc_connected or not self.info: 586 self._logger.warning("Cannot turn off Reachy, not connected.") 587 return False 588 speed_limit_high = 25 589 # Enough to sustain the arm weight 590 torque_limit_low = 50 591 torque_limit_high = 100 592 duration = 3 593 arms_list = [] 594 595 if hasattr(self, "_mobile_base") and self._mobile_base is not None: 596 self._mobile_base._turn_off() 597 for part in self.info._enabled_parts.values(): 598 if "arm" in part._part_id.name: 599 part.set_torque_limits(torque_limit_low) 600 part.set_speed_limits(speed_limit_high) 601 part.goto_posture(duration=duration, wait_for_goto_end=False) 602 arms_list.append(part) 603 else: 604 part._turn_off() 605 606 countingTime = 0 607 while countingTime < duration: 608 time.sleep(1) 609 torque_limit_low -= 15 610 for arm_part in arms_list: 611 arm_part.set_torque_limits(torque_limit_low) 612 countingTime += 1 613 614 for arm_part in arms_list: 615 arm_part._turn_off() 616 arm_part.set_torque_limits(torque_limit_high) 617 618 time.sleep(0.5) 619 return True 620 621 def is_on(self) -> bool: 622 """Check if all actuators of Reachy parts are on (stiff). 623 624 Returns: 625 `True` if all are stiff, `False` otherwise. 626 """ 627 if not self.info: 628 self._logger.warning("Reachy is not connected!") 629 return False 630 631 for part in self.info._enabled_parts.values(): 632 if not part.is_on(): 633 return False 634 if self._mobile_base is not None and self._mobile_base.is_off(): 635 return False 636 return True 637 638 def is_off(self) -> bool: 639 """Check if all actuators of Reachy parts are off (compliant). 640 641 Returns: 642 `True` if all are compliant, `False` otherwise. 643 """ 644 if not self.info: 645 self._logger.warning("Reachy is not connected!") 646 return True 647 648 for part in self.info._enabled_parts.values(): 649 if part.is_on(): 650 return False 651 if self._mobile_base is not None and self._mobile_base.is_on(): 652 return False 653 return True 654 655 def _is_fully_on(self) -> bool: 656 """Check if the robot and its grippers (if they exist) are turned on.""" 657 return self.is_on() and all(arm.gripper.is_on() if arm and arm.gripper else True for arm in [self._l_arm, self._r_arm]) 658 659 def reset_default_limits(self) -> None: 660 """Set back speed and torque limits of all parts to maximum value (100).""" 661 if not self.info: 662 self._logger.warning("Reachy is not connected!") 663 return 664 665 for part in self.info._enabled_parts.values(): 666 if issubclass(type(part), JointsBasedPart): 667 part.set_speed_limits(100) 668 part.set_torque_limits(100) 669 time.sleep(0.5) 670 671 def goto_posture( 672 self, 673 common_posture: str = "default", 674 duration: float = 2, 675 wait: bool = False, 676 wait_for_goto_end: bool = True, 677 interpolation_mode: str = "minimum_jerk", 678 open_gripper: bool = False, 679 ) -> GoToHomeId: 680 """Move the robot to a predefined posture. 681 682 Args: 683 common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'. 684 duration: The time duration in seconds for the robot to move to the specified posture. 685 Defaults to 2. 686 wait: Determines whether the program should wait for the movement to finish before 687 returning. If set to `True`, the program waits for the movement to complete before continuing 688 execution. Defaults to `False`. 689 wait_for_goto_end: Specifies whether commands will be sent to a part immediately or 690 only after all previous commands in the queue have been executed. If set to `False`, the program 691 will cancel all executing moves and queues. Defaults to `True`. 692 interpolation_mode: The type of interpolation used when moving the arm's joints. 693 Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'. 694 open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position. 695 Defaults to `False`. 696 697 Returns: 698 A GoToHomeId containing movement GoToIds for each part. 699 """ 700 if common_posture not in ["default", "elbow_90"]: 701 raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'") 702 head_id = None 703 r_arm_id = None 704 l_arm_id = None 705 if not wait_for_goto_end: 706 self.cancel_all_goto() 707 if self.head is not None: 708 is_last_commmand = self.r_arm is None and self.l_arm is None 709 wait_head = wait and is_last_commmand 710 head_id = self.head.goto_posture( 711 duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode 712 ) 713 if self.r_arm is not None: 714 is_last_commmand = self.l_arm is None 715 wait_r_arm = wait and is_last_commmand 716 r_arm_id = self.r_arm.goto_posture( 717 common_posture, 718 duration=duration, 719 wait=wait_r_arm, 720 wait_for_goto_end=wait_for_goto_end, 721 interpolation_mode=interpolation_mode, 722 open_gripper=open_gripper, 723 ) 724 if self.l_arm is not None: 725 l_arm_id = self.l_arm.goto_posture( 726 common_posture, 727 duration=duration, 728 wait=wait, 729 wait_for_goto_end=wait_for_goto_end, 730 interpolation_mode=interpolation_mode, 731 open_gripper=open_gripper, 732 ) 733 ids = GoToHomeId( 734 head=head_id, 735 r_arm=r_arm_id, 736 l_arm=l_arm_id, 737 ) 738 return ids 739 740 def is_goto_finished(self, goto_id: GoToId) -> bool: 741 """Check if a goto command has completed. 742 743 Args: 744 goto_id: The unique GoToId of the goto command. 745 746 Returns: 747 `True` if the command is complete, `False` otherwise. 748 """ 749 if not self._grpc_connected: 750 self._logger.warning("Reachy is not connected!") 751 return False 752 if not isinstance(goto_id, GoToId): 753 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 754 if goto_id.id == -1: 755 self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.") 756 return True 757 state = self._get_goto_state(goto_id) 758 result = bool( 759 state.goal_status == GoalStatus.STATUS_ABORTED 760 or state.goal_status == GoalStatus.STATUS_CANCELED 761 or state.goal_status == GoalStatus.STATUS_SUCCEEDED 762 ) 763 return result 764 765 def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]: 766 """Retrieve the details of a goto command based on its GoToId. 767 768 Args: 769 goto_id: The ID of the goto command for which details are requested. 770 771 Returns: 772 A `SimplifiedRequest` object containing the part name, joint goal positions 773 (in degrees), movement duration, and interpolation mode. 774 Returns `None` if the robot is not connected or if the `goto_id` is invalid. 775 776 Raises: 777 TypeError: If `goto_id` is not an instance of `GoToId`. 778 ValueError: If `goto_id` is -1, indicating an invalid command. 779 """ 780 if not self._grpc_connected: 781 self._logger.warning("Reachy is not connected!") 782 return None 783 if not isinstance(goto_id, GoToId): 784 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 785 if goto_id.id == -1: 786 raise ValueError("No answer was found for given move, goto_id is -1") 787 788 response = self._goto_stub.GetGoToRequest(goto_id) 789 790 full_request = process_goto_request(response) 791 792 return full_request 793 794 def _get_goto_state(self, goto_id: GoToId) -> GoToGoalStatus: 795 """Retrieve the current state of a goto command. 796 797 Args: 798 goto_id: The unique GoToId of the goto command. 799 800 Returns: 801 The current state of the command. 802 """ 803 response = self._goto_stub.GetGoToState(goto_id) 804 return response 805 806 def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck: 807 """Request the cancellation of a specific goto command based on its GoToId. 808 809 Args: 810 goto_id: The ID of the goto command to cancel. 811 812 Returns: 813 A `GoToAck` object indicating whether the cancellation was acknowledged. 814 If the robot is not connected, returns None. 815 816 Raises: 817 TypeError: If `goto_id` is not an instance of `GoToId`. 818 """ 819 if not self._grpc_connected: 820 self._logger.warning("Reachy is not connected!") 821 return None 822 if not isinstance(goto_id, GoToId): 823 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 824 if goto_id.id == -1: 825 self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.") 826 return GoToAck(ack=True) 827 response = self._goto_stub.CancelGoTo(goto_id) 828 return response 829 830 def cancel_all_goto(self) -> GoToAck: 831 """Cancel all active goto commands. 832 833 Returns: 834 A `GoToAck` object indicating whether the cancellation was acknowledged. 835 """ 836 if not self._grpc_connected: 837 self._logger.warning("Reachy is not connected!") 838 return None 839 response = self._goto_stub.CancelAllGoTo(Empty()) 840 return response 841 842 def send_goal_positions(self, check_positions: bool = False) -> None: 843 """Send the goal positions to the robot. 844 845 If goal positions have been specified for any joint of the robot, sends them to the robot. 846 847 Args : 848 check_positions: A boolean indicating whether to check the positions after sending the command. 849 Defaults to True. 850 """ 851 if not self.info: 852 self._logger.warning("Reachy is not connected!") 853 return 854 855 commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {} 856 for part in [self.r_arm, self.l_arm, self.head]: 857 self._add_component_commands(part, commands, check_positions) 858 859 if self.r_arm is not None: 860 self._add_component_commands(self.r_arm.gripper, commands, check_positions) 861 if self.l_arm is not None: 862 self._add_component_commands(self.l_arm.gripper, commands, check_positions) 863 864 components_commands = ReachyComponentsCommands(**commands) 865 self._stub.SendComponentsCommands(components_commands) 866 867 def _add_component_commands( 868 self, 869 part: JointsBasedPart | Hand | None, 870 commands: Dict[str, HeadComponentsCommands | ArmComponentsCommands | HandPositionRequest], 871 check_positions: bool, 872 ) -> None: 873 """Get the current component commands.""" 874 if part is not None: 875 if part.is_off(): 876 self._logger.warning(f"{part._part_id.name} is off. Command not sent.") 877 return 878 part_command = part._get_goal_positions_message() 879 if part_command is not None: 880 commands[f"{part._part_id.name}_commands"] = part_command 881 part._clean_outgoing_goal_positions() 882 if check_positions: 883 part._post_send_goal_positions()
Named tuple for easy access to goto request on full body
Inherited Members
- builtins.tuple
- index
- count
53class ReachySDK: 54 """The ReachySDK class manages the connection and interaction with a Reachy robot. 55 56 This class handles: 57 - Establishing and maintaining a connection with the robot via gRPC. 58 - Accessing and controlling various parts of the robot, such as the arms, head, and mobile base. 59 - Managing robot components including actuators, joints, cameras, and audio. 60 - Synchronizing robot state with the server in the background to keep data up-to-date. 61 - Providing utility functions for common tasks such as turning on/off motors, sending goal positions, 62 and performing movements. 63 """ 64 65 _instances_by_host: Dict[str, "ReachySDK"] = {} 66 67 def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: 68 """Ensure only one connected instance per IP is created.""" 69 if host in cls._instances_by_host: 70 if cls._instances_by_host[host]._grpc_connected: 71 return cls._instances_by_host[host] 72 else: 73 del cls._instances_by_host[host] 74 75 instance = super().__new__(cls) 76 cls._instances_by_host[host] = instance 77 return instance 78 79 def __init__( 80 self, 81 host: str, 82 sdk_port: int = 50051, 83 audio_port: int = 50063, 84 video_port: int = 50065, 85 ) -> None: 86 """Initialize a connection to the robot. 87 88 Args: 89 host: The IP address or hostname of the robot. 90 sdk_port: The gRPC port for the SDK. Default is 50051. 91 audio_port: The gRPC port for audio services. Default is 50063. 92 video_port: The gRPC port for video services. Default is 50065. 93 """ 94 self._logger = getLogger(__name__) 95 96 if hasattr(self, "_initialized"): 97 self._logger.warning("An instance already exists.") 98 return 99 100 self._host = host 101 self._sdk_port = sdk_port 102 self._audio_port = audio_port 103 self._video_port = video_port 104 105 self._grpc_connected = False 106 self._initialized = True 107 108 self._r_arm: Optional[Arm] = None 109 self._l_arm: Optional[Arm] = None 110 self._head: Optional[Head] = None 111 self._cameras: Optional[CameraManager] = None 112 self._mobile_base: Optional[MobileBase] = None 113 self._info: Optional[ReachyInfo] = None 114 self._tripod: Optional[Tripod] = None 115 116 self._update_timestamp: Timestamp = Timestamp(seconds=0) 117 118 self.connect() 119 120 def connect(self) -> None: 121 """Connects the SDK to the robot.""" 122 if self._grpc_connected: 123 self._logger.warning("Already connected to Reachy.") 124 return 125 126 self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") 127 128 self._stop_flag = threading.Event() 129 130 try: 131 self._get_info() 132 except ConnectionError: 133 self._logger.error( 134 f"Could not connect to Reachy with on IP address {self._host}, " 135 "check that the sdk server is running and that the IP is correct." 136 ) 137 self._grpc_connected = False 138 return 139 140 self._setup_parts() 141 self._setup_audio() 142 self._cameras = self._setup_video() 143 144 self._sync_thread = threading.Thread(target=self._start_sync_in_bg) 145 self._sync_thread.daemon = True 146 self._sync_thread.start() 147 148 self._audit_thread = threading.Thread(target=self._audit) 149 self._audit_thread.daemon = True 150 self._audit_thread.start() 151 152 self._grpc_connected = True 153 self._logger.info("Connected to Reachy.") 154 155 def disconnect(self, lost_connection: bool = False) -> None: 156 """Disconnect the SDK from the robot's server. 157 158 Args: 159 lost_connection: If `True`, indicates that the connection was lost unexpectedly. 160 """ 161 if self._host in self._instances_by_host: 162 del self._instances_by_host[self._host] 163 164 if not self._grpc_connected: 165 self._logger.warning("Already disconnected from Reachy.") 166 return 167 168 self._grpc_connected = False 169 self._grpc_channel.close() 170 self._grpc_channel = None 171 172 self._head = None 173 self._r_arm = None 174 self._l_arm = None 175 self._mobile_base = None 176 177 self._logger.info("Disconnected from Reachy.") 178 179 def __repr__(self) -> str: 180 """Clean representation of a Reachy.""" 181 if not self._grpc_connected or self.info is None: 182 return "Reachy is not connected" 183 184 s = "\n\t".join([part_name + ": " + str(part) for part_name, part in self.info._enabled_parts.items()]) 185 repr_template = ( 186 '<Reachy host="{host}" connected={connected} on={on} \n' 187 " battery_voltage={battery_voltage} \n" 188 " parts=\n\t{parts} \n>" 189 ) 190 return repr_template.format( 191 host=self._host, 192 connected=self._grpc_connected, 193 on=self.is_on(), 194 battery_voltage=self.info.battery_voltage, 195 parts=s, 196 ) 197 198 @property 199 def info(self) -> Optional[ReachyInfo]: 200 """Get ReachyInfo if connected.""" 201 if not self._grpc_connected: 202 self._logger.error("Cannot get info, not connected to Reachy") 203 return None 204 return self._info 205 206 @property 207 def head(self) -> Optional[Head]: 208 """Get Reachy's head.""" 209 if not self._grpc_connected: 210 self._logger.error("Cannot get head, not connected to Reachy") 211 return None 212 if self._head is None: 213 self._logger.error("head does not exist with this configuration") 214 return None 215 return self._head 216 217 @property 218 def r_arm(self) -> Optional[Arm]: 219 """Get Reachy's right arm.""" 220 if not self._grpc_connected: 221 self._logger.error("Cannot get r_arm, not connected to Reachy") 222 return None 223 if self._r_arm is None: 224 self._logger.error("r_arm does not exist with this configuration") 225 return None 226 return self._r_arm 227 228 @property 229 def l_arm(self) -> Optional[Arm]: 230 """Get Reachy's left arm.""" 231 if not self._grpc_connected: 232 self._logger.error("Cannot get l_arm, not connected to Reachy") 233 return None 234 if self._l_arm is None: 235 self._logger.error("l_arm does not exist with this configuration") 236 return None 237 return self._l_arm 238 239 @property 240 def mobile_base(self) -> Optional[MobileBase]: 241 """Get Reachy's mobile base.""" 242 if not self._grpc_connected: 243 self._logger.error("Cannot get mobile_base, not connected to Reachy") 244 return None 245 if self._mobile_base is None: 246 self._logger.error("mobile_base does not exist with this configuration") 247 return None 248 return self._mobile_base 249 250 @property 251 def tripod(self) -> Optional[Tripod]: 252 """Get Reachy's fixed tripod.""" 253 if not self._grpc_connected: 254 self._logger.error("Cannot get tripod, not connected to Reachy") 255 return None 256 if self._tripod is None: 257 self._logger.error("tripod does not exist with this configuration") 258 return None 259 return self._tripod 260 261 @property 262 def joints(self) -> CustomDict[str, OrbitaJoint]: 263 """Return a dictionary of all joints of the robot. 264 265 The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects. 266 """ 267 if not self._grpc_connected or not self.info: 268 self._logger.warning("Cannot get joints, not connected to Reachy.") 269 return CustomDict({}) 270 _joints: CustomDict[str, OrbitaJoint] = CustomDict({}) 271 for part_name in self.info._enabled_parts: 272 part = getattr(self, part_name) 273 for joint_name, joint in part.joints.items(): 274 _joints[part_name + "." + joint_name] = joint 275 return _joints 276 277 @property 278 def _actuators(self) -> Dict[str, Orbita2d | Orbita3d]: 279 """Return a dictionary of all actuators of the robot. 280 281 The dictionary keys are the actuator names, and the values are the corresponding actuator objects. 282 """ 283 if not self._grpc_connected or not self.info: 284 self._logger.warning("Cannot get actuators, not connected to Reachy.") 285 return {} 286 _actuators: Dict[str, Orbita2d | Orbita3d] = {} 287 for part_name in self.info._enabled_parts: 288 part = getattr(self, part_name) 289 for actuator_name, actuator in part._actuators.items(): 290 _actuators[part_name + "." + actuator_name] = actuator 291 return _actuators 292 293 def is_connected(self) -> bool: 294 """Check if the SDK is connected to the robot. 295 296 Returns: 297 `True` if connected, `False` otherwise. 298 """ 299 return self._grpc_connected 300 301 @property 302 def _grpc_status(self) -> str: 303 """Get the current connection status of the robot's gRPC server. 304 305 Returns: 306 "connected" if connected, "disconnected" otherwise. 307 """ 308 if self._grpc_connected: 309 return "connected" 310 else: 311 return "disconnected" 312 313 @_grpc_status.setter 314 def _grpc_status(self, status: str) -> None: 315 """Set the connection status of the robot's gRPC server. 316 317 Args: 318 status: The connection status to set, must be either "connected" or "disconnected". 319 320 Raises: 321 ValueError: If the status is not "connected" or "disconnected". 322 """ 323 if status == "connected": 324 self._grpc_connected = True 325 elif status == "disconnected": 326 self._grpc_connected = False 327 else: 328 raise ValueError("_grpc_status can only be set to 'connected' or 'disconnected'") 329 330 @property 331 def cameras(self) -> Optional[CameraManager]: 332 """Get the camera manager if available and connected.""" 333 if not self._grpc_connected: 334 self._logger.error("Cannot get cameras, not connected to Reachy") 335 return None 336 return self._cameras 337 338 def _get_info(self) -> None: 339 """Retrieve basic information about the robot. 340 341 Gathers data on the robot's parts, hardware and software versions, and serial number. 342 """ 343 self._stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 344 try: 345 self._robot = self._stub.GetReachy(Empty()) 346 except _InactiveRpcError: 347 raise ConnectionError() 348 349 self._info = ReachyInfo(self._robot) 350 self._grpc_connected = True 351 352 def _setup_audio(self) -> None: 353 """Initializes the audio grpc client.""" 354 self.audio = Audio(self._host, self._audio_port) 355 356 def _setup_video(self) -> Optional[CameraManager]: 357 """Set up the video server for the robot. 358 359 Returns: 360 A CameraManager instance if the video server connection is successful, otherwise None. 361 """ 362 try: 363 return CameraManager(self._host, self._video_port) 364 365 except Exception as e: 366 self._logger.error(f"Failed to connect to video server with error: {e}.\nReachySDK.video will not be available.") 367 return None 368 369 def _setup_part_r_arm(self, initial_state: ReachyState) -> None: 370 """Set up the robot's right arm based on the initial state.""" 371 if not self.info: 372 self._logger.warning("Reachy is not connected") 373 return None 374 375 if self._robot.HasField("r_arm"): 376 if initial_state.r_arm_state.activated: 377 r_arm = Arm(self._robot.r_arm, initial_state.r_arm_state, self._grpc_channel, self._goto_stub) 378 self._r_arm = r_arm 379 self.info._enabled_parts["r_arm"] = self._r_arm 380 if self._robot.HasField("r_hand"): 381 self._r_arm._init_hand(self._robot.r_hand, initial_state.r_hand_state) 382 else: 383 self.info._disabled_parts.append("r_arm") 384 385 def _setup_part_l_arm(self, initial_state: ReachyState) -> None: 386 """Set up the robot's left arm based on the initial state.""" 387 if not self.info: 388 self._logger.warning("Reachy is not connected") 389 return None 390 391 if self._robot.HasField("l_arm"): 392 if initial_state.l_arm_state.activated: 393 l_arm = Arm(self._robot.l_arm, initial_state.l_arm_state, self._grpc_channel, self._goto_stub) 394 self._l_arm = l_arm 395 self.info._enabled_parts["l_arm"] = self._l_arm 396 if self._robot.HasField("l_hand"): 397 self._l_arm._init_hand(self._robot.l_hand, initial_state.l_hand_state) 398 else: 399 self.info._disabled_parts.append("l_arm") 400 401 def _setup_part_mobile_base(self, initial_state: ReachyState) -> None: 402 """Set up the robot's mobile base based on the initial state.""" 403 if not self.info: 404 self._logger.warning("Reachy is not connected") 405 return None 406 407 if self._robot.HasField("mobile_base"): 408 self._mobile_base = MobileBase( 409 self._robot.mobile_base, initial_state.mobile_base_state, self._grpc_channel, self._goto_stub 410 ) 411 self.info._set_mobile_base(self._mobile_base) 412 413 def _setup_part_head(self, initial_state: ReachyState) -> None: 414 """Set up the robot's head based on the initial state.""" 415 if not self.info: 416 self._logger.warning("Reachy is not connected") 417 return None 418 419 if self._robot.HasField("head"): 420 if initial_state.head_state.activated: 421 head = Head(self._robot.head, initial_state.head_state, self._grpc_channel, self._goto_stub) 422 self._head = head 423 self.info._enabled_parts["head"] = self._head 424 else: 425 self.info._disabled_parts.append("head") 426 427 def _setup_part_tripod(self, initial_state: ReachyState) -> None: 428 """Set up the robot's tripod based on the initial state.""" 429 if not self.info: 430 self._logger.warning("Reachy is not connected") 431 return None 432 433 if self._robot.HasField("tripod"): 434 tripod = Tripod(self._robot.tripod, initial_state.tripod_state, self._grpc_channel) 435 self._tripod = tripod 436 437 def _setup_parts(self) -> None: 438 """Initialize all parts of the robot. 439 440 Retrieves the state of each part, creates instances, and adds them to the ReachySDK instance. 441 """ 442 setup_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 443 self._goto_stub = GoToServiceStub(self._grpc_channel) 444 initial_state = setup_stub.GetReachyState(self._robot.id) 445 446 self._setup_part_r_arm(initial_state) 447 self._setup_part_l_arm(initial_state) 448 self._setup_part_head(initial_state) 449 self._setup_part_mobile_base(initial_state) 450 self._setup_part_tripod(initial_state) 451 452 def get_update_timestamp(self) -> int: 453 """Returns the timestamp (ns) of the last update. 454 455 The timestamp is generated by ROS running on Reachy. 456 457 Returns: 458 timestamp (int) in nanoseconds. 459 """ 460 return self._update_timestamp.ToNanoseconds() 461 462 def _start_sync_in_bg(self) -> None: 463 """Start background synchronization with the robot.""" 464 reachy_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel) 465 self._get_stream_update_loop(reachy_stub, freq=100) 466 467 def _get_stream_update_loop(self, reachy_stub: reachy_pb2_grpc.ReachyServiceStub, freq: float) -> None: 468 """Update the robot's state at a specified frequency. 469 470 Args: 471 reachy_stub: The gRPC stub for communication with the robot. 472 freq: The frequency (in Hz) at which to update the robot's state. 473 """ 474 stream_req = reachy_pb2.ReachyStreamStateRequest(id=self._robot.id, publish_frequency=freq) 475 try: 476 for state_update in reachy_stub.StreamReachyState(stream_req): 477 self._update_timestamp = state_update.timestamp 478 479 self._update_part(self._l_arm, state_update.l_arm_state) 480 self._update_part(self._r_arm, state_update.r_arm_state) 481 self._update_part(self._head, state_update.head_state) 482 self._update_part(self._mobile_base, state_update.mobile_base_state) 483 self._update_part(self._tripod, state_update.tripod_state) 484 485 if self._l_arm and self._l_arm.gripper: 486 self._l_arm.gripper._update_with(state_update.l_hand_state) 487 if self._r_arm and self._r_arm.gripper: 488 self._r_arm.gripper._update_with(state_update.r_hand_state) 489 490 except grpc.RpcError as e: 491 if e.code() == grpc.StatusCode.CANCELLED: 492 self._logger.warning("Reachy gRPC stream is shutting down.") 493 else: 494 self._grpc_connected = False 495 raise ConnectionError(f"Connection with Reachy ip:{self._host} lost, check the SDK server status.") 496 497 def _update_part(self, part: Optional[Any], state: Any) -> None: 498 """Helper function to update a robot part if it exists.""" 499 if part is not None: 500 part._update_with(state) 501 502 def _audit(self) -> None: 503 """Periodically perform an audit of the robot's components.""" 504 while self._grpc_connected: 505 audit_status = self._stub.Audit(self._robot.id) 506 if self._l_arm is not None and audit_status.HasField("l_arm_status"): 507 self._l_arm._update_audit_status(audit_status.l_arm_status) 508 if self._l_arm.gripper is not None and audit_status.HasField("l_hand_status"): 509 self._l_arm.gripper._update_audit_status(audit_status.l_hand_status) 510 if self._r_arm is not None and audit_status.HasField("r_arm_status"): 511 self._r_arm._update_audit_status(audit_status.r_arm_status) 512 if self._r_arm.gripper is not None and audit_status.HasField("r_hand_status"): 513 self._r_arm.gripper._update_audit_status(audit_status.r_hand_status) 514 if self._head is not None and audit_status.HasField("head_status"): 515 self._head._update_audit_status(audit_status.head_status) 516 if self._mobile_base is not None and audit_status.HasField("mobile_base_status"): 517 self._mobile_base._update_audit_status(audit_status.mobile_base_status) 518 time.sleep(1) 519 520 @property 521 def audit(self) -> Dict[str, Dict[str, str]]: 522 """Return the audit status of all enabled parts of the robot.""" 523 audit_dict: Dict[str, Dict[str, str]] = {} 524 if not self._grpc_connected or not self.info: 525 self._logger.warning("Reachy is not connected!") 526 if self.info is not None: 527 for part in self.info._enabled_parts.values(): 528 audit_dict[part._part_id.name] = part.audit 529 return audit_dict 530 531 def turn_on(self) -> bool: 532 """Activate all motors of the robot's parts if all of them are not already turned on. 533 534 Returns: 535 `True` if successful, `False` otherwise. 536 """ 537 if not self._grpc_connected or not self.info: 538 self._logger.warning("Cannot turn on Reachy, not connected.") 539 return False 540 541 speed_limit_high = 25 542 max_iterations = 10 543 ite = 0 544 545 while not self._is_fully_on() and ite < max_iterations: 546 for part in self.info._enabled_parts.values(): 547 part.set_speed_limits(1) 548 time.sleep(0.05) 549 for part in self.info._enabled_parts.values(): 550 part._turn_on() 551 if self._mobile_base is not None: 552 self._mobile_base._turn_on() 553 time.sleep(0.05) 554 for part in self.info._enabled_parts.values(): 555 part.set_speed_limits(speed_limit_high) 556 time.sleep(0.4) 557 ite += 1 558 559 if ite == max_iterations: 560 self._logger.warning("Failed to turn on Reachy,") 561 return False 562 563 return True 564 565 def turn_off(self) -> bool: 566 """Turn all motors of enabled parts off. 567 568 All enabled parts' motors will then be compliant. 569 """ 570 if not self._grpc_connected or not self.info: 571 self._logger.warning("Cannot turn off Reachy, not connected.") 572 return False 573 for part in self.info._enabled_parts.values(): 574 part._turn_off() 575 if self._mobile_base is not None: 576 self._mobile_base._turn_off() 577 time.sleep(0.5) 578 579 return True 580 581 def turn_off_smoothly(self) -> bool: 582 """Turn all motors of robot parts off. 583 584 Arm torques are reduced during 3 seconds, then all parts' motors will be compliant. 585 """ 586 if not self._grpc_connected or not self.info: 587 self._logger.warning("Cannot turn off Reachy, not connected.") 588 return False 589 speed_limit_high = 25 590 # Enough to sustain the arm weight 591 torque_limit_low = 50 592 torque_limit_high = 100 593 duration = 3 594 arms_list = [] 595 596 if hasattr(self, "_mobile_base") and self._mobile_base is not None: 597 self._mobile_base._turn_off() 598 for part in self.info._enabled_parts.values(): 599 if "arm" in part._part_id.name: 600 part.set_torque_limits(torque_limit_low) 601 part.set_speed_limits(speed_limit_high) 602 part.goto_posture(duration=duration, wait_for_goto_end=False) 603 arms_list.append(part) 604 else: 605 part._turn_off() 606 607 countingTime = 0 608 while countingTime < duration: 609 time.sleep(1) 610 torque_limit_low -= 15 611 for arm_part in arms_list: 612 arm_part.set_torque_limits(torque_limit_low) 613 countingTime += 1 614 615 for arm_part in arms_list: 616 arm_part._turn_off() 617 arm_part.set_torque_limits(torque_limit_high) 618 619 time.sleep(0.5) 620 return True 621 622 def is_on(self) -> bool: 623 """Check if all actuators of Reachy parts are on (stiff). 624 625 Returns: 626 `True` if all are stiff, `False` otherwise. 627 """ 628 if not self.info: 629 self._logger.warning("Reachy is not connected!") 630 return False 631 632 for part in self.info._enabled_parts.values(): 633 if not part.is_on(): 634 return False 635 if self._mobile_base is not None and self._mobile_base.is_off(): 636 return False 637 return True 638 639 def is_off(self) -> bool: 640 """Check if all actuators of Reachy parts are off (compliant). 641 642 Returns: 643 `True` if all are compliant, `False` otherwise. 644 """ 645 if not self.info: 646 self._logger.warning("Reachy is not connected!") 647 return True 648 649 for part in self.info._enabled_parts.values(): 650 if part.is_on(): 651 return False 652 if self._mobile_base is not None and self._mobile_base.is_on(): 653 return False 654 return True 655 656 def _is_fully_on(self) -> bool: 657 """Check if the robot and its grippers (if they exist) are turned on.""" 658 return self.is_on() and all(arm.gripper.is_on() if arm and arm.gripper else True for arm in [self._l_arm, self._r_arm]) 659 660 def reset_default_limits(self) -> None: 661 """Set back speed and torque limits of all parts to maximum value (100).""" 662 if not self.info: 663 self._logger.warning("Reachy is not connected!") 664 return 665 666 for part in self.info._enabled_parts.values(): 667 if issubclass(type(part), JointsBasedPart): 668 part.set_speed_limits(100) 669 part.set_torque_limits(100) 670 time.sleep(0.5) 671 672 def goto_posture( 673 self, 674 common_posture: str = "default", 675 duration: float = 2, 676 wait: bool = False, 677 wait_for_goto_end: bool = True, 678 interpolation_mode: str = "minimum_jerk", 679 open_gripper: bool = False, 680 ) -> GoToHomeId: 681 """Move the robot to a predefined posture. 682 683 Args: 684 common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'. 685 duration: The time duration in seconds for the robot to move to the specified posture. 686 Defaults to 2. 687 wait: Determines whether the program should wait for the movement to finish before 688 returning. If set to `True`, the program waits for the movement to complete before continuing 689 execution. Defaults to `False`. 690 wait_for_goto_end: Specifies whether commands will be sent to a part immediately or 691 only after all previous commands in the queue have been executed. If set to `False`, the program 692 will cancel all executing moves and queues. Defaults to `True`. 693 interpolation_mode: The type of interpolation used when moving the arm's joints. 694 Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'. 695 open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position. 696 Defaults to `False`. 697 698 Returns: 699 A GoToHomeId containing movement GoToIds for each part. 700 """ 701 if common_posture not in ["default", "elbow_90"]: 702 raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'") 703 head_id = None 704 r_arm_id = None 705 l_arm_id = None 706 if not wait_for_goto_end: 707 self.cancel_all_goto() 708 if self.head is not None: 709 is_last_commmand = self.r_arm is None and self.l_arm is None 710 wait_head = wait and is_last_commmand 711 head_id = self.head.goto_posture( 712 duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode 713 ) 714 if self.r_arm is not None: 715 is_last_commmand = self.l_arm is None 716 wait_r_arm = wait and is_last_commmand 717 r_arm_id = self.r_arm.goto_posture( 718 common_posture, 719 duration=duration, 720 wait=wait_r_arm, 721 wait_for_goto_end=wait_for_goto_end, 722 interpolation_mode=interpolation_mode, 723 open_gripper=open_gripper, 724 ) 725 if self.l_arm is not None: 726 l_arm_id = self.l_arm.goto_posture( 727 common_posture, 728 duration=duration, 729 wait=wait, 730 wait_for_goto_end=wait_for_goto_end, 731 interpolation_mode=interpolation_mode, 732 open_gripper=open_gripper, 733 ) 734 ids = GoToHomeId( 735 head=head_id, 736 r_arm=r_arm_id, 737 l_arm=l_arm_id, 738 ) 739 return ids 740 741 def is_goto_finished(self, goto_id: GoToId) -> bool: 742 """Check if a goto command has completed. 743 744 Args: 745 goto_id: The unique GoToId of the goto command. 746 747 Returns: 748 `True` if the command is complete, `False` otherwise. 749 """ 750 if not self._grpc_connected: 751 self._logger.warning("Reachy is not connected!") 752 return False 753 if not isinstance(goto_id, GoToId): 754 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 755 if goto_id.id == -1: 756 self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.") 757 return True 758 state = self._get_goto_state(goto_id) 759 result = bool( 760 state.goal_status == GoalStatus.STATUS_ABORTED 761 or state.goal_status == GoalStatus.STATUS_CANCELED 762 or state.goal_status == GoalStatus.STATUS_SUCCEEDED 763 ) 764 return result 765 766 def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]: 767 """Retrieve the details of a goto command based on its GoToId. 768 769 Args: 770 goto_id: The ID of the goto command for which details are requested. 771 772 Returns: 773 A `SimplifiedRequest` object containing the part name, joint goal positions 774 (in degrees), movement duration, and interpolation mode. 775 Returns `None` if the robot is not connected or if the `goto_id` is invalid. 776 777 Raises: 778 TypeError: If `goto_id` is not an instance of `GoToId`. 779 ValueError: If `goto_id` is -1, indicating an invalid command. 780 """ 781 if not self._grpc_connected: 782 self._logger.warning("Reachy is not connected!") 783 return None 784 if not isinstance(goto_id, GoToId): 785 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 786 if goto_id.id == -1: 787 raise ValueError("No answer was found for given move, goto_id is -1") 788 789 response = self._goto_stub.GetGoToRequest(goto_id) 790 791 full_request = process_goto_request(response) 792 793 return full_request 794 795 def _get_goto_state(self, goto_id: GoToId) -> GoToGoalStatus: 796 """Retrieve the current state of a goto command. 797 798 Args: 799 goto_id: The unique GoToId of the goto command. 800 801 Returns: 802 The current state of the command. 803 """ 804 response = self._goto_stub.GetGoToState(goto_id) 805 return response 806 807 def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck: 808 """Request the cancellation of a specific goto command based on its GoToId. 809 810 Args: 811 goto_id: The ID of the goto command to cancel. 812 813 Returns: 814 A `GoToAck` object indicating whether the cancellation was acknowledged. 815 If the robot is not connected, returns None. 816 817 Raises: 818 TypeError: If `goto_id` is not an instance of `GoToId`. 819 """ 820 if not self._grpc_connected: 821 self._logger.warning("Reachy is not connected!") 822 return None 823 if not isinstance(goto_id, GoToId): 824 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 825 if goto_id.id == -1: 826 self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.") 827 return GoToAck(ack=True) 828 response = self._goto_stub.CancelGoTo(goto_id) 829 return response 830 831 def cancel_all_goto(self) -> GoToAck: 832 """Cancel all active goto commands. 833 834 Returns: 835 A `GoToAck` object indicating whether the cancellation was acknowledged. 836 """ 837 if not self._grpc_connected: 838 self._logger.warning("Reachy is not connected!") 839 return None 840 response = self._goto_stub.CancelAllGoTo(Empty()) 841 return response 842 843 def send_goal_positions(self, check_positions: bool = False) -> None: 844 """Send the goal positions to the robot. 845 846 If goal positions have been specified for any joint of the robot, sends them to the robot. 847 848 Args : 849 check_positions: A boolean indicating whether to check the positions after sending the command. 850 Defaults to True. 851 """ 852 if not self.info: 853 self._logger.warning("Reachy is not connected!") 854 return 855 856 commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {} 857 for part in [self.r_arm, self.l_arm, self.head]: 858 self._add_component_commands(part, commands, check_positions) 859 860 if self.r_arm is not None: 861 self._add_component_commands(self.r_arm.gripper, commands, check_positions) 862 if self.l_arm is not None: 863 self._add_component_commands(self.l_arm.gripper, commands, check_positions) 864 865 components_commands = ReachyComponentsCommands(**commands) 866 self._stub.SendComponentsCommands(components_commands) 867 868 def _add_component_commands( 869 self, 870 part: JointsBasedPart | Hand | None, 871 commands: Dict[str, HeadComponentsCommands | ArmComponentsCommands | HandPositionRequest], 872 check_positions: bool, 873 ) -> None: 874 """Get the current component commands.""" 875 if part is not None: 876 if part.is_off(): 877 self._logger.warning(f"{part._part_id.name} is off. Command not sent.") 878 return 879 part_command = part._get_goal_positions_message() 880 if part_command is not None: 881 commands[f"{part._part_id.name}_commands"] = part_command 882 part._clean_outgoing_goal_positions() 883 if check_positions: 884 part._post_send_goal_positions()
The ReachySDK class manages the connection and interaction with a Reachy robot.
This class handles:
- Establishing and maintaining a connection with the robot via gRPC.
- Accessing and controlling various parts of the robot, such as the arms, head, and mobile base.
- Managing robot components including actuators, joints, cameras, and audio.
- Synchronizing robot state with the server in the background to keep data up-to-date.
- Providing utility functions for common tasks such as turning on/off motors, sending goal positions, and performing movements.
79 def __init__( 80 self, 81 host: str, 82 sdk_port: int = 50051, 83 audio_port: int = 50063, 84 video_port: int = 50065, 85 ) -> None: 86 """Initialize a connection to the robot. 87 88 Args: 89 host: The IP address or hostname of the robot. 90 sdk_port: The gRPC port for the SDK. Default is 50051. 91 audio_port: The gRPC port for audio services. Default is 50063. 92 video_port: The gRPC port for video services. Default is 50065. 93 """ 94 self._logger = getLogger(__name__) 95 96 if hasattr(self, "_initialized"): 97 self._logger.warning("An instance already exists.") 98 return 99 100 self._host = host 101 self._sdk_port = sdk_port 102 self._audio_port = audio_port 103 self._video_port = video_port 104 105 self._grpc_connected = False 106 self._initialized = True 107 108 self._r_arm: Optional[Arm] = None 109 self._l_arm: Optional[Arm] = None 110 self._head: Optional[Head] = None 111 self._cameras: Optional[CameraManager] = None 112 self._mobile_base: Optional[MobileBase] = None 113 self._info: Optional[ReachyInfo] = None 114 self._tripod: Optional[Tripod] = None 115 116 self._update_timestamp: Timestamp = Timestamp(seconds=0) 117 118 self.connect()
Initialize a connection to the robot.
Arguments:
- host: The IP address or hostname of the robot.
- sdk_port: The gRPC port for the SDK. Default is 50051.
- audio_port: The gRPC port for audio services. Default is 50063.
- video_port: The gRPC port for video services. Default is 50065.
120 def connect(self) -> None: 121 """Connects the SDK to the robot.""" 122 if self._grpc_connected: 123 self._logger.warning("Already connected to Reachy.") 124 return 125 126 self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") 127 128 self._stop_flag = threading.Event() 129 130 try: 131 self._get_info() 132 except ConnectionError: 133 self._logger.error( 134 f"Could not connect to Reachy with on IP address {self._host}, " 135 "check that the sdk server is running and that the IP is correct." 136 ) 137 self._grpc_connected = False 138 return 139 140 self._setup_parts() 141 self._setup_audio() 142 self._cameras = self._setup_video() 143 144 self._sync_thread = threading.Thread(target=self._start_sync_in_bg) 145 self._sync_thread.daemon = True 146 self._sync_thread.start() 147 148 self._audit_thread = threading.Thread(target=self._audit) 149 self._audit_thread.daemon = True 150 self._audit_thread.start() 151 152 self._grpc_connected = True 153 self._logger.info("Connected to Reachy.")
Connects the SDK to the robot.
155 def disconnect(self, lost_connection: bool = False) -> None: 156 """Disconnect the SDK from the robot's server. 157 158 Args: 159 lost_connection: If `True`, indicates that the connection was lost unexpectedly. 160 """ 161 if self._host in self._instances_by_host: 162 del self._instances_by_host[self._host] 163 164 if not self._grpc_connected: 165 self._logger.warning("Already disconnected from Reachy.") 166 return 167 168 self._grpc_connected = False 169 self._grpc_channel.close() 170 self._grpc_channel = None 171 172 self._head = None 173 self._r_arm = None 174 self._l_arm = None 175 self._mobile_base = None 176 177 self._logger.info("Disconnected from Reachy.")
Disconnect the SDK from the robot's server.
Arguments:
- lost_connection: If
True
, indicates that the connection was lost unexpectedly.
198 @property 199 def info(self) -> Optional[ReachyInfo]: 200 """Get ReachyInfo if connected.""" 201 if not self._grpc_connected: 202 self._logger.error("Cannot get info, not connected to Reachy") 203 return None 204 return self._info
Get ReachyInfo if connected.
206 @property 207 def head(self) -> Optional[Head]: 208 """Get Reachy's head.""" 209 if not self._grpc_connected: 210 self._logger.error("Cannot get head, not connected to Reachy") 211 return None 212 if self._head is None: 213 self._logger.error("head does not exist with this configuration") 214 return None 215 return self._head
Get Reachy's head.
217 @property 218 def r_arm(self) -> Optional[Arm]: 219 """Get Reachy's right arm.""" 220 if not self._grpc_connected: 221 self._logger.error("Cannot get r_arm, not connected to Reachy") 222 return None 223 if self._r_arm is None: 224 self._logger.error("r_arm does not exist with this configuration") 225 return None 226 return self._r_arm
Get Reachy's right arm.
228 @property 229 def l_arm(self) -> Optional[Arm]: 230 """Get Reachy's left arm.""" 231 if not self._grpc_connected: 232 self._logger.error("Cannot get l_arm, not connected to Reachy") 233 return None 234 if self._l_arm is None: 235 self._logger.error("l_arm does not exist with this configuration") 236 return None 237 return self._l_arm
Get Reachy's left arm.
239 @property 240 def mobile_base(self) -> Optional[MobileBase]: 241 """Get Reachy's mobile base.""" 242 if not self._grpc_connected: 243 self._logger.error("Cannot get mobile_base, not connected to Reachy") 244 return None 245 if self._mobile_base is None: 246 self._logger.error("mobile_base does not exist with this configuration") 247 return None 248 return self._mobile_base
Get Reachy's mobile base.
250 @property 251 def tripod(self) -> Optional[Tripod]: 252 """Get Reachy's fixed tripod.""" 253 if not self._grpc_connected: 254 self._logger.error("Cannot get tripod, not connected to Reachy") 255 return None 256 if self._tripod is None: 257 self._logger.error("tripod does not exist with this configuration") 258 return None 259 return self._tripod
Get Reachy's fixed tripod.
261 @property 262 def joints(self) -> CustomDict[str, OrbitaJoint]: 263 """Return a dictionary of all joints of the robot. 264 265 The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects. 266 """ 267 if not self._grpc_connected or not self.info: 268 self._logger.warning("Cannot get joints, not connected to Reachy.") 269 return CustomDict({}) 270 _joints: CustomDict[str, OrbitaJoint] = CustomDict({}) 271 for part_name in self.info._enabled_parts: 272 part = getattr(self, part_name) 273 for joint_name, joint in part.joints.items(): 274 _joints[part_name + "." + joint_name] = joint 275 return _joints
Return a dictionary of all joints of the robot.
The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects.
293 def is_connected(self) -> bool: 294 """Check if the SDK is connected to the robot. 295 296 Returns: 297 `True` if connected, `False` otherwise. 298 """ 299 return self._grpc_connected
Check if the SDK is connected to the robot.
Returns:
True
if connected,False
otherwise.
330 @property 331 def cameras(self) -> Optional[CameraManager]: 332 """Get the camera manager if available and connected.""" 333 if not self._grpc_connected: 334 self._logger.error("Cannot get cameras, not connected to Reachy") 335 return None 336 return self._cameras
Get the camera manager if available and connected.
452 def get_update_timestamp(self) -> int: 453 """Returns the timestamp (ns) of the last update. 454 455 The timestamp is generated by ROS running on Reachy. 456 457 Returns: 458 timestamp (int) in nanoseconds. 459 """ 460 return self._update_timestamp.ToNanoseconds()
Returns the timestamp (ns) of the last update.
The timestamp is generated by ROS running on Reachy.
Returns:
timestamp (int) in nanoseconds.
520 @property 521 def audit(self) -> Dict[str, Dict[str, str]]: 522 """Return the audit status of all enabled parts of the robot.""" 523 audit_dict: Dict[str, Dict[str, str]] = {} 524 if not self._grpc_connected or not self.info: 525 self._logger.warning("Reachy is not connected!") 526 if self.info is not None: 527 for part in self.info._enabled_parts.values(): 528 audit_dict[part._part_id.name] = part.audit 529 return audit_dict
Return the audit status of all enabled parts of the robot.
531 def turn_on(self) -> bool: 532 """Activate all motors of the robot's parts if all of them are not already turned on. 533 534 Returns: 535 `True` if successful, `False` otherwise. 536 """ 537 if not self._grpc_connected or not self.info: 538 self._logger.warning("Cannot turn on Reachy, not connected.") 539 return False 540 541 speed_limit_high = 25 542 max_iterations = 10 543 ite = 0 544 545 while not self._is_fully_on() and ite < max_iterations: 546 for part in self.info._enabled_parts.values(): 547 part.set_speed_limits(1) 548 time.sleep(0.05) 549 for part in self.info._enabled_parts.values(): 550 part._turn_on() 551 if self._mobile_base is not None: 552 self._mobile_base._turn_on() 553 time.sleep(0.05) 554 for part in self.info._enabled_parts.values(): 555 part.set_speed_limits(speed_limit_high) 556 time.sleep(0.4) 557 ite += 1 558 559 if ite == max_iterations: 560 self._logger.warning("Failed to turn on Reachy,") 561 return False 562 563 return True
Activate all motors of the robot's parts if all of them are not already turned on.
Returns:
True
if successful,False
otherwise.
565 def turn_off(self) -> bool: 566 """Turn all motors of enabled parts off. 567 568 All enabled parts' motors will then be compliant. 569 """ 570 if not self._grpc_connected or not self.info: 571 self._logger.warning("Cannot turn off Reachy, not connected.") 572 return False 573 for part in self.info._enabled_parts.values(): 574 part._turn_off() 575 if self._mobile_base is not None: 576 self._mobile_base._turn_off() 577 time.sleep(0.5) 578 579 return True
Turn all motors of enabled parts off.
All enabled parts' motors will then be compliant.
581 def turn_off_smoothly(self) -> bool: 582 """Turn all motors of robot parts off. 583 584 Arm torques are reduced during 3 seconds, then all parts' motors will be compliant. 585 """ 586 if not self._grpc_connected or not self.info: 587 self._logger.warning("Cannot turn off Reachy, not connected.") 588 return False 589 speed_limit_high = 25 590 # Enough to sustain the arm weight 591 torque_limit_low = 50 592 torque_limit_high = 100 593 duration = 3 594 arms_list = [] 595 596 if hasattr(self, "_mobile_base") and self._mobile_base is not None: 597 self._mobile_base._turn_off() 598 for part in self.info._enabled_parts.values(): 599 if "arm" in part._part_id.name: 600 part.set_torque_limits(torque_limit_low) 601 part.set_speed_limits(speed_limit_high) 602 part.goto_posture(duration=duration, wait_for_goto_end=False) 603 arms_list.append(part) 604 else: 605 part._turn_off() 606 607 countingTime = 0 608 while countingTime < duration: 609 time.sleep(1) 610 torque_limit_low -= 15 611 for arm_part in arms_list: 612 arm_part.set_torque_limits(torque_limit_low) 613 countingTime += 1 614 615 for arm_part in arms_list: 616 arm_part._turn_off() 617 arm_part.set_torque_limits(torque_limit_high) 618 619 time.sleep(0.5) 620 return True
Turn all motors of robot parts off.
Arm torques are reduced during 3 seconds, then all parts' motors will be compliant.
622 def is_on(self) -> bool: 623 """Check if all actuators of Reachy parts are on (stiff). 624 625 Returns: 626 `True` if all are stiff, `False` otherwise. 627 """ 628 if not self.info: 629 self._logger.warning("Reachy is not connected!") 630 return False 631 632 for part in self.info._enabled_parts.values(): 633 if not part.is_on(): 634 return False 635 if self._mobile_base is not None and self._mobile_base.is_off(): 636 return False 637 return True
Check if all actuators of Reachy parts are on (stiff).
Returns:
True
if all are stiff,False
otherwise.
639 def is_off(self) -> bool: 640 """Check if all actuators of Reachy parts are off (compliant). 641 642 Returns: 643 `True` if all are compliant, `False` otherwise. 644 """ 645 if not self.info: 646 self._logger.warning("Reachy is not connected!") 647 return True 648 649 for part in self.info._enabled_parts.values(): 650 if part.is_on(): 651 return False 652 if self._mobile_base is not None and self._mobile_base.is_on(): 653 return False 654 return True
Check if all actuators of Reachy parts are off (compliant).
Returns:
True
if all are compliant,False
otherwise.
660 def reset_default_limits(self) -> None: 661 """Set back speed and torque limits of all parts to maximum value (100).""" 662 if not self.info: 663 self._logger.warning("Reachy is not connected!") 664 return 665 666 for part in self.info._enabled_parts.values(): 667 if issubclass(type(part), JointsBasedPart): 668 part.set_speed_limits(100) 669 part.set_torque_limits(100) 670 time.sleep(0.5)
Set back speed and torque limits of all parts to maximum value (100).
672 def goto_posture( 673 self, 674 common_posture: str = "default", 675 duration: float = 2, 676 wait: bool = False, 677 wait_for_goto_end: bool = True, 678 interpolation_mode: str = "minimum_jerk", 679 open_gripper: bool = False, 680 ) -> GoToHomeId: 681 """Move the robot to a predefined posture. 682 683 Args: 684 common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'. 685 duration: The time duration in seconds for the robot to move to the specified posture. 686 Defaults to 2. 687 wait: Determines whether the program should wait for the movement to finish before 688 returning. If set to `True`, the program waits for the movement to complete before continuing 689 execution. Defaults to `False`. 690 wait_for_goto_end: Specifies whether commands will be sent to a part immediately or 691 only after all previous commands in the queue have been executed. If set to `False`, the program 692 will cancel all executing moves and queues. Defaults to `True`. 693 interpolation_mode: The type of interpolation used when moving the arm's joints. 694 Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'. 695 open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position. 696 Defaults to `False`. 697 698 Returns: 699 A GoToHomeId containing movement GoToIds for each part. 700 """ 701 if common_posture not in ["default", "elbow_90"]: 702 raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'") 703 head_id = None 704 r_arm_id = None 705 l_arm_id = None 706 if not wait_for_goto_end: 707 self.cancel_all_goto() 708 if self.head is not None: 709 is_last_commmand = self.r_arm is None and self.l_arm is None 710 wait_head = wait and is_last_commmand 711 head_id = self.head.goto_posture( 712 duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode 713 ) 714 if self.r_arm is not None: 715 is_last_commmand = self.l_arm is None 716 wait_r_arm = wait and is_last_commmand 717 r_arm_id = self.r_arm.goto_posture( 718 common_posture, 719 duration=duration, 720 wait=wait_r_arm, 721 wait_for_goto_end=wait_for_goto_end, 722 interpolation_mode=interpolation_mode, 723 open_gripper=open_gripper, 724 ) 725 if self.l_arm is not None: 726 l_arm_id = self.l_arm.goto_posture( 727 common_posture, 728 duration=duration, 729 wait=wait, 730 wait_for_goto_end=wait_for_goto_end, 731 interpolation_mode=interpolation_mode, 732 open_gripper=open_gripper, 733 ) 734 ids = GoToHomeId( 735 head=head_id, 736 r_arm=r_arm_id, 737 l_arm=l_arm_id, 738 ) 739 return ids
Move the robot to a predefined posture.
Arguments:
- common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'.
- duration: The time duration in seconds for the robot to move to the specified posture. Defaults to 2.
- wait: Determines whether the program should wait for the movement to finish before
returning. If set to
True
, the program waits for the movement to complete before continuing execution. Defaults toFalse
. - wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
only after all previous commands in the queue have been executed. If set to
False
, the program will cancel all executing moves and queues. Defaults toTrue
. - interpolation_mode: The type of interpolation used when moving the arm's joints. Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
- open_gripper: If
True
, the gripper will open, ifFalse
, it stays in its current position. Defaults toFalse
.
Returns:
A GoToHomeId containing movement GoToIds for each part.
741 def is_goto_finished(self, goto_id: GoToId) -> bool: 742 """Check if a goto command has completed. 743 744 Args: 745 goto_id: The unique GoToId of the goto command. 746 747 Returns: 748 `True` if the command is complete, `False` otherwise. 749 """ 750 if not self._grpc_connected: 751 self._logger.warning("Reachy is not connected!") 752 return False 753 if not isinstance(goto_id, GoToId): 754 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 755 if goto_id.id == -1: 756 self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.") 757 return True 758 state = self._get_goto_state(goto_id) 759 result = bool( 760 state.goal_status == GoalStatus.STATUS_ABORTED 761 or state.goal_status == GoalStatus.STATUS_CANCELED 762 or state.goal_status == GoalStatus.STATUS_SUCCEEDED 763 ) 764 return result
Check if a goto command has completed.
Arguments:
- goto_id: The unique GoToId of the goto command.
Returns:
True
if the command is complete,False
otherwise.
766 def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]: 767 """Retrieve the details of a goto command based on its GoToId. 768 769 Args: 770 goto_id: The ID of the goto command for which details are requested. 771 772 Returns: 773 A `SimplifiedRequest` object containing the part name, joint goal positions 774 (in degrees), movement duration, and interpolation mode. 775 Returns `None` if the robot is not connected or if the `goto_id` is invalid. 776 777 Raises: 778 TypeError: If `goto_id` is not an instance of `GoToId`. 779 ValueError: If `goto_id` is -1, indicating an invalid command. 780 """ 781 if not self._grpc_connected: 782 self._logger.warning("Reachy is not connected!") 783 return None 784 if not isinstance(goto_id, GoToId): 785 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 786 if goto_id.id == -1: 787 raise ValueError("No answer was found for given move, goto_id is -1") 788 789 response = self._goto_stub.GetGoToRequest(goto_id) 790 791 full_request = process_goto_request(response) 792 793 return full_request
Retrieve the details of a goto command based on its GoToId.
Arguments:
- goto_id: The ID of the goto command for which details are requested.
Returns:
A
SimplifiedRequest
object containing the part name, joint goal positions (in degrees), movement duration, and interpolation mode. ReturnsNone
if the robot is not connected or if thegoto_id
is invalid.
Raises:
- TypeError: If
goto_id
is not an instance ofGoToId
. - ValueError: If
goto_id
is -1, indicating an invalid command.
807 def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck: 808 """Request the cancellation of a specific goto command based on its GoToId. 809 810 Args: 811 goto_id: The ID of the goto command to cancel. 812 813 Returns: 814 A `GoToAck` object indicating whether the cancellation was acknowledged. 815 If the robot is not connected, returns None. 816 817 Raises: 818 TypeError: If `goto_id` is not an instance of `GoToId`. 819 """ 820 if not self._grpc_connected: 821 self._logger.warning("Reachy is not connected!") 822 return None 823 if not isinstance(goto_id, GoToId): 824 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 825 if goto_id.id == -1: 826 self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.") 827 return GoToAck(ack=True) 828 response = self._goto_stub.CancelGoTo(goto_id) 829 return response
Request the cancellation of a specific goto command based on its GoToId.
Arguments:
- goto_id: The ID of the goto command to cancel.
Returns:
A
GoToAck
object indicating whether the cancellation was acknowledged. If the robot is not connected, returns None.
Raises:
- TypeError: If
goto_id
is not an instance ofGoToId
.
831 def cancel_all_goto(self) -> GoToAck: 832 """Cancel all active goto commands. 833 834 Returns: 835 A `GoToAck` object indicating whether the cancellation was acknowledged. 836 """ 837 if not self._grpc_connected: 838 self._logger.warning("Reachy is not connected!") 839 return None 840 response = self._goto_stub.CancelAllGoTo(Empty()) 841 return response
Cancel all active goto commands.
Returns:
A
GoToAck
object indicating whether the cancellation was acknowledged.
843 def send_goal_positions(self, check_positions: bool = False) -> None: 844 """Send the goal positions to the robot. 845 846 If goal positions have been specified for any joint of the robot, sends them to the robot. 847 848 Args : 849 check_positions: A boolean indicating whether to check the positions after sending the command. 850 Defaults to True. 851 """ 852 if not self.info: 853 self._logger.warning("Reachy is not connected!") 854 return 855 856 commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {} 857 for part in [self.r_arm, self.l_arm, self.head]: 858 self._add_component_commands(part, commands, check_positions) 859 860 if self.r_arm is not None: 861 self._add_component_commands(self.r_arm.gripper, commands, check_positions) 862 if self.l_arm is not None: 863 self._add_component_commands(self.l_arm.gripper, commands, check_positions) 864 865 components_commands = ReachyComponentsCommands(**commands) 866 self._stub.SendComponentsCommands(components_commands)
Send the goal positions to the robot.
If goal positions have been specified for any joint of the robot, sends them to the robot.
Args :
check_positions: A boolean indicating whether to check the positions after sending the command. Defaults to True.