reachy2_sdk.reachy_sdk

ReachySDK package.

This package provides remote access (via socket) to a Reachy robot. It automatically handles the synchronization with the robot. In particular, you can easily get an always up-to-date robot state (joint positions, sensors value). You can also send joint commands, compute forward or inverse kinematics.

  1"""ReachySDK package.
  2
  3This package provides remote access (via socket) to a Reachy robot.
  4It automatically handles the synchronization with the robot.
  5In particular, you can easily get an always up-to-date robot state (joint positions, sensors value).
  6You can also send joint commands, compute forward or inverse kinematics.
  7"""
  8
  9# from reachy2_sdk_api.dynamixel_motor_pb2_grpc import DynamixelMotorServiceStub
 10# from .dynamixel_motor import DynamixelMotor
 11
 12from __future__ import annotations
 13
 14import threading
 15import time
 16from collections import namedtuple
 17from logging import getLogger
 18from typing import Any, Dict, Optional, Type
 19
 20import grpc
 21from google.protobuf.empty_pb2 import Empty
 22from google.protobuf.timestamp_pb2 import Timestamp
 23from grpc._channel import _InactiveRpcError
 24from reachy2_sdk_api import reachy_pb2, reachy_pb2_grpc
 25from reachy2_sdk_api.arm_pb2 import ArmComponentsCommands
 26from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToGoalStatus, GoToId
 27from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 28from reachy2_sdk_api.hand_pb2 import HandPositionRequest
 29from reachy2_sdk_api.head_pb2 import HeadComponentsCommands
 30from reachy2_sdk_api.reachy_pb2 import ReachyComponentsCommands, ReachyState
 31
 32from .config.reachy_info import ReachyInfo
 33from .media.audio import Audio
 34from .media.camera_manager import CameraManager
 35from .orbita.orbita2d import Orbita2d
 36from .orbita.orbita3d import Orbita3d
 37from .orbita.orbita_joint import OrbitaJoint
 38from .parts.arm import Arm
 39from .parts.hand import Hand
 40from .parts.head import Head
 41from .parts.joints_based_part import JointsBasedPart
 42from .parts.mobile_base import MobileBase
 43from .parts.tripod import Tripod
 44from .utils.custom_dict import CustomDict
 45from .utils.goto_based_element import process_goto_request
 46from .utils.utils import SimplifiedRequest
 47
 48GoToHomeId = namedtuple("GoToHomeId", ["head", "r_arm", "l_arm"])
 49"""Named tuple for easy access to goto request on full body"""
 50
 51
 52class ReachySDK:
 53    """The ReachySDK class manages the connection and interaction with a Reachy robot.
 54
 55    This class handles:
 56    - Establishing and maintaining a connection with the robot via gRPC.
 57    - Accessing and controlling various parts of the robot, such as the arms, head, and mobile base.
 58    - Managing robot components including actuators, joints, cameras, and audio.
 59    - Synchronizing robot state with the server in the background to keep data up-to-date.
 60    - Providing utility functions for common tasks such as turning on/off motors, sending goal positions,
 61        and performing movements.
 62    """
 63
 64    _instances_by_host: Dict[str, "ReachySDK"] = {}
 65
 66    def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK:
 67        """Ensure only one connected instance per IP is created."""
 68        if host in cls._instances_by_host:
 69            if cls._instances_by_host[host]._grpc_connected:
 70                return cls._instances_by_host[host]
 71            else:
 72                del cls._instances_by_host[host]
 73
 74        instance = super().__new__(cls)
 75        cls._instances_by_host[host] = instance
 76        return instance
 77
 78    def __init__(
 79        self,
 80        host: str,
 81        sdk_port: int = 50051,
 82        audio_port: int = 50063,
 83        video_port: int = 50065,
 84    ) -> None:
 85        """Initialize a connection to the robot.
 86
 87        Args:
 88            host: The IP address or hostname of the robot.
 89            sdk_port: The gRPC port for the SDK. Default is 50051.
 90            audio_port: The gRPC port for audio services. Default is 50063.
 91            video_port: The gRPC port for video services. Default is 50065.
 92        """
 93        self._logger = getLogger(__name__)
 94
 95        if hasattr(self, "_initialized"):
 96            self._logger.warning("An instance already exists.")
 97            return
 98
 99        self._host = host
100        self._sdk_port = sdk_port
101        self._audio_port = audio_port
102        self._video_port = video_port
103
104        self._grpc_connected = False
105        self._initialized = True
106
107        self._r_arm: Optional[Arm] = None
108        self._l_arm: Optional[Arm] = None
109        self._head: Optional[Head] = None
110        self._cameras: Optional[CameraManager] = None
111        self._mobile_base: Optional[MobileBase] = None
112        self._info: Optional[ReachyInfo] = None
113        self._tripod: Optional[Tripod] = None
114
115        self._update_timestamp: Timestamp = Timestamp(seconds=0)
116
117        self.connect()
118
119    def connect(self) -> None:
120        """Connects the SDK to the robot."""
121        if self._grpc_connected:
122            self._logger.warning("Already connected to Reachy.")
123            return
124
125        self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}")
126
127        self._stop_flag = threading.Event()
128
129        try:
130            self._get_info()
131        except ConnectionError:
132            self._logger.error(
133                f"Could not connect to Reachy with on IP address {self._host}, "
134                "check that the sdk server is running and that the IP is correct."
135            )
136            self._grpc_connected = False
137            return
138
139        self._setup_parts()
140        self._setup_audio()
141        self._cameras = self._setup_video()
142
143        self._sync_thread = threading.Thread(target=self._start_sync_in_bg)
144        self._sync_thread.daemon = True
145        self._sync_thread.start()
146
147        self._audit_thread = threading.Thread(target=self._audit)
148        self._audit_thread.daemon = True
149        self._audit_thread.start()
150
151        self._grpc_connected = True
152        self._logger.info("Connected to Reachy.")
153
154    def disconnect(self, lost_connection: bool = False) -> None:
155        """Disconnect the SDK from the robot's server.
156
157        Args:
158            lost_connection: If `True`, indicates that the connection was lost unexpectedly.
159        """
160        if self._host in self._instances_by_host:
161            del self._instances_by_host[self._host]
162
163        if not self._grpc_connected:
164            self._logger.warning("Already disconnected from Reachy.")
165            return
166
167        self._grpc_connected = False
168        self._grpc_channel.close()
169        self._grpc_channel = None
170
171        self._head = None
172        self._r_arm = None
173        self._l_arm = None
174        self._mobile_base = None
175
176        self._logger.info("Disconnected from Reachy.")
177
178    def __repr__(self) -> str:
179        """Clean representation of a Reachy."""
180        if not self._grpc_connected or self.info is None:
181            return "Reachy is not connected"
182
183        s = "\n\t".join([part_name + ": " + str(part) for part_name, part in self.info._enabled_parts.items()])
184        repr_template = (
185            '<Reachy host="{host}" connected={connected} on={on} \n'
186            " battery_voltage={battery_voltage} \n"
187            " parts=\n\t{parts} \n>"
188        )
189        return repr_template.format(
190            host=self._host,
191            connected=self._grpc_connected,
192            on=self.is_on(),
193            battery_voltage=self.info.battery_voltage,
194            parts=s,
195        )
196
197    @property
198    def info(self) -> Optional[ReachyInfo]:
199        """Get ReachyInfo if connected."""
200        if not self._grpc_connected:
201            self._logger.error("Cannot get info, not connected to Reachy")
202            return None
203        return self._info
204
205    @property
206    def head(self) -> Optional[Head]:
207        """Get Reachy's head."""
208        if not self._grpc_connected:
209            self._logger.error("Cannot get head, not connected to Reachy")
210            return None
211        if self._head is None:
212            self._logger.error("head does not exist with this configuration")
213            return None
214        return self._head
215
216    @property
217    def r_arm(self) -> Optional[Arm]:
218        """Get Reachy's right arm."""
219        if not self._grpc_connected:
220            self._logger.error("Cannot get r_arm, not connected to Reachy")
221            return None
222        if self._r_arm is None:
223            self._logger.error("r_arm does not exist with this configuration")
224            return None
225        return self._r_arm
226
227    @property
228    def l_arm(self) -> Optional[Arm]:
229        """Get Reachy's left arm."""
230        if not self._grpc_connected:
231            self._logger.error("Cannot get l_arm, not connected to Reachy")
232            return None
233        if self._l_arm is None:
234            self._logger.error("l_arm does not exist with this configuration")
235            return None
236        return self._l_arm
237
238    @property
239    def mobile_base(self) -> Optional[MobileBase]:
240        """Get Reachy's mobile base."""
241        if not self._grpc_connected:
242            self._logger.error("Cannot get mobile_base, not connected to Reachy")
243            return None
244        if self._mobile_base is None:
245            self._logger.error("mobile_base does not exist with this configuration")
246            return None
247        return self._mobile_base
248
249    @property
250    def tripod(self) -> Optional[Tripod]:
251        """Get Reachy's fixed tripod."""
252        if not self._grpc_connected:
253            self._logger.error("Cannot get tripod, not connected to Reachy")
254            return None
255        if self._tripod is None:
256            self._logger.error("tripod does not exist with this configuration")
257            return None
258        return self._tripod
259
260    @property
261    def joints(self) -> CustomDict[str, OrbitaJoint]:
262        """Return a dictionary of all joints of the robot.
263
264        The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects.
265        """
266        if not self._grpc_connected or not self.info:
267            self._logger.warning("Cannot get joints, not connected to Reachy.")
268            return CustomDict({})
269        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
270        for part_name in self.info._enabled_parts:
271            part = getattr(self, part_name)
272            for joint_name, joint in part.joints.items():
273                _joints[part_name + "." + joint_name] = joint
274        return _joints
275
276    @property
277    def _actuators(self) -> Dict[str, Orbita2d | Orbita3d]:
278        """Return a dictionary of all actuators of the robot.
279
280        The dictionary keys are the actuator names, and the values are the corresponding actuator objects.
281        """
282        if not self._grpc_connected or not self.info:
283            self._logger.warning("Cannot get actuators, not connected to Reachy.")
284            return {}
285        _actuators: Dict[str, Orbita2d | Orbita3d] = {}
286        for part_name in self.info._enabled_parts:
287            part = getattr(self, part_name)
288            for actuator_name, actuator in part._actuators.items():
289                _actuators[part_name + "." + actuator_name] = actuator
290        return _actuators
291
292    def is_connected(self) -> bool:
293        """Check if the SDK is connected to the robot.
294
295        Returns:
296            `True` if connected, `False` otherwise.
297        """
298        return self._grpc_connected
299
300    @property
301    def _grpc_status(self) -> str:
302        """Get the current connection status of the robot's gRPC server.
303
304        Returns:
305            "connected" if connected, "disconnected" otherwise.
306        """
307        if self._grpc_connected:
308            return "connected"
309        else:
310            return "disconnected"
311
312    @_grpc_status.setter
313    def _grpc_status(self, status: str) -> None:
314        """Set the connection status of the robot's gRPC server.
315
316        Args:
317            status: The connection status to set, must be either "connected" or "disconnected".
318
319        Raises:
320            ValueError: If the status is not "connected" or "disconnected".
321        """
322        if status == "connected":
323            self._grpc_connected = True
324        elif status == "disconnected":
325            self._grpc_connected = False
326        else:
327            raise ValueError("_grpc_status can only be set to 'connected' or 'disconnected'")
328
329    @property
330    def cameras(self) -> Optional[CameraManager]:
331        """Get the camera manager if available and connected."""
332        if not self._grpc_connected:
333            self._logger.error("Cannot get cameras, not connected to Reachy")
334            return None
335        return self._cameras
336
337    def _get_info(self) -> None:
338        """Retrieve basic information about the robot.
339
340        Gathers data on the robot's parts, hardware and software versions, and serial number.
341        """
342        self._stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
343        try:
344            self._robot = self._stub.GetReachy(Empty())
345        except _InactiveRpcError:
346            raise ConnectionError()
347
348        self._info = ReachyInfo(self._robot)
349        self._grpc_connected = True
350
351    def _setup_audio(self) -> None:
352        """Initializes the audio grpc client."""
353        self.audio = Audio(self._host, self._audio_port)
354
355    def _setup_video(self) -> Optional[CameraManager]:
356        """Set up the video server for the robot.
357
358        Returns:
359            A CameraManager instance if the video server connection is successful, otherwise None.
360        """
361        try:
362            return CameraManager(self._host, self._video_port)
363
364        except Exception as e:
365            self._logger.error(f"Failed to connect to video server with error: {e}.\nReachySDK.video will not be available.")
366            return None
367
368    def _setup_part_r_arm(self, initial_state: ReachyState) -> None:
369        """Set up the robot's right arm based on the initial state."""
370        if not self.info:
371            self._logger.warning("Reachy is not connected")
372            return None
373
374        if self._robot.HasField("r_arm"):
375            if initial_state.r_arm_state.activated:
376                r_arm = Arm(self._robot.r_arm, initial_state.r_arm_state, self._grpc_channel, self._goto_stub)
377                self._r_arm = r_arm
378                self.info._enabled_parts["r_arm"] = self._r_arm
379                if self._robot.HasField("r_hand"):
380                    self._r_arm._init_hand(self._robot.r_hand, initial_state.r_hand_state)
381            else:
382                self.info._disabled_parts.append("r_arm")
383
384    def _setup_part_l_arm(self, initial_state: ReachyState) -> None:
385        """Set up the robot's left arm based on the initial state."""
386        if not self.info:
387            self._logger.warning("Reachy is not connected")
388            return None
389
390        if self._robot.HasField("l_arm"):
391            if initial_state.l_arm_state.activated:
392                l_arm = Arm(self._robot.l_arm, initial_state.l_arm_state, self._grpc_channel, self._goto_stub)
393                self._l_arm = l_arm
394                self.info._enabled_parts["l_arm"] = self._l_arm
395                if self._robot.HasField("l_hand"):
396                    self._l_arm._init_hand(self._robot.l_hand, initial_state.l_hand_state)
397            else:
398                self.info._disabled_parts.append("l_arm")
399
400    def _setup_part_mobile_base(self, initial_state: ReachyState) -> None:
401        """Set up the robot's mobile base based on the initial state."""
402        if not self.info:
403            self._logger.warning("Reachy is not connected")
404            return None
405
406        if self._robot.HasField("mobile_base"):
407            self._mobile_base = MobileBase(
408                self._robot.mobile_base, initial_state.mobile_base_state, self._grpc_channel, self._goto_stub
409            )
410            self.info._set_mobile_base(self._mobile_base)
411
412    def _setup_part_head(self, initial_state: ReachyState) -> None:
413        """Set up the robot's head based on the initial state."""
414        if not self.info:
415            self._logger.warning("Reachy is not connected")
416            return None
417
418        if self._robot.HasField("head"):
419            if initial_state.head_state.activated:
420                head = Head(self._robot.head, initial_state.head_state, self._grpc_channel, self._goto_stub)
421                self._head = head
422                self.info._enabled_parts["head"] = self._head
423            else:
424                self.info._disabled_parts.append("head")
425
426    def _setup_part_tripod(self, initial_state: ReachyState) -> None:
427        """Set up the robot's tripod based on the initial state."""
428        if not self.info:
429            self._logger.warning("Reachy is not connected")
430            return None
431
432        if self._robot.HasField("tripod"):
433            tripod = Tripod(self._robot.tripod, initial_state.tripod_state, self._grpc_channel)
434            self._tripod = tripod
435
436    def _setup_parts(self) -> None:
437        """Initialize all parts of the robot.
438
439        Retrieves the state of each part, creates instances, and adds them to the ReachySDK instance.
440        """
441        setup_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
442        self._goto_stub = GoToServiceStub(self._grpc_channel)
443        initial_state = setup_stub.GetReachyState(self._robot.id)
444
445        self._setup_part_r_arm(initial_state)
446        self._setup_part_l_arm(initial_state)
447        self._setup_part_head(initial_state)
448        self._setup_part_mobile_base(initial_state)
449        self._setup_part_tripod(initial_state)
450
451    def get_update_timestamp(self) -> int:
452        """Returns the timestamp (ns) of the last update.
453
454        The timestamp is generated by ROS running on Reachy.
455
456        Returns:
457            timestamp (int) in nanoseconds.
458        """
459        return self._update_timestamp.ToNanoseconds()
460
461    def _start_sync_in_bg(self) -> None:
462        """Start background synchronization with the robot."""
463        reachy_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
464        self._get_stream_update_loop(reachy_stub, freq=100)
465
466    def _get_stream_update_loop(self, reachy_stub: reachy_pb2_grpc.ReachyServiceStub, freq: float) -> None:
467        """Update the robot's state at a specified frequency.
468
469        Args:
470            reachy_stub: The gRPC stub for communication with the robot.
471            freq: The frequency (in Hz) at which to update the robot's state.
472        """
473        stream_req = reachy_pb2.ReachyStreamStateRequest(id=self._robot.id, publish_frequency=freq)
474        try:
475            for state_update in reachy_stub.StreamReachyState(stream_req):
476                self._update_timestamp = state_update.timestamp
477
478                self._update_part(self._l_arm, state_update.l_arm_state)
479                self._update_part(self._r_arm, state_update.r_arm_state)
480                self._update_part(self._head, state_update.head_state)
481                self._update_part(self._mobile_base, state_update.mobile_base_state)
482                self._update_part(self._tripod, state_update.tripod_state)
483
484                if self._l_arm and self._l_arm.gripper:
485                    self._l_arm.gripper._update_with(state_update.l_hand_state)
486                if self._r_arm and self._r_arm.gripper:
487                    self._r_arm.gripper._update_with(state_update.r_hand_state)
488
489        except grpc.RpcError as e:
490            if e.code() == grpc.StatusCode.CANCELLED:
491                self._logger.warning("Reachy gRPC stream is shutting down.")
492            else:
493                self._grpc_connected = False
494                raise ConnectionError(f"Connection with Reachy ip:{self._host} lost, check the SDK server status.")
495
496    def _update_part(self, part: Optional[Any], state: Any) -> None:
497        """Helper function to update a robot part if it exists."""
498        if part is not None:
499            part._update_with(state)
500
501    def _audit(self) -> None:
502        """Periodically perform an audit of the robot's components."""
503        while self._grpc_connected:
504            audit_status = self._stub.Audit(self._robot.id)
505            if self._l_arm is not None and audit_status.HasField("l_arm_status"):
506                self._l_arm._update_audit_status(audit_status.l_arm_status)
507                if self._l_arm.gripper is not None and audit_status.HasField("l_hand_status"):
508                    self._l_arm.gripper._update_audit_status(audit_status.l_hand_status)
509            if self._r_arm is not None and audit_status.HasField("r_arm_status"):
510                self._r_arm._update_audit_status(audit_status.r_arm_status)
511                if self._r_arm.gripper is not None and audit_status.HasField("r_hand_status"):
512                    self._r_arm.gripper._update_audit_status(audit_status.r_hand_status)
513            if self._head is not None and audit_status.HasField("head_status"):
514                self._head._update_audit_status(audit_status.head_status)
515            if self._mobile_base is not None and audit_status.HasField("mobile_base_status"):
516                self._mobile_base._update_audit_status(audit_status.mobile_base_status)
517            time.sleep(1)
518
519    @property
520    def audit(self) -> Dict[str, Dict[str, str]]:
521        """Return the audit status of all enabled parts of the robot."""
522        audit_dict: Dict[str, Dict[str, str]] = {}
523        if not self._grpc_connected or not self.info:
524            self._logger.warning("Reachy is not connected!")
525        if self.info is not None:
526            for part in self.info._enabled_parts.values():
527                audit_dict[part._part_id.name] = part.audit
528        return audit_dict
529
530    def turn_on(self) -> bool:
531        """Activate all motors of the robot's parts if all of them are not already turned on.
532
533        Returns:
534            `True` if successful, `False` otherwise.
535        """
536        if not self._grpc_connected or not self.info:
537            self._logger.warning("Cannot turn on Reachy, not connected.")
538            return False
539
540        speed_limit_high = 25
541        max_iterations = 10
542        ite = 0
543
544        while not self._is_fully_on() and ite < max_iterations:
545            for part in self.info._enabled_parts.values():
546                part.set_speed_limits(1)
547            time.sleep(0.05)
548            for part in self.info._enabled_parts.values():
549                part._turn_on()
550            if self._mobile_base is not None:
551                self._mobile_base._turn_on()
552            time.sleep(0.05)
553            for part in self.info._enabled_parts.values():
554                part.set_speed_limits(speed_limit_high)
555            time.sleep(0.4)
556            ite += 1
557
558        if ite == max_iterations:
559            self._logger.warning("Failed to turn on Reachy,")
560            return False
561
562        return True
563
564    def turn_off(self) -> bool:
565        """Turn all motors of enabled parts off.
566
567        All enabled parts' motors will then be compliant.
568        """
569        if not self._grpc_connected or not self.info:
570            self._logger.warning("Cannot turn off Reachy, not connected.")
571            return False
572        for part in self.info._enabled_parts.values():
573            part._turn_off()
574        if self._mobile_base is not None:
575            self._mobile_base._turn_off()
576        time.sleep(0.5)
577
578        return True
579
580    def turn_off_smoothly(self) -> bool:
581        """Turn all motors of robot parts off.
582
583        Arm torques are reduced during 3 seconds, then all parts' motors will be compliant.
584        """
585        if not self._grpc_connected or not self.info:
586            self._logger.warning("Cannot turn off Reachy, not connected.")
587            return False
588        speed_limit_high = 25
589        # Enough to sustain the arm weight
590        torque_limit_low = 50
591        torque_limit_high = 100
592        duration = 3
593        arms_list = []
594
595        if hasattr(self, "_mobile_base") and self._mobile_base is not None:
596            self._mobile_base._turn_off()
597        for part in self.info._enabled_parts.values():
598            if "arm" in part._part_id.name:
599                part.set_torque_limits(torque_limit_low)
600                part.set_speed_limits(speed_limit_high)
601                part.goto_posture(duration=duration, wait_for_goto_end=False)
602                arms_list.append(part)
603            else:
604                part._turn_off()
605
606        countingTime = 0
607        while countingTime < duration:
608            time.sleep(1)
609            torque_limit_low -= 15
610            for arm_part in arms_list:
611                arm_part.set_torque_limits(torque_limit_low)
612            countingTime += 1
613
614        for arm_part in arms_list:
615            arm_part._turn_off()
616            arm_part.set_torque_limits(torque_limit_high)
617
618        time.sleep(0.5)
619        return True
620
621    def is_on(self) -> bool:
622        """Check if all actuators of Reachy parts are on (stiff).
623
624        Returns:
625            `True` if all are stiff, `False` otherwise.
626        """
627        if not self.info:
628            self._logger.warning("Reachy is not connected!")
629            return False
630
631        for part in self.info._enabled_parts.values():
632            if not part.is_on():
633                return False
634        if self._mobile_base is not None and self._mobile_base.is_off():
635            return False
636        return True
637
638    def is_off(self) -> bool:
639        """Check if all actuators of Reachy parts are off (compliant).
640
641        Returns:
642            `True` if all are compliant, `False` otherwise.
643        """
644        if not self.info:
645            self._logger.warning("Reachy is not connected!")
646            return True
647
648        for part in self.info._enabled_parts.values():
649            if part.is_on():
650                return False
651        if self._mobile_base is not None and self._mobile_base.is_on():
652            return False
653        return True
654
655    def _is_fully_on(self) -> bool:
656        """Check if the robot and its grippers (if they exist) are turned on."""
657        return self.is_on() and all(arm.gripper.is_on() if arm and arm.gripper else True for arm in [self._l_arm, self._r_arm])
658
659    def reset_default_limits(self) -> None:
660        """Set back speed and torque limits of all parts to maximum value (100)."""
661        if not self.info:
662            self._logger.warning("Reachy is not connected!")
663            return
664
665        for part in self.info._enabled_parts.values():
666            if issubclass(type(part), JointsBasedPart):
667                part.set_speed_limits(100)
668                part.set_torque_limits(100)
669        time.sleep(0.5)
670
671    def goto_posture(
672        self,
673        common_posture: str = "default",
674        duration: float = 2,
675        wait: bool = False,
676        wait_for_goto_end: bool = True,
677        interpolation_mode: str = "minimum_jerk",
678        open_gripper: bool = False,
679    ) -> GoToHomeId:
680        """Move the robot to a predefined posture.
681
682        Args:
683            common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'.
684            duration: The time duration in seconds for the robot to move to the specified posture.
685                Defaults to 2.
686            wait: Determines whether the program should wait for the movement to finish before
687                returning. If set to `True`, the program waits for the movement to complete before continuing
688                execution. Defaults to `False`.
689            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
690                only after all previous commands in the queue have been executed. If set to `False`, the program
691                will cancel all executing moves and queues. Defaults to `True`.
692            interpolation_mode: The type of interpolation used when moving the arm's joints.
693                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
694            open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position.
695                Defaults to `False`.
696
697        Returns:
698            A GoToHomeId containing movement GoToIds for each part.
699        """
700        if common_posture not in ["default", "elbow_90"]:
701            raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'")
702        head_id = None
703        r_arm_id = None
704        l_arm_id = None
705        if not wait_for_goto_end:
706            self.cancel_all_goto()
707        if self.head is not None:
708            is_last_commmand = self.r_arm is None and self.l_arm is None
709            wait_head = wait and is_last_commmand
710            head_id = self.head.goto_posture(
711                duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode
712            )
713        if self.r_arm is not None:
714            is_last_commmand = self.l_arm is None
715            wait_r_arm = wait and is_last_commmand
716            r_arm_id = self.r_arm.goto_posture(
717                common_posture,
718                duration=duration,
719                wait=wait_r_arm,
720                wait_for_goto_end=wait_for_goto_end,
721                interpolation_mode=interpolation_mode,
722                open_gripper=open_gripper,
723            )
724        if self.l_arm is not None:
725            l_arm_id = self.l_arm.goto_posture(
726                common_posture,
727                duration=duration,
728                wait=wait,
729                wait_for_goto_end=wait_for_goto_end,
730                interpolation_mode=interpolation_mode,
731                open_gripper=open_gripper,
732            )
733        ids = GoToHomeId(
734            head=head_id,
735            r_arm=r_arm_id,
736            l_arm=l_arm_id,
737        )
738        return ids
739
740    def is_goto_finished(self, goto_id: GoToId) -> bool:
741        """Check if a goto command has completed.
742
743        Args:
744            goto_id: The unique GoToId of the goto command.
745
746        Returns:
747            `True` if the command is complete, `False` otherwise.
748        """
749        if not self._grpc_connected:
750            self._logger.warning("Reachy is not connected!")
751            return False
752        if not isinstance(goto_id, GoToId):
753            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
754        if goto_id.id == -1:
755            self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.")
756            return True
757        state = self._get_goto_state(goto_id)
758        result = bool(
759            state.goal_status == GoalStatus.STATUS_ABORTED
760            or state.goal_status == GoalStatus.STATUS_CANCELED
761            or state.goal_status == GoalStatus.STATUS_SUCCEEDED
762        )
763        return result
764
765    def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]:
766        """Retrieve the details of a goto command based on its GoToId.
767
768        Args:
769            goto_id: The ID of the goto command for which details are requested.
770
771        Returns:
772            A `SimplifiedRequest` object containing the part name, joint goal positions
773            (in degrees), movement duration, and interpolation mode.
774            Returns `None` if the robot is not connected or if the `goto_id` is invalid.
775
776        Raises:
777            TypeError: If `goto_id` is not an instance of `GoToId`.
778            ValueError: If `goto_id` is -1, indicating an invalid command.
779        """
780        if not self._grpc_connected:
781            self._logger.warning("Reachy is not connected!")
782            return None
783        if not isinstance(goto_id, GoToId):
784            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
785        if goto_id.id == -1:
786            raise ValueError("No answer was found for given move, goto_id is -1")
787
788        response = self._goto_stub.GetGoToRequest(goto_id)
789
790        full_request = process_goto_request(response)
791
792        return full_request
793
794    def _get_goto_state(self, goto_id: GoToId) -> GoToGoalStatus:
795        """Retrieve the current state of a goto command.
796
797        Args:
798            goto_id: The unique GoToId of the goto command.
799
800        Returns:
801            The current state of the command.
802        """
803        response = self._goto_stub.GetGoToState(goto_id)
804        return response
805
806    def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck:
807        """Request the cancellation of a specific goto command based on its GoToId.
808
809        Args:
810            goto_id: The ID of the goto command to cancel.
811
812        Returns:
813            A `GoToAck` object indicating whether the cancellation was acknowledged.
814            If the robot is not connected, returns None.
815
816        Raises:
817            TypeError: If `goto_id` is not an instance of `GoToId`.
818        """
819        if not self._grpc_connected:
820            self._logger.warning("Reachy is not connected!")
821            return None
822        if not isinstance(goto_id, GoToId):
823            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
824        if goto_id.id == -1:
825            self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.")
826            return GoToAck(ack=True)
827        response = self._goto_stub.CancelGoTo(goto_id)
828        return response
829
830    def cancel_all_goto(self) -> GoToAck:
831        """Cancel all active goto commands.
832
833        Returns:
834             A `GoToAck` object indicating whether the cancellation was acknowledged.
835        """
836        if not self._grpc_connected:
837            self._logger.warning("Reachy is not connected!")
838            return None
839        response = self._goto_stub.CancelAllGoTo(Empty())
840        return response
841
842    def send_goal_positions(self, check_positions: bool = False) -> None:
843        """Send the goal positions to the robot.
844
845        If goal positions have been specified for any joint of the robot, sends them to the robot.
846
847        Args :
848            check_positions: A boolean indicating whether to check the positions after sending the command.
849                Defaults to True.
850        """
851        if not self.info:
852            self._logger.warning("Reachy is not connected!")
853            return
854
855        commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {}
856        for part in [self.r_arm, self.l_arm, self.head]:
857            self._add_component_commands(part, commands, check_positions)
858
859        if self.r_arm is not None:
860            self._add_component_commands(self.r_arm.gripper, commands, check_positions)
861        if self.l_arm is not None:
862            self._add_component_commands(self.l_arm.gripper, commands, check_positions)
863
864        components_commands = ReachyComponentsCommands(**commands)
865        self._stub.SendComponentsCommands(components_commands)
866
867    def _add_component_commands(
868        self,
869        part: JointsBasedPart | Hand | None,
870        commands: Dict[str, HeadComponentsCommands | ArmComponentsCommands | HandPositionRequest],
871        check_positions: bool,
872    ) -> None:
873        """Get the current component commands."""
874        if part is not None:
875            if part.is_off():
876                self._logger.warning(f"{part._part_id.name} is off. Command not sent.")
877                return
878            part_command = part._get_goal_positions_message()
879            if part_command is not None:
880                commands[f"{part._part_id.name}_commands"] = part_command
881                part._clean_outgoing_goal_positions()
882                if check_positions:
883                    part._post_send_goal_positions()
class GoToHomeId(builtins.tuple):

Named tuple for easy access to goto request on full body

GoToHomeId(head, r_arm, l_arm)

Create new instance of GoToHomeId(head, r_arm, l_arm)

head

Alias for field number 0

r_arm

Alias for field number 1

l_arm

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class ReachySDK:
 53class ReachySDK:
 54    """The ReachySDK class manages the connection and interaction with a Reachy robot.
 55
 56    This class handles:
 57    - Establishing and maintaining a connection with the robot via gRPC.
 58    - Accessing and controlling various parts of the robot, such as the arms, head, and mobile base.
 59    - Managing robot components including actuators, joints, cameras, and audio.
 60    - Synchronizing robot state with the server in the background to keep data up-to-date.
 61    - Providing utility functions for common tasks such as turning on/off motors, sending goal positions,
 62        and performing movements.
 63    """
 64
 65    _instances_by_host: Dict[str, "ReachySDK"] = {}
 66
 67    def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK:
 68        """Ensure only one connected instance per IP is created."""
 69        if host in cls._instances_by_host:
 70            if cls._instances_by_host[host]._grpc_connected:
 71                return cls._instances_by_host[host]
 72            else:
 73                del cls._instances_by_host[host]
 74
 75        instance = super().__new__(cls)
 76        cls._instances_by_host[host] = instance
 77        return instance
 78
 79    def __init__(
 80        self,
 81        host: str,
 82        sdk_port: int = 50051,
 83        audio_port: int = 50063,
 84        video_port: int = 50065,
 85    ) -> None:
 86        """Initialize a connection to the robot.
 87
 88        Args:
 89            host: The IP address or hostname of the robot.
 90            sdk_port: The gRPC port for the SDK. Default is 50051.
 91            audio_port: The gRPC port for audio services. Default is 50063.
 92            video_port: The gRPC port for video services. Default is 50065.
 93        """
 94        self._logger = getLogger(__name__)
 95
 96        if hasattr(self, "_initialized"):
 97            self._logger.warning("An instance already exists.")
 98            return
 99
100        self._host = host
101        self._sdk_port = sdk_port
102        self._audio_port = audio_port
103        self._video_port = video_port
104
105        self._grpc_connected = False
106        self._initialized = True
107
108        self._r_arm: Optional[Arm] = None
109        self._l_arm: Optional[Arm] = None
110        self._head: Optional[Head] = None
111        self._cameras: Optional[CameraManager] = None
112        self._mobile_base: Optional[MobileBase] = None
113        self._info: Optional[ReachyInfo] = None
114        self._tripod: Optional[Tripod] = None
115
116        self._update_timestamp: Timestamp = Timestamp(seconds=0)
117
118        self.connect()
119
120    def connect(self) -> None:
121        """Connects the SDK to the robot."""
122        if self._grpc_connected:
123            self._logger.warning("Already connected to Reachy.")
124            return
125
126        self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}")
127
128        self._stop_flag = threading.Event()
129
130        try:
131            self._get_info()
132        except ConnectionError:
133            self._logger.error(
134                f"Could not connect to Reachy with on IP address {self._host}, "
135                "check that the sdk server is running and that the IP is correct."
136            )
137            self._grpc_connected = False
138            return
139
140        self._setup_parts()
141        self._setup_audio()
142        self._cameras = self._setup_video()
143
144        self._sync_thread = threading.Thread(target=self._start_sync_in_bg)
145        self._sync_thread.daemon = True
146        self._sync_thread.start()
147
148        self._audit_thread = threading.Thread(target=self._audit)
149        self._audit_thread.daemon = True
150        self._audit_thread.start()
151
152        self._grpc_connected = True
153        self._logger.info("Connected to Reachy.")
154
155    def disconnect(self, lost_connection: bool = False) -> None:
156        """Disconnect the SDK from the robot's server.
157
158        Args:
159            lost_connection: If `True`, indicates that the connection was lost unexpectedly.
160        """
161        if self._host in self._instances_by_host:
162            del self._instances_by_host[self._host]
163
164        if not self._grpc_connected:
165            self._logger.warning("Already disconnected from Reachy.")
166            return
167
168        self._grpc_connected = False
169        self._grpc_channel.close()
170        self._grpc_channel = None
171
172        self._head = None
173        self._r_arm = None
174        self._l_arm = None
175        self._mobile_base = None
176
177        self._logger.info("Disconnected from Reachy.")
178
179    def __repr__(self) -> str:
180        """Clean representation of a Reachy."""
181        if not self._grpc_connected or self.info is None:
182            return "Reachy is not connected"
183
184        s = "\n\t".join([part_name + ": " + str(part) for part_name, part in self.info._enabled_parts.items()])
185        repr_template = (
186            '<Reachy host="{host}" connected={connected} on={on} \n'
187            " battery_voltage={battery_voltage} \n"
188            " parts=\n\t{parts} \n>"
189        )
190        return repr_template.format(
191            host=self._host,
192            connected=self._grpc_connected,
193            on=self.is_on(),
194            battery_voltage=self.info.battery_voltage,
195            parts=s,
196        )
197
198    @property
199    def info(self) -> Optional[ReachyInfo]:
200        """Get ReachyInfo if connected."""
201        if not self._grpc_connected:
202            self._logger.error("Cannot get info, not connected to Reachy")
203            return None
204        return self._info
205
206    @property
207    def head(self) -> Optional[Head]:
208        """Get Reachy's head."""
209        if not self._grpc_connected:
210            self._logger.error("Cannot get head, not connected to Reachy")
211            return None
212        if self._head is None:
213            self._logger.error("head does not exist with this configuration")
214            return None
215        return self._head
216
217    @property
218    def r_arm(self) -> Optional[Arm]:
219        """Get Reachy's right arm."""
220        if not self._grpc_connected:
221            self._logger.error("Cannot get r_arm, not connected to Reachy")
222            return None
223        if self._r_arm is None:
224            self._logger.error("r_arm does not exist with this configuration")
225            return None
226        return self._r_arm
227
228    @property
229    def l_arm(self) -> Optional[Arm]:
230        """Get Reachy's left arm."""
231        if not self._grpc_connected:
232            self._logger.error("Cannot get l_arm, not connected to Reachy")
233            return None
234        if self._l_arm is None:
235            self._logger.error("l_arm does not exist with this configuration")
236            return None
237        return self._l_arm
238
239    @property
240    def mobile_base(self) -> Optional[MobileBase]:
241        """Get Reachy's mobile base."""
242        if not self._grpc_connected:
243            self._logger.error("Cannot get mobile_base, not connected to Reachy")
244            return None
245        if self._mobile_base is None:
246            self._logger.error("mobile_base does not exist with this configuration")
247            return None
248        return self._mobile_base
249
250    @property
251    def tripod(self) -> Optional[Tripod]:
252        """Get Reachy's fixed tripod."""
253        if not self._grpc_connected:
254            self._logger.error("Cannot get tripod, not connected to Reachy")
255            return None
256        if self._tripod is None:
257            self._logger.error("tripod does not exist with this configuration")
258            return None
259        return self._tripod
260
261    @property
262    def joints(self) -> CustomDict[str, OrbitaJoint]:
263        """Return a dictionary of all joints of the robot.
264
265        The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects.
266        """
267        if not self._grpc_connected or not self.info:
268            self._logger.warning("Cannot get joints, not connected to Reachy.")
269            return CustomDict({})
270        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
271        for part_name in self.info._enabled_parts:
272            part = getattr(self, part_name)
273            for joint_name, joint in part.joints.items():
274                _joints[part_name + "." + joint_name] = joint
275        return _joints
276
277    @property
278    def _actuators(self) -> Dict[str, Orbita2d | Orbita3d]:
279        """Return a dictionary of all actuators of the robot.
280
281        The dictionary keys are the actuator names, and the values are the corresponding actuator objects.
282        """
283        if not self._grpc_connected or not self.info:
284            self._logger.warning("Cannot get actuators, not connected to Reachy.")
285            return {}
286        _actuators: Dict[str, Orbita2d | Orbita3d] = {}
287        for part_name in self.info._enabled_parts:
288            part = getattr(self, part_name)
289            for actuator_name, actuator in part._actuators.items():
290                _actuators[part_name + "." + actuator_name] = actuator
291        return _actuators
292
293    def is_connected(self) -> bool:
294        """Check if the SDK is connected to the robot.
295
296        Returns:
297            `True` if connected, `False` otherwise.
298        """
299        return self._grpc_connected
300
301    @property
302    def _grpc_status(self) -> str:
303        """Get the current connection status of the robot's gRPC server.
304
305        Returns:
306            "connected" if connected, "disconnected" otherwise.
307        """
308        if self._grpc_connected:
309            return "connected"
310        else:
311            return "disconnected"
312
313    @_grpc_status.setter
314    def _grpc_status(self, status: str) -> None:
315        """Set the connection status of the robot's gRPC server.
316
317        Args:
318            status: The connection status to set, must be either "connected" or "disconnected".
319
320        Raises:
321            ValueError: If the status is not "connected" or "disconnected".
322        """
323        if status == "connected":
324            self._grpc_connected = True
325        elif status == "disconnected":
326            self._grpc_connected = False
327        else:
328            raise ValueError("_grpc_status can only be set to 'connected' or 'disconnected'")
329
330    @property
331    def cameras(self) -> Optional[CameraManager]:
332        """Get the camera manager if available and connected."""
333        if not self._grpc_connected:
334            self._logger.error("Cannot get cameras, not connected to Reachy")
335            return None
336        return self._cameras
337
338    def _get_info(self) -> None:
339        """Retrieve basic information about the robot.
340
341        Gathers data on the robot's parts, hardware and software versions, and serial number.
342        """
343        self._stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
344        try:
345            self._robot = self._stub.GetReachy(Empty())
346        except _InactiveRpcError:
347            raise ConnectionError()
348
349        self._info = ReachyInfo(self._robot)
350        self._grpc_connected = True
351
352    def _setup_audio(self) -> None:
353        """Initializes the audio grpc client."""
354        self.audio = Audio(self._host, self._audio_port)
355
356    def _setup_video(self) -> Optional[CameraManager]:
357        """Set up the video server for the robot.
358
359        Returns:
360            A CameraManager instance if the video server connection is successful, otherwise None.
361        """
362        try:
363            return CameraManager(self._host, self._video_port)
364
365        except Exception as e:
366            self._logger.error(f"Failed to connect to video server with error: {e}.\nReachySDK.video will not be available.")
367            return None
368
369    def _setup_part_r_arm(self, initial_state: ReachyState) -> None:
370        """Set up the robot's right arm based on the initial state."""
371        if not self.info:
372            self._logger.warning("Reachy is not connected")
373            return None
374
375        if self._robot.HasField("r_arm"):
376            if initial_state.r_arm_state.activated:
377                r_arm = Arm(self._robot.r_arm, initial_state.r_arm_state, self._grpc_channel, self._goto_stub)
378                self._r_arm = r_arm
379                self.info._enabled_parts["r_arm"] = self._r_arm
380                if self._robot.HasField("r_hand"):
381                    self._r_arm._init_hand(self._robot.r_hand, initial_state.r_hand_state)
382            else:
383                self.info._disabled_parts.append("r_arm")
384
385    def _setup_part_l_arm(self, initial_state: ReachyState) -> None:
386        """Set up the robot's left arm based on the initial state."""
387        if not self.info:
388            self._logger.warning("Reachy is not connected")
389            return None
390
391        if self._robot.HasField("l_arm"):
392            if initial_state.l_arm_state.activated:
393                l_arm = Arm(self._robot.l_arm, initial_state.l_arm_state, self._grpc_channel, self._goto_stub)
394                self._l_arm = l_arm
395                self.info._enabled_parts["l_arm"] = self._l_arm
396                if self._robot.HasField("l_hand"):
397                    self._l_arm._init_hand(self._robot.l_hand, initial_state.l_hand_state)
398            else:
399                self.info._disabled_parts.append("l_arm")
400
401    def _setup_part_mobile_base(self, initial_state: ReachyState) -> None:
402        """Set up the robot's mobile base based on the initial state."""
403        if not self.info:
404            self._logger.warning("Reachy is not connected")
405            return None
406
407        if self._robot.HasField("mobile_base"):
408            self._mobile_base = MobileBase(
409                self._robot.mobile_base, initial_state.mobile_base_state, self._grpc_channel, self._goto_stub
410            )
411            self.info._set_mobile_base(self._mobile_base)
412
413    def _setup_part_head(self, initial_state: ReachyState) -> None:
414        """Set up the robot's head based on the initial state."""
415        if not self.info:
416            self._logger.warning("Reachy is not connected")
417            return None
418
419        if self._robot.HasField("head"):
420            if initial_state.head_state.activated:
421                head = Head(self._robot.head, initial_state.head_state, self._grpc_channel, self._goto_stub)
422                self._head = head
423                self.info._enabled_parts["head"] = self._head
424            else:
425                self.info._disabled_parts.append("head")
426
427    def _setup_part_tripod(self, initial_state: ReachyState) -> None:
428        """Set up the robot's tripod based on the initial state."""
429        if not self.info:
430            self._logger.warning("Reachy is not connected")
431            return None
432
433        if self._robot.HasField("tripod"):
434            tripod = Tripod(self._robot.tripod, initial_state.tripod_state, self._grpc_channel)
435            self._tripod = tripod
436
437    def _setup_parts(self) -> None:
438        """Initialize all parts of the robot.
439
440        Retrieves the state of each part, creates instances, and adds them to the ReachySDK instance.
441        """
442        setup_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
443        self._goto_stub = GoToServiceStub(self._grpc_channel)
444        initial_state = setup_stub.GetReachyState(self._robot.id)
445
446        self._setup_part_r_arm(initial_state)
447        self._setup_part_l_arm(initial_state)
448        self._setup_part_head(initial_state)
449        self._setup_part_mobile_base(initial_state)
450        self._setup_part_tripod(initial_state)
451
452    def get_update_timestamp(self) -> int:
453        """Returns the timestamp (ns) of the last update.
454
455        The timestamp is generated by ROS running on Reachy.
456
457        Returns:
458            timestamp (int) in nanoseconds.
459        """
460        return self._update_timestamp.ToNanoseconds()
461
462    def _start_sync_in_bg(self) -> None:
463        """Start background synchronization with the robot."""
464        reachy_stub = reachy_pb2_grpc.ReachyServiceStub(self._grpc_channel)
465        self._get_stream_update_loop(reachy_stub, freq=100)
466
467    def _get_stream_update_loop(self, reachy_stub: reachy_pb2_grpc.ReachyServiceStub, freq: float) -> None:
468        """Update the robot's state at a specified frequency.
469
470        Args:
471            reachy_stub: The gRPC stub for communication with the robot.
472            freq: The frequency (in Hz) at which to update the robot's state.
473        """
474        stream_req = reachy_pb2.ReachyStreamStateRequest(id=self._robot.id, publish_frequency=freq)
475        try:
476            for state_update in reachy_stub.StreamReachyState(stream_req):
477                self._update_timestamp = state_update.timestamp
478
479                self._update_part(self._l_arm, state_update.l_arm_state)
480                self._update_part(self._r_arm, state_update.r_arm_state)
481                self._update_part(self._head, state_update.head_state)
482                self._update_part(self._mobile_base, state_update.mobile_base_state)
483                self._update_part(self._tripod, state_update.tripod_state)
484
485                if self._l_arm and self._l_arm.gripper:
486                    self._l_arm.gripper._update_with(state_update.l_hand_state)
487                if self._r_arm and self._r_arm.gripper:
488                    self._r_arm.gripper._update_with(state_update.r_hand_state)
489
490        except grpc.RpcError as e:
491            if e.code() == grpc.StatusCode.CANCELLED:
492                self._logger.warning("Reachy gRPC stream is shutting down.")
493            else:
494                self._grpc_connected = False
495                raise ConnectionError(f"Connection with Reachy ip:{self._host} lost, check the SDK server status.")
496
497    def _update_part(self, part: Optional[Any], state: Any) -> None:
498        """Helper function to update a robot part if it exists."""
499        if part is not None:
500            part._update_with(state)
501
502    def _audit(self) -> None:
503        """Periodically perform an audit of the robot's components."""
504        while self._grpc_connected:
505            audit_status = self._stub.Audit(self._robot.id)
506            if self._l_arm is not None and audit_status.HasField("l_arm_status"):
507                self._l_arm._update_audit_status(audit_status.l_arm_status)
508                if self._l_arm.gripper is not None and audit_status.HasField("l_hand_status"):
509                    self._l_arm.gripper._update_audit_status(audit_status.l_hand_status)
510            if self._r_arm is not None and audit_status.HasField("r_arm_status"):
511                self._r_arm._update_audit_status(audit_status.r_arm_status)
512                if self._r_arm.gripper is not None and audit_status.HasField("r_hand_status"):
513                    self._r_arm.gripper._update_audit_status(audit_status.r_hand_status)
514            if self._head is not None and audit_status.HasField("head_status"):
515                self._head._update_audit_status(audit_status.head_status)
516            if self._mobile_base is not None and audit_status.HasField("mobile_base_status"):
517                self._mobile_base._update_audit_status(audit_status.mobile_base_status)
518            time.sleep(1)
519
520    @property
521    def audit(self) -> Dict[str, Dict[str, str]]:
522        """Return the audit status of all enabled parts of the robot."""
523        audit_dict: Dict[str, Dict[str, str]] = {}
524        if not self._grpc_connected or not self.info:
525            self._logger.warning("Reachy is not connected!")
526        if self.info is not None:
527            for part in self.info._enabled_parts.values():
528                audit_dict[part._part_id.name] = part.audit
529        return audit_dict
530
531    def turn_on(self) -> bool:
532        """Activate all motors of the robot's parts if all of them are not already turned on.
533
534        Returns:
535            `True` if successful, `False` otherwise.
536        """
537        if not self._grpc_connected or not self.info:
538            self._logger.warning("Cannot turn on Reachy, not connected.")
539            return False
540
541        speed_limit_high = 25
542        max_iterations = 10
543        ite = 0
544
545        while not self._is_fully_on() and ite < max_iterations:
546            for part in self.info._enabled_parts.values():
547                part.set_speed_limits(1)
548            time.sleep(0.05)
549            for part in self.info._enabled_parts.values():
550                part._turn_on()
551            if self._mobile_base is not None:
552                self._mobile_base._turn_on()
553            time.sleep(0.05)
554            for part in self.info._enabled_parts.values():
555                part.set_speed_limits(speed_limit_high)
556            time.sleep(0.4)
557            ite += 1
558
559        if ite == max_iterations:
560            self._logger.warning("Failed to turn on Reachy,")
561            return False
562
563        return True
564
565    def turn_off(self) -> bool:
566        """Turn all motors of enabled parts off.
567
568        All enabled parts' motors will then be compliant.
569        """
570        if not self._grpc_connected or not self.info:
571            self._logger.warning("Cannot turn off Reachy, not connected.")
572            return False
573        for part in self.info._enabled_parts.values():
574            part._turn_off()
575        if self._mobile_base is not None:
576            self._mobile_base._turn_off()
577        time.sleep(0.5)
578
579        return True
580
581    def turn_off_smoothly(self) -> bool:
582        """Turn all motors of robot parts off.
583
584        Arm torques are reduced during 3 seconds, then all parts' motors will be compliant.
585        """
586        if not self._grpc_connected or not self.info:
587            self._logger.warning("Cannot turn off Reachy, not connected.")
588            return False
589        speed_limit_high = 25
590        # Enough to sustain the arm weight
591        torque_limit_low = 50
592        torque_limit_high = 100
593        duration = 3
594        arms_list = []
595
596        if hasattr(self, "_mobile_base") and self._mobile_base is not None:
597            self._mobile_base._turn_off()
598        for part in self.info._enabled_parts.values():
599            if "arm" in part._part_id.name:
600                part.set_torque_limits(torque_limit_low)
601                part.set_speed_limits(speed_limit_high)
602                part.goto_posture(duration=duration, wait_for_goto_end=False)
603                arms_list.append(part)
604            else:
605                part._turn_off()
606
607        countingTime = 0
608        while countingTime < duration:
609            time.sleep(1)
610            torque_limit_low -= 15
611            for arm_part in arms_list:
612                arm_part.set_torque_limits(torque_limit_low)
613            countingTime += 1
614
615        for arm_part in arms_list:
616            arm_part._turn_off()
617            arm_part.set_torque_limits(torque_limit_high)
618
619        time.sleep(0.5)
620        return True
621
622    def is_on(self) -> bool:
623        """Check if all actuators of Reachy parts are on (stiff).
624
625        Returns:
626            `True` if all are stiff, `False` otherwise.
627        """
628        if not self.info:
629            self._logger.warning("Reachy is not connected!")
630            return False
631
632        for part in self.info._enabled_parts.values():
633            if not part.is_on():
634                return False
635        if self._mobile_base is not None and self._mobile_base.is_off():
636            return False
637        return True
638
639    def is_off(self) -> bool:
640        """Check if all actuators of Reachy parts are off (compliant).
641
642        Returns:
643            `True` if all are compliant, `False` otherwise.
644        """
645        if not self.info:
646            self._logger.warning("Reachy is not connected!")
647            return True
648
649        for part in self.info._enabled_parts.values():
650            if part.is_on():
651                return False
652        if self._mobile_base is not None and self._mobile_base.is_on():
653            return False
654        return True
655
656    def _is_fully_on(self) -> bool:
657        """Check if the robot and its grippers (if they exist) are turned on."""
658        return self.is_on() and all(arm.gripper.is_on() if arm and arm.gripper else True for arm in [self._l_arm, self._r_arm])
659
660    def reset_default_limits(self) -> None:
661        """Set back speed and torque limits of all parts to maximum value (100)."""
662        if not self.info:
663            self._logger.warning("Reachy is not connected!")
664            return
665
666        for part in self.info._enabled_parts.values():
667            if issubclass(type(part), JointsBasedPart):
668                part.set_speed_limits(100)
669                part.set_torque_limits(100)
670        time.sleep(0.5)
671
672    def goto_posture(
673        self,
674        common_posture: str = "default",
675        duration: float = 2,
676        wait: bool = False,
677        wait_for_goto_end: bool = True,
678        interpolation_mode: str = "minimum_jerk",
679        open_gripper: bool = False,
680    ) -> GoToHomeId:
681        """Move the robot to a predefined posture.
682
683        Args:
684            common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'.
685            duration: The time duration in seconds for the robot to move to the specified posture.
686                Defaults to 2.
687            wait: Determines whether the program should wait for the movement to finish before
688                returning. If set to `True`, the program waits for the movement to complete before continuing
689                execution. Defaults to `False`.
690            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
691                only after all previous commands in the queue have been executed. If set to `False`, the program
692                will cancel all executing moves and queues. Defaults to `True`.
693            interpolation_mode: The type of interpolation used when moving the arm's joints.
694                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
695            open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position.
696                Defaults to `False`.
697
698        Returns:
699            A GoToHomeId containing movement GoToIds for each part.
700        """
701        if common_posture not in ["default", "elbow_90"]:
702            raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'")
703        head_id = None
704        r_arm_id = None
705        l_arm_id = None
706        if not wait_for_goto_end:
707            self.cancel_all_goto()
708        if self.head is not None:
709            is_last_commmand = self.r_arm is None and self.l_arm is None
710            wait_head = wait and is_last_commmand
711            head_id = self.head.goto_posture(
712                duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode
713            )
714        if self.r_arm is not None:
715            is_last_commmand = self.l_arm is None
716            wait_r_arm = wait and is_last_commmand
717            r_arm_id = self.r_arm.goto_posture(
718                common_posture,
719                duration=duration,
720                wait=wait_r_arm,
721                wait_for_goto_end=wait_for_goto_end,
722                interpolation_mode=interpolation_mode,
723                open_gripper=open_gripper,
724            )
725        if self.l_arm is not None:
726            l_arm_id = self.l_arm.goto_posture(
727                common_posture,
728                duration=duration,
729                wait=wait,
730                wait_for_goto_end=wait_for_goto_end,
731                interpolation_mode=interpolation_mode,
732                open_gripper=open_gripper,
733            )
734        ids = GoToHomeId(
735            head=head_id,
736            r_arm=r_arm_id,
737            l_arm=l_arm_id,
738        )
739        return ids
740
741    def is_goto_finished(self, goto_id: GoToId) -> bool:
742        """Check if a goto command has completed.
743
744        Args:
745            goto_id: The unique GoToId of the goto command.
746
747        Returns:
748            `True` if the command is complete, `False` otherwise.
749        """
750        if not self._grpc_connected:
751            self._logger.warning("Reachy is not connected!")
752            return False
753        if not isinstance(goto_id, GoToId):
754            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
755        if goto_id.id == -1:
756            self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.")
757            return True
758        state = self._get_goto_state(goto_id)
759        result = bool(
760            state.goal_status == GoalStatus.STATUS_ABORTED
761            or state.goal_status == GoalStatus.STATUS_CANCELED
762            or state.goal_status == GoalStatus.STATUS_SUCCEEDED
763        )
764        return result
765
766    def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]:
767        """Retrieve the details of a goto command based on its GoToId.
768
769        Args:
770            goto_id: The ID of the goto command for which details are requested.
771
772        Returns:
773            A `SimplifiedRequest` object containing the part name, joint goal positions
774            (in degrees), movement duration, and interpolation mode.
775            Returns `None` if the robot is not connected or if the `goto_id` is invalid.
776
777        Raises:
778            TypeError: If `goto_id` is not an instance of `GoToId`.
779            ValueError: If `goto_id` is -1, indicating an invalid command.
780        """
781        if not self._grpc_connected:
782            self._logger.warning("Reachy is not connected!")
783            return None
784        if not isinstance(goto_id, GoToId):
785            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
786        if goto_id.id == -1:
787            raise ValueError("No answer was found for given move, goto_id is -1")
788
789        response = self._goto_stub.GetGoToRequest(goto_id)
790
791        full_request = process_goto_request(response)
792
793        return full_request
794
795    def _get_goto_state(self, goto_id: GoToId) -> GoToGoalStatus:
796        """Retrieve the current state of a goto command.
797
798        Args:
799            goto_id: The unique GoToId of the goto command.
800
801        Returns:
802            The current state of the command.
803        """
804        response = self._goto_stub.GetGoToState(goto_id)
805        return response
806
807    def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck:
808        """Request the cancellation of a specific goto command based on its GoToId.
809
810        Args:
811            goto_id: The ID of the goto command to cancel.
812
813        Returns:
814            A `GoToAck` object indicating whether the cancellation was acknowledged.
815            If the robot is not connected, returns None.
816
817        Raises:
818            TypeError: If `goto_id` is not an instance of `GoToId`.
819        """
820        if not self._grpc_connected:
821            self._logger.warning("Reachy is not connected!")
822            return None
823        if not isinstance(goto_id, GoToId):
824            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
825        if goto_id.id == -1:
826            self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.")
827            return GoToAck(ack=True)
828        response = self._goto_stub.CancelGoTo(goto_id)
829        return response
830
831    def cancel_all_goto(self) -> GoToAck:
832        """Cancel all active goto commands.
833
834        Returns:
835             A `GoToAck` object indicating whether the cancellation was acknowledged.
836        """
837        if not self._grpc_connected:
838            self._logger.warning("Reachy is not connected!")
839            return None
840        response = self._goto_stub.CancelAllGoTo(Empty())
841        return response
842
843    def send_goal_positions(self, check_positions: bool = False) -> None:
844        """Send the goal positions to the robot.
845
846        If goal positions have been specified for any joint of the robot, sends them to the robot.
847
848        Args :
849            check_positions: A boolean indicating whether to check the positions after sending the command.
850                Defaults to True.
851        """
852        if not self.info:
853            self._logger.warning("Reachy is not connected!")
854            return
855
856        commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {}
857        for part in [self.r_arm, self.l_arm, self.head]:
858            self._add_component_commands(part, commands, check_positions)
859
860        if self.r_arm is not None:
861            self._add_component_commands(self.r_arm.gripper, commands, check_positions)
862        if self.l_arm is not None:
863            self._add_component_commands(self.l_arm.gripper, commands, check_positions)
864
865        components_commands = ReachyComponentsCommands(**commands)
866        self._stub.SendComponentsCommands(components_commands)
867
868    def _add_component_commands(
869        self,
870        part: JointsBasedPart | Hand | None,
871        commands: Dict[str, HeadComponentsCommands | ArmComponentsCommands | HandPositionRequest],
872        check_positions: bool,
873    ) -> None:
874        """Get the current component commands."""
875        if part is not None:
876            if part.is_off():
877                self._logger.warning(f"{part._part_id.name} is off. Command not sent.")
878                return
879            part_command = part._get_goal_positions_message()
880            if part_command is not None:
881                commands[f"{part._part_id.name}_commands"] = part_command
882                part._clean_outgoing_goal_positions()
883                if check_positions:
884                    part._post_send_goal_positions()

The ReachySDK class manages the connection and interaction with a Reachy robot.

This class handles:

  • Establishing and maintaining a connection with the robot via gRPC.
  • Accessing and controlling various parts of the robot, such as the arms, head, and mobile base.
  • Managing robot components including actuators, joints, cameras, and audio.
  • Synchronizing robot state with the server in the background to keep data up-to-date.
  • Providing utility functions for common tasks such as turning on/off motors, sending goal positions, and performing movements.
ReachySDK( host: str, sdk_port: int = 50051, audio_port: int = 50063, video_port: int = 50065)
 79    def __init__(
 80        self,
 81        host: str,
 82        sdk_port: int = 50051,
 83        audio_port: int = 50063,
 84        video_port: int = 50065,
 85    ) -> None:
 86        """Initialize a connection to the robot.
 87
 88        Args:
 89            host: The IP address or hostname of the robot.
 90            sdk_port: The gRPC port for the SDK. Default is 50051.
 91            audio_port: The gRPC port for audio services. Default is 50063.
 92            video_port: The gRPC port for video services. Default is 50065.
 93        """
 94        self._logger = getLogger(__name__)
 95
 96        if hasattr(self, "_initialized"):
 97            self._logger.warning("An instance already exists.")
 98            return
 99
100        self._host = host
101        self._sdk_port = sdk_port
102        self._audio_port = audio_port
103        self._video_port = video_port
104
105        self._grpc_connected = False
106        self._initialized = True
107
108        self._r_arm: Optional[Arm] = None
109        self._l_arm: Optional[Arm] = None
110        self._head: Optional[Head] = None
111        self._cameras: Optional[CameraManager] = None
112        self._mobile_base: Optional[MobileBase] = None
113        self._info: Optional[ReachyInfo] = None
114        self._tripod: Optional[Tripod] = None
115
116        self._update_timestamp: Timestamp = Timestamp(seconds=0)
117
118        self.connect()

Initialize a connection to the robot.

Arguments:
  • host: The IP address or hostname of the robot.
  • sdk_port: The gRPC port for the SDK. Default is 50051.
  • audio_port: The gRPC port for audio services. Default is 50063.
  • video_port: The gRPC port for video services. Default is 50065.
def connect(self) -> None:
120    def connect(self) -> None:
121        """Connects the SDK to the robot."""
122        if self._grpc_connected:
123            self._logger.warning("Already connected to Reachy.")
124            return
125
126        self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}")
127
128        self._stop_flag = threading.Event()
129
130        try:
131            self._get_info()
132        except ConnectionError:
133            self._logger.error(
134                f"Could not connect to Reachy with on IP address {self._host}, "
135                "check that the sdk server is running and that the IP is correct."
136            )
137            self._grpc_connected = False
138            return
139
140        self._setup_parts()
141        self._setup_audio()
142        self._cameras = self._setup_video()
143
144        self._sync_thread = threading.Thread(target=self._start_sync_in_bg)
145        self._sync_thread.daemon = True
146        self._sync_thread.start()
147
148        self._audit_thread = threading.Thread(target=self._audit)
149        self._audit_thread.daemon = True
150        self._audit_thread.start()
151
152        self._grpc_connected = True
153        self._logger.info("Connected to Reachy.")

Connects the SDK to the robot.

def disconnect(self, lost_connection: bool = False) -> None:
155    def disconnect(self, lost_connection: bool = False) -> None:
156        """Disconnect the SDK from the robot's server.
157
158        Args:
159            lost_connection: If `True`, indicates that the connection was lost unexpectedly.
160        """
161        if self._host in self._instances_by_host:
162            del self._instances_by_host[self._host]
163
164        if not self._grpc_connected:
165            self._logger.warning("Already disconnected from Reachy.")
166            return
167
168        self._grpc_connected = False
169        self._grpc_channel.close()
170        self._grpc_channel = None
171
172        self._head = None
173        self._r_arm = None
174        self._l_arm = None
175        self._mobile_base = None
176
177        self._logger.info("Disconnected from Reachy.")

Disconnect the SDK from the robot's server.

Arguments:
  • lost_connection: If True, indicates that the connection was lost unexpectedly.
info: Optional[reachy2_sdk.config.reachy_info.ReachyInfo]
198    @property
199    def info(self) -> Optional[ReachyInfo]:
200        """Get ReachyInfo if connected."""
201        if not self._grpc_connected:
202            self._logger.error("Cannot get info, not connected to Reachy")
203            return None
204        return self._info

Get ReachyInfo if connected.

head: Optional[reachy2_sdk.parts.head.Head]
206    @property
207    def head(self) -> Optional[Head]:
208        """Get Reachy's head."""
209        if not self._grpc_connected:
210            self._logger.error("Cannot get head, not connected to Reachy")
211            return None
212        if self._head is None:
213            self._logger.error("head does not exist with this configuration")
214            return None
215        return self._head

Get Reachy's head.

r_arm: Optional[reachy2_sdk.parts.arm.Arm]
217    @property
218    def r_arm(self) -> Optional[Arm]:
219        """Get Reachy's right arm."""
220        if not self._grpc_connected:
221            self._logger.error("Cannot get r_arm, not connected to Reachy")
222            return None
223        if self._r_arm is None:
224            self._logger.error("r_arm does not exist with this configuration")
225            return None
226        return self._r_arm

Get Reachy's right arm.

l_arm: Optional[reachy2_sdk.parts.arm.Arm]
228    @property
229    def l_arm(self) -> Optional[Arm]:
230        """Get Reachy's left arm."""
231        if not self._grpc_connected:
232            self._logger.error("Cannot get l_arm, not connected to Reachy")
233            return None
234        if self._l_arm is None:
235            self._logger.error("l_arm does not exist with this configuration")
236            return None
237        return self._l_arm

Get Reachy's left arm.

mobile_base: Optional[reachy2_sdk.parts.mobile_base.MobileBase]
239    @property
240    def mobile_base(self) -> Optional[MobileBase]:
241        """Get Reachy's mobile base."""
242        if not self._grpc_connected:
243            self._logger.error("Cannot get mobile_base, not connected to Reachy")
244            return None
245        if self._mobile_base is None:
246            self._logger.error("mobile_base does not exist with this configuration")
247            return None
248        return self._mobile_base

Get Reachy's mobile base.

tripod: Optional[reachy2_sdk.parts.tripod.Tripod]
250    @property
251    def tripod(self) -> Optional[Tripod]:
252        """Get Reachy's fixed tripod."""
253        if not self._grpc_connected:
254            self._logger.error("Cannot get tripod, not connected to Reachy")
255            return None
256        if self._tripod is None:
257            self._logger.error("tripod does not exist with this configuration")
258            return None
259        return self._tripod

Get Reachy's fixed tripod.

261    @property
262    def joints(self) -> CustomDict[str, OrbitaJoint]:
263        """Return a dictionary of all joints of the robot.
264
265        The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects.
266        """
267        if not self._grpc_connected or not self.info:
268            self._logger.warning("Cannot get joints, not connected to Reachy.")
269            return CustomDict({})
270        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
271        for part_name in self.info._enabled_parts:
272            part = getattr(self, part_name)
273            for joint_name, joint in part.joints.items():
274                _joints[part_name + "." + joint_name] = joint
275        return _joints

Return a dictionary of all joints of the robot.

The dictionary keys are the joint names, and the values are the corresponding OrbitaJoint objects.

def is_connected(self) -> bool:
293    def is_connected(self) -> bool:
294        """Check if the SDK is connected to the robot.
295
296        Returns:
297            `True` if connected, `False` otherwise.
298        """
299        return self._grpc_connected

Check if the SDK is connected to the robot.

Returns:

True if connected, False otherwise.

cameras: Optional[reachy2_sdk.media.camera_manager.CameraManager]
330    @property
331    def cameras(self) -> Optional[CameraManager]:
332        """Get the camera manager if available and connected."""
333        if not self._grpc_connected:
334            self._logger.error("Cannot get cameras, not connected to Reachy")
335            return None
336        return self._cameras

Get the camera manager if available and connected.

def get_update_timestamp(self) -> int:
452    def get_update_timestamp(self) -> int:
453        """Returns the timestamp (ns) of the last update.
454
455        The timestamp is generated by ROS running on Reachy.
456
457        Returns:
458            timestamp (int) in nanoseconds.
459        """
460        return self._update_timestamp.ToNanoseconds()

Returns the timestamp (ns) of the last update.

The timestamp is generated by ROS running on Reachy.

Returns:

timestamp (int) in nanoseconds.

audit: Dict[str, Dict[str, str]]
520    @property
521    def audit(self) -> Dict[str, Dict[str, str]]:
522        """Return the audit status of all enabled parts of the robot."""
523        audit_dict: Dict[str, Dict[str, str]] = {}
524        if not self._grpc_connected or not self.info:
525            self._logger.warning("Reachy is not connected!")
526        if self.info is not None:
527            for part in self.info._enabled_parts.values():
528                audit_dict[part._part_id.name] = part.audit
529        return audit_dict

Return the audit status of all enabled parts of the robot.

def turn_on(self) -> bool:
531    def turn_on(self) -> bool:
532        """Activate all motors of the robot's parts if all of them are not already turned on.
533
534        Returns:
535            `True` if successful, `False` otherwise.
536        """
537        if not self._grpc_connected or not self.info:
538            self._logger.warning("Cannot turn on Reachy, not connected.")
539            return False
540
541        speed_limit_high = 25
542        max_iterations = 10
543        ite = 0
544
545        while not self._is_fully_on() and ite < max_iterations:
546            for part in self.info._enabled_parts.values():
547                part.set_speed_limits(1)
548            time.sleep(0.05)
549            for part in self.info._enabled_parts.values():
550                part._turn_on()
551            if self._mobile_base is not None:
552                self._mobile_base._turn_on()
553            time.sleep(0.05)
554            for part in self.info._enabled_parts.values():
555                part.set_speed_limits(speed_limit_high)
556            time.sleep(0.4)
557            ite += 1
558
559        if ite == max_iterations:
560            self._logger.warning("Failed to turn on Reachy,")
561            return False
562
563        return True

Activate all motors of the robot's parts if all of them are not already turned on.

Returns:

True if successful, False otherwise.

def turn_off(self) -> bool:
565    def turn_off(self) -> bool:
566        """Turn all motors of enabled parts off.
567
568        All enabled parts' motors will then be compliant.
569        """
570        if not self._grpc_connected or not self.info:
571            self._logger.warning("Cannot turn off Reachy, not connected.")
572            return False
573        for part in self.info._enabled_parts.values():
574            part._turn_off()
575        if self._mobile_base is not None:
576            self._mobile_base._turn_off()
577        time.sleep(0.5)
578
579        return True

Turn all motors of enabled parts off.

All enabled parts' motors will then be compliant.

def turn_off_smoothly(self) -> bool:
581    def turn_off_smoothly(self) -> bool:
582        """Turn all motors of robot parts off.
583
584        Arm torques are reduced during 3 seconds, then all parts' motors will be compliant.
585        """
586        if not self._grpc_connected or not self.info:
587            self._logger.warning("Cannot turn off Reachy, not connected.")
588            return False
589        speed_limit_high = 25
590        # Enough to sustain the arm weight
591        torque_limit_low = 50
592        torque_limit_high = 100
593        duration = 3
594        arms_list = []
595
596        if hasattr(self, "_mobile_base") and self._mobile_base is not None:
597            self._mobile_base._turn_off()
598        for part in self.info._enabled_parts.values():
599            if "arm" in part._part_id.name:
600                part.set_torque_limits(torque_limit_low)
601                part.set_speed_limits(speed_limit_high)
602                part.goto_posture(duration=duration, wait_for_goto_end=False)
603                arms_list.append(part)
604            else:
605                part._turn_off()
606
607        countingTime = 0
608        while countingTime < duration:
609            time.sleep(1)
610            torque_limit_low -= 15
611            for arm_part in arms_list:
612                arm_part.set_torque_limits(torque_limit_low)
613            countingTime += 1
614
615        for arm_part in arms_list:
616            arm_part._turn_off()
617            arm_part.set_torque_limits(torque_limit_high)
618
619        time.sleep(0.5)
620        return True

Turn all motors of robot parts off.

Arm torques are reduced during 3 seconds, then all parts' motors will be compliant.

def is_on(self) -> bool:
622    def is_on(self) -> bool:
623        """Check if all actuators of Reachy parts are on (stiff).
624
625        Returns:
626            `True` if all are stiff, `False` otherwise.
627        """
628        if not self.info:
629            self._logger.warning("Reachy is not connected!")
630            return False
631
632        for part in self.info._enabled_parts.values():
633            if not part.is_on():
634                return False
635        if self._mobile_base is not None and self._mobile_base.is_off():
636            return False
637        return True

Check if all actuators of Reachy parts are on (stiff).

Returns:

True if all are stiff, False otherwise.

def is_off(self) -> bool:
639    def is_off(self) -> bool:
640        """Check if all actuators of Reachy parts are off (compliant).
641
642        Returns:
643            `True` if all are compliant, `False` otherwise.
644        """
645        if not self.info:
646            self._logger.warning("Reachy is not connected!")
647            return True
648
649        for part in self.info._enabled_parts.values():
650            if part.is_on():
651                return False
652        if self._mobile_base is not None and self._mobile_base.is_on():
653            return False
654        return True

Check if all actuators of Reachy parts are off (compliant).

Returns:

True if all are compliant, False otherwise.

def reset_default_limits(self) -> None:
660    def reset_default_limits(self) -> None:
661        """Set back speed and torque limits of all parts to maximum value (100)."""
662        if not self.info:
663            self._logger.warning("Reachy is not connected!")
664            return
665
666        for part in self.info._enabled_parts.values():
667            if issubclass(type(part), JointsBasedPart):
668                part.set_speed_limits(100)
669                part.set_torque_limits(100)
670        time.sleep(0.5)

Set back speed and torque limits of all parts to maximum value (100).

def goto_posture( self, common_posture: str = 'default', duration: float = 2, wait: bool = False, wait_for_goto_end: bool = True, interpolation_mode: str = 'minimum_jerk', open_gripper: bool = False) -> GoToHomeId:
672    def goto_posture(
673        self,
674        common_posture: str = "default",
675        duration: float = 2,
676        wait: bool = False,
677        wait_for_goto_end: bool = True,
678        interpolation_mode: str = "minimum_jerk",
679        open_gripper: bool = False,
680    ) -> GoToHomeId:
681        """Move the robot to a predefined posture.
682
683        Args:
684            common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'.
685            duration: The time duration in seconds for the robot to move to the specified posture.
686                Defaults to 2.
687            wait: Determines whether the program should wait for the movement to finish before
688                returning. If set to `True`, the program waits for the movement to complete before continuing
689                execution. Defaults to `False`.
690            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
691                only after all previous commands in the queue have been executed. If set to `False`, the program
692                will cancel all executing moves and queues. Defaults to `True`.
693            interpolation_mode: The type of interpolation used when moving the arm's joints.
694                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
695            open_gripper: If `True`, the gripper will open, if `False`, it stays in its current position.
696                Defaults to `False`.
697
698        Returns:
699            A GoToHomeId containing movement GoToIds for each part.
700        """
701        if common_posture not in ["default", "elbow_90"]:
702            raise ValueError(f"common_posture {common_posture} not supported! Should be 'default' or 'elbow_90'")
703        head_id = None
704        r_arm_id = None
705        l_arm_id = None
706        if not wait_for_goto_end:
707            self.cancel_all_goto()
708        if self.head is not None:
709            is_last_commmand = self.r_arm is None and self.l_arm is None
710            wait_head = wait and is_last_commmand
711            head_id = self.head.goto_posture(
712                duration=duration, wait=wait_head, wait_for_goto_end=wait_for_goto_end, interpolation_mode=interpolation_mode
713            )
714        if self.r_arm is not None:
715            is_last_commmand = self.l_arm is None
716            wait_r_arm = wait and is_last_commmand
717            r_arm_id = self.r_arm.goto_posture(
718                common_posture,
719                duration=duration,
720                wait=wait_r_arm,
721                wait_for_goto_end=wait_for_goto_end,
722                interpolation_mode=interpolation_mode,
723                open_gripper=open_gripper,
724            )
725        if self.l_arm is not None:
726            l_arm_id = self.l_arm.goto_posture(
727                common_posture,
728                duration=duration,
729                wait=wait,
730                wait_for_goto_end=wait_for_goto_end,
731                interpolation_mode=interpolation_mode,
732                open_gripper=open_gripper,
733            )
734        ids = GoToHomeId(
735            head=head_id,
736            r_arm=r_arm_id,
737            l_arm=l_arm_id,
738        )
739        return ids

Move the robot to a predefined posture.

Arguments:
  • common_posture: The name of the posture. It can be 'default' or 'elbow_90'. Defaults to 'default'.
  • duration: The time duration in seconds for the robot to move to the specified posture. Defaults to 2.
  • wait: Determines whether the program should wait for the movement to finish before returning. If set to True, the program waits for the movement to complete before continuing execution. Defaults to False.
  • wait_for_goto_end: Specifies whether commands will be sent to a part immediately or only after all previous commands in the queue have been executed. If set to False, the program will cancel all executing moves and queues. Defaults to True.
  • interpolation_mode: The type of interpolation used when moving the arm's joints. Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
  • open_gripper: If True, the gripper will open, if False, it stays in its current position. Defaults to False.
Returns:

A GoToHomeId containing movement GoToIds for each part.

def is_goto_finished(self, goto_id: goto_pb2.GoToId) -> bool:
741    def is_goto_finished(self, goto_id: GoToId) -> bool:
742        """Check if a goto command has completed.
743
744        Args:
745            goto_id: The unique GoToId of the goto command.
746
747        Returns:
748            `True` if the command is complete, `False` otherwise.
749        """
750        if not self._grpc_connected:
751            self._logger.warning("Reachy is not connected!")
752            return False
753        if not isinstance(goto_id, GoToId):
754            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
755        if goto_id.id == -1:
756            self._logger.error("is_goto_finished() asked for unvalid movement. Goto not played.")
757            return True
758        state = self._get_goto_state(goto_id)
759        result = bool(
760            state.goal_status == GoalStatus.STATUS_ABORTED
761            or state.goal_status == GoalStatus.STATUS_CANCELED
762            or state.goal_status == GoalStatus.STATUS_SUCCEEDED
763        )
764        return result

Check if a goto command has completed.

Arguments:
  • goto_id: The unique GoToId of the goto command.
Returns:

True if the command is complete, False otherwise.

def get_goto_request( self, goto_id: goto_pb2.GoToId) -> Optional[reachy2_sdk.utils.utils.SimplifiedRequest]:
766    def get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]:
767        """Retrieve the details of a goto command based on its GoToId.
768
769        Args:
770            goto_id: The ID of the goto command for which details are requested.
771
772        Returns:
773            A `SimplifiedRequest` object containing the part name, joint goal positions
774            (in degrees), movement duration, and interpolation mode.
775            Returns `None` if the robot is not connected or if the `goto_id` is invalid.
776
777        Raises:
778            TypeError: If `goto_id` is not an instance of `GoToId`.
779            ValueError: If `goto_id` is -1, indicating an invalid command.
780        """
781        if not self._grpc_connected:
782            self._logger.warning("Reachy is not connected!")
783            return None
784        if not isinstance(goto_id, GoToId):
785            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
786        if goto_id.id == -1:
787            raise ValueError("No answer was found for given move, goto_id is -1")
788
789        response = self._goto_stub.GetGoToRequest(goto_id)
790
791        full_request = process_goto_request(response)
792
793        return full_request

Retrieve the details of a goto command based on its GoToId.

Arguments:
  • goto_id: The ID of the goto command for which details are requested.
Returns:

A SimplifiedRequest object containing the part name, joint goal positions (in degrees), movement duration, and interpolation mode. Returns None if the robot is not connected or if the goto_id is invalid.

Raises:
  • TypeError: If goto_id is not an instance of GoToId.
  • ValueError: If goto_id is -1, indicating an invalid command.
def cancel_goto_by_id(self, goto_id: goto_pb2.GoToId) -> goto_pb2.GoToAck:
807    def cancel_goto_by_id(self, goto_id: GoToId) -> GoToAck:
808        """Request the cancellation of a specific goto command based on its GoToId.
809
810        Args:
811            goto_id: The ID of the goto command to cancel.
812
813        Returns:
814            A `GoToAck` object indicating whether the cancellation was acknowledged.
815            If the robot is not connected, returns None.
816
817        Raises:
818            TypeError: If `goto_id` is not an instance of `GoToId`.
819        """
820        if not self._grpc_connected:
821            self._logger.warning("Reachy is not connected!")
822            return None
823        if not isinstance(goto_id, GoToId):
824            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
825        if goto_id.id == -1:
826            self._logger.error("cancel_goto_by_id() asked for unvalid movement. Goto not played.")
827            return GoToAck(ack=True)
828        response = self._goto_stub.CancelGoTo(goto_id)
829        return response

Request the cancellation of a specific goto command based on its GoToId.

Arguments:
  • goto_id: The ID of the goto command to cancel.
Returns:

A GoToAck object indicating whether the cancellation was acknowledged. If the robot is not connected, returns None.

Raises:
  • TypeError: If goto_id is not an instance of GoToId.
def cancel_all_goto(self) -> goto_pb2.GoToAck:
831    def cancel_all_goto(self) -> GoToAck:
832        """Cancel all active goto commands.
833
834        Returns:
835             A `GoToAck` object indicating whether the cancellation was acknowledged.
836        """
837        if not self._grpc_connected:
838            self._logger.warning("Reachy is not connected!")
839            return None
840        response = self._goto_stub.CancelAllGoTo(Empty())
841        return response

Cancel all active goto commands.

Returns:

A GoToAck object indicating whether the cancellation was acknowledged.

def send_goal_positions(self, check_positions: bool = False) -> None:
843    def send_goal_positions(self, check_positions: bool = False) -> None:
844        """Send the goal positions to the robot.
845
846        If goal positions have been specified for any joint of the robot, sends them to the robot.
847
848        Args :
849            check_positions: A boolean indicating whether to check the positions after sending the command.
850                Defaults to True.
851        """
852        if not self.info:
853            self._logger.warning("Reachy is not connected!")
854            return
855
856        commands: Dict[str, ArmComponentsCommands | HeadComponentsCommands | HandPositionRequest] = {}
857        for part in [self.r_arm, self.l_arm, self.head]:
858            self._add_component_commands(part, commands, check_positions)
859
860        if self.r_arm is not None:
861            self._add_component_commands(self.r_arm.gripper, commands, check_positions)
862        if self.l_arm is not None:
863            self._add_component_commands(self.l_arm.gripper, commands, check_positions)
864
865        components_commands = ReachyComponentsCommands(**commands)
866        self._stub.SendComponentsCommands(components_commands)

Send the goal positions to the robot.

If goal positions have been specified for any joint of the robot, sends them to the robot.

Args :

check_positions: A boolean indicating whether to check the positions after sending the command. Defaults to True.