reachy2_sdk.parts.part

Reachy Part module.

Handles all specific methods commmon to all Reachy parts (Arm, Head, Hand or MobileBase).

  1"""Reachy Part module.
  2
  3Handles all specific methods commmon to all Reachy parts (Arm, Head, Hand or MobileBase).
  4"""
  5
  6import logging
  7import time
  8from abc import ABC, abstractmethod
  9from typing import Any, Dict
 10
 11import grpc
 12from reachy2_sdk_api.arm_pb2 import Arm as Arm_proto
 13from reachy2_sdk_api.arm_pb2 import ArmState, ArmStatus
 14from reachy2_sdk_api.arm_pb2_grpc import ArmServiceStub
 15from reachy2_sdk_api.hand_pb2 import Hand as Hand_proto
 16from reachy2_sdk_api.hand_pb2 import HandState, HandStatus
 17from reachy2_sdk_api.hand_pb2_grpc import HandServiceStub
 18from reachy2_sdk_api.head_pb2 import Head as Head_proto
 19from reachy2_sdk_api.head_pb2 import HeadState, HeadStatus
 20from reachy2_sdk_api.head_pb2_grpc import HeadServiceStub
 21from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBase as MobileBase_proto
 22from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBaseState, MobileBaseStatus
 23from reachy2_sdk_api.mobile_base_utility_pb2_grpc import MobileBaseUtilityServiceStub
 24from reachy2_sdk_api.part_pb2 import PartId
 25
 26
 27class Part(ABC):
 28    """The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase.
 29
 30    This class provides a common interface for managing robot components, including turning them on or off, checking their
 31    status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different
 32    types of robot parts.
 33    """
 34
 35    def __init__(
 36        self,
 37        proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto,
 38        grpc_channel: grpc.Channel,
 39        stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub,
 40    ) -> None:
 41        """Initialize the Part with common attributes for gRPC communication.
 42
 43        This sets up the communication channel and service stubs for the specified part,
 44        configures the part's unique identifier. It provides the foundation for specific parts of the robot
 45        (Arm, Head, Hand, MobileBase) to be derived from this class.
 46
 47        Args:
 48            proto_msg: The protobuf message containing configuration details for the part
 49                (Arm, Head, Hand, or MobileBase).
 50            grpc_channel: The gRPC channel used to communicate with the service.
 51            stub: The service stub for the gRPC communication, which could be for Arm, Head,
 52                Hand, or MobileBase.
 53        """
 54        self._grpc_channel = grpc_channel
 55        self._stub = stub
 56        self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name)
 57        self._logger = logging.getLogger(__name__)
 58
 59        self._actuators: Dict[str, Any] = {}
 60
 61    def turn_on(self) -> None:
 62        """Turn on the part.
 63
 64        This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits
 65        to maximum. It waits for a brief period to ensure the operation is complete.
 66        """
 67        self._set_speed_limits(1)
 68        time.sleep(0.05)
 69        self._turn_on()
 70        time.sleep(0.05)
 71        self._set_speed_limits(100)
 72        time.sleep(0.4)
 73
 74    def turn_off(self) -> None:
 75        """Turn off the part.
 76
 77        This method turns off all motors of the part and waits for a brief period to ensure the operation is complete.
 78        """
 79        self._turn_off()
 80        time.sleep(0.5)
 81
 82    def _turn_on(self) -> None:
 83        """Send a command to turn on immediately the part."""
 84        self._stub.TurnOn(self._part_id)
 85
 86    def _turn_off(self) -> None:
 87        """Send a command to turn off immediately the part."""
 88        self._stub.TurnOff(self._part_id)
 89
 90    def is_on(self) -> bool:
 91        """Check if all actuators of the part are currently on.
 92
 93        Returns:
 94            True if all actuators are on, otherwise False.
 95        """
 96        for actuator in self._actuators.values():
 97            if not actuator.is_on():
 98                return False
 99        return True
100
101    def is_off(self) -> bool:
102        """Check if all actuators of the part are currently off.
103
104        Returns:
105            True if all actuators are off, otherwise False.
106        """
107        for actuator in self._actuators.values():
108            if actuator.is_on():
109                return False
110        return True
111
112    @abstractmethod
113    def _update_with(self, state: ArmState | HeadState | HandState | MobileBaseState) -> None:
114        """Update the part's state with newly received data.
115
116        This method must be implemented by subclasses to update the state of the part based on
117        specific state data types such as ArmState, HeadState, HandState, or MobileBaseState.
118
119        Args:
120            state: The state data used to update the part, which can be an ArmState, HeadState,
121                HandState, or MobileBaseState.
122        """
123        pass  # pragma: no cover
124
125    @abstractmethod
126    def _update_audit_status(self, state: ArmStatus | HeadStatus | HandStatus | MobileBaseStatus) -> None:
127        """Update the audit status of the part.
128
129        This method must be implemented by subclasses to update the audit status of the part based on
130        specific status data types such as ArmStatus, HeadStatus, HandStatus, or MobileBaseStatus.
131
132        Args:
133            state: The status data used to update the audit status, which can be an ArmStatus,
134                HeadStatus, HandStatus, or MobileBaseStatus.
135        """
136        pass  # pragma: no cover
137
138    @abstractmethod
139    def _set_speed_limits(self, value: int) -> None:
140        """Set the speed limits for the part.
141
142        This method must be implemented by subclasses to set speed limits.
143
144        Args:
145            value: The speed limit value to be set, as a percentage of the maximum speed allowed (0-100).
146        """
147        pass  # pragma: no cover
148
149    @property
150    def audit(self) -> Dict[str, str]:
151        """Get the audit status of all actuators of the part.
152
153        Returns:
154            A dictionary where each key is the name of an actuator and the value is its audit status.
155            If an error is detected in any actuator, a warning is logged. Otherwise, an informational
156            message indicating no errors is logged.
157        """
158        error_dict: Dict[str, str] = {}
159        error_detected = False
160        for act_name, actuator in self._actuators.items():
161            error_dict[act_name] = actuator.status
162            if actuator.status is not None and actuator.status != "Ok":
163                self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"')
164                error_detected = True
165        if not error_detected:
166            self._logger.info(f"No error detected on {self._part_id.name}")
167        return error_dict
class Part(abc.ABC):
 28class Part(ABC):
 29    """The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase.
 30
 31    This class provides a common interface for managing robot components, including turning them on or off, checking their
 32    status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different
 33    types of robot parts.
 34    """
 35
 36    def __init__(
 37        self,
 38        proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto,
 39        grpc_channel: grpc.Channel,
 40        stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub,
 41    ) -> None:
 42        """Initialize the Part with common attributes for gRPC communication.
 43
 44        This sets up the communication channel and service stubs for the specified part,
 45        configures the part's unique identifier. It provides the foundation for specific parts of the robot
 46        (Arm, Head, Hand, MobileBase) to be derived from this class.
 47
 48        Args:
 49            proto_msg: The protobuf message containing configuration details for the part
 50                (Arm, Head, Hand, or MobileBase).
 51            grpc_channel: The gRPC channel used to communicate with the service.
 52            stub: The service stub for the gRPC communication, which could be for Arm, Head,
 53                Hand, or MobileBase.
 54        """
 55        self._grpc_channel = grpc_channel
 56        self._stub = stub
 57        self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name)
 58        self._logger = logging.getLogger(__name__)
 59
 60        self._actuators: Dict[str, Any] = {}
 61
 62    def turn_on(self) -> None:
 63        """Turn on the part.
 64
 65        This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits
 66        to maximum. It waits for a brief period to ensure the operation is complete.
 67        """
 68        self._set_speed_limits(1)
 69        time.sleep(0.05)
 70        self._turn_on()
 71        time.sleep(0.05)
 72        self._set_speed_limits(100)
 73        time.sleep(0.4)
 74
 75    def turn_off(self) -> None:
 76        """Turn off the part.
 77
 78        This method turns off all motors of the part and waits for a brief period to ensure the operation is complete.
 79        """
 80        self._turn_off()
 81        time.sleep(0.5)
 82
 83    def _turn_on(self) -> None:
 84        """Send a command to turn on immediately the part."""
 85        self._stub.TurnOn(self._part_id)
 86
 87    def _turn_off(self) -> None:
 88        """Send a command to turn off immediately the part."""
 89        self._stub.TurnOff(self._part_id)
 90
 91    def is_on(self) -> bool:
 92        """Check if all actuators of the part are currently on.
 93
 94        Returns:
 95            True if all actuators are on, otherwise False.
 96        """
 97        for actuator in self._actuators.values():
 98            if not actuator.is_on():
 99                return False
100        return True
101
102    def is_off(self) -> bool:
103        """Check if all actuators of the part are currently off.
104
105        Returns:
106            True if all actuators are off, otherwise False.
107        """
108        for actuator in self._actuators.values():
109            if actuator.is_on():
110                return False
111        return True
112
113    @abstractmethod
114    def _update_with(self, state: ArmState | HeadState | HandState | MobileBaseState) -> None:
115        """Update the part's state with newly received data.
116
117        This method must be implemented by subclasses to update the state of the part based on
118        specific state data types such as ArmState, HeadState, HandState, or MobileBaseState.
119
120        Args:
121            state: The state data used to update the part, which can be an ArmState, HeadState,
122                HandState, or MobileBaseState.
123        """
124        pass  # pragma: no cover
125
126    @abstractmethod
127    def _update_audit_status(self, state: ArmStatus | HeadStatus | HandStatus | MobileBaseStatus) -> None:
128        """Update the audit status of the part.
129
130        This method must be implemented by subclasses to update the audit status of the part based on
131        specific status data types such as ArmStatus, HeadStatus, HandStatus, or MobileBaseStatus.
132
133        Args:
134            state: The status data used to update the audit status, which can be an ArmStatus,
135                HeadStatus, HandStatus, or MobileBaseStatus.
136        """
137        pass  # pragma: no cover
138
139    @abstractmethod
140    def _set_speed_limits(self, value: int) -> None:
141        """Set the speed limits for the part.
142
143        This method must be implemented by subclasses to set speed limits.
144
145        Args:
146            value: The speed limit value to be set, as a percentage of the maximum speed allowed (0-100).
147        """
148        pass  # pragma: no cover
149
150    @property
151    def audit(self) -> Dict[str, str]:
152        """Get the audit status of all actuators of the part.
153
154        Returns:
155            A dictionary where each key is the name of an actuator and the value is its audit status.
156            If an error is detected in any actuator, a warning is logged. Otherwise, an informational
157            message indicating no errors is logged.
158        """
159        error_dict: Dict[str, str] = {}
160        error_detected = False
161        for act_name, actuator in self._actuators.items():
162            error_dict[act_name] = actuator.status
163            if actuator.status is not None and actuator.status != "Ok":
164                self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"')
165                error_detected = True
166        if not error_detected:
167            self._logger.info(f"No error detected on {self._part_id.name}")
168        return error_dict

The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase.

This class provides a common interface for managing robot components, including turning them on or off, checking their status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different types of robot parts.

Part( proto_msg: arm_pb2.Arm | head_pb2.Head | hand_pb2.Hand | mobile_base_utility_pb2.MobileBase, grpc_channel: grpc.Channel, stub: reachy2_sdk_api.arm_pb2_grpc.ArmServiceStub | reachy2_sdk_api.head_pb2_grpc.HeadServiceStub | reachy2_sdk_api.hand_pb2_grpc.HandServiceStub | reachy2_sdk_api.mobile_base_utility_pb2_grpc.MobileBaseUtilityServiceStub)
36    def __init__(
37        self,
38        proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto,
39        grpc_channel: grpc.Channel,
40        stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub,
41    ) -> None:
42        """Initialize the Part with common attributes for gRPC communication.
43
44        This sets up the communication channel and service stubs for the specified part,
45        configures the part's unique identifier. It provides the foundation for specific parts of the robot
46        (Arm, Head, Hand, MobileBase) to be derived from this class.
47
48        Args:
49            proto_msg: The protobuf message containing configuration details for the part
50                (Arm, Head, Hand, or MobileBase).
51            grpc_channel: The gRPC channel used to communicate with the service.
52            stub: The service stub for the gRPC communication, which could be for Arm, Head,
53                Hand, or MobileBase.
54        """
55        self._grpc_channel = grpc_channel
56        self._stub = stub
57        self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name)
58        self._logger = logging.getLogger(__name__)
59
60        self._actuators: Dict[str, Any] = {}

Initialize the Part with common attributes for gRPC communication.

This sets up the communication channel and service stubs for the specified part, configures the part's unique identifier. It provides the foundation for specific parts of the robot (Arm, Head, Hand, MobileBase) to be derived from this class.

Arguments:
  • proto_msg: The protobuf message containing configuration details for the part (Arm, Head, Hand, or MobileBase).
  • grpc_channel: The gRPC channel used to communicate with the service.
  • stub: The service stub for the gRPC communication, which could be for Arm, Head, Hand, or MobileBase.
def turn_on(self) -> None:
62    def turn_on(self) -> None:
63        """Turn on the part.
64
65        This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits
66        to maximum. It waits for a brief period to ensure the operation is complete.
67        """
68        self._set_speed_limits(1)
69        time.sleep(0.05)
70        self._turn_on()
71        time.sleep(0.05)
72        self._set_speed_limits(100)
73        time.sleep(0.4)

Turn on the part.

This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits to maximum. It waits for a brief period to ensure the operation is complete.

def turn_off(self) -> None:
75    def turn_off(self) -> None:
76        """Turn off the part.
77
78        This method turns off all motors of the part and waits for a brief period to ensure the operation is complete.
79        """
80        self._turn_off()
81        time.sleep(0.5)

Turn off the part.

This method turns off all motors of the part and waits for a brief period to ensure the operation is complete.

def is_on(self) -> bool:
 91    def is_on(self) -> bool:
 92        """Check if all actuators of the part are currently on.
 93
 94        Returns:
 95            True if all actuators are on, otherwise False.
 96        """
 97        for actuator in self._actuators.values():
 98            if not actuator.is_on():
 99                return False
100        return True

Check if all actuators of the part are currently on.

Returns:

True if all actuators are on, otherwise False.

def is_off(self) -> bool:
102    def is_off(self) -> bool:
103        """Check if all actuators of the part are currently off.
104
105        Returns:
106            True if all actuators are off, otherwise False.
107        """
108        for actuator in self._actuators.values():
109            if actuator.is_on():
110                return False
111        return True

Check if all actuators of the part are currently off.

Returns:

True if all actuators are off, otherwise False.

audit: Dict[str, str]
150    @property
151    def audit(self) -> Dict[str, str]:
152        """Get the audit status of all actuators of the part.
153
154        Returns:
155            A dictionary where each key is the name of an actuator and the value is its audit status.
156            If an error is detected in any actuator, a warning is logged. Otherwise, an informational
157            message indicating no errors is logged.
158        """
159        error_dict: Dict[str, str] = {}
160        error_detected = False
161        for act_name, actuator in self._actuators.items():
162            error_dict[act_name] = actuator.status
163            if actuator.status is not None and actuator.status != "Ok":
164                self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"')
165                error_detected = True
166        if not error_detected:
167            self._logger.info(f"No error detected on {self._part_id.name}")
168        return error_dict

Get the audit status of all actuators of the part.

Returns:

A dictionary where each key is the name of an actuator and the value is its audit status. If an error is detected in any actuator, a warning is logged. Otherwise, an informational message indicating no errors is logged.