reachy2_sdk.parts.part
Reachy Part module.
Handles all specific methods commmon to all Reachy parts (Arm, Head, Hand or MobileBase).
1"""Reachy Part module. 2 3Handles all specific methods commmon to all Reachy parts (Arm, Head, Hand or MobileBase). 4""" 5 6import logging 7import time 8from abc import ABC, abstractmethod 9from typing import Any, Dict 10 11import grpc 12from reachy2_sdk_api.arm_pb2 import Arm as Arm_proto 13from reachy2_sdk_api.arm_pb2 import ArmState, ArmStatus 14from reachy2_sdk_api.arm_pb2_grpc import ArmServiceStub 15from reachy2_sdk_api.hand_pb2 import Hand as Hand_proto 16from reachy2_sdk_api.hand_pb2 import HandState, HandStatus 17from reachy2_sdk_api.hand_pb2_grpc import HandServiceStub 18from reachy2_sdk_api.head_pb2 import Head as Head_proto 19from reachy2_sdk_api.head_pb2 import HeadState, HeadStatus 20from reachy2_sdk_api.head_pb2_grpc import HeadServiceStub 21from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBase as MobileBase_proto 22from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBaseState, MobileBaseStatus 23from reachy2_sdk_api.mobile_base_utility_pb2_grpc import MobileBaseUtilityServiceStub 24from reachy2_sdk_api.part_pb2 import PartId 25 26 27class Part(ABC): 28 """The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase. 29 30 This class provides a common interface for managing robot components, including turning them on or off, checking their 31 status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different 32 types of robot parts. 33 """ 34 35 def __init__( 36 self, 37 proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto, 38 grpc_channel: grpc.Channel, 39 stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub, 40 ) -> None: 41 """Initialize the Part with common attributes for gRPC communication. 42 43 This sets up the communication channel and service stubs for the specified part, 44 configures the part's unique identifier. It provides the foundation for specific parts of the robot 45 (Arm, Head, Hand, MobileBase) to be derived from this class. 46 47 Args: 48 proto_msg: The protobuf message containing configuration details for the part 49 (Arm, Head, Hand, or MobileBase). 50 grpc_channel: The gRPC channel used to communicate with the service. 51 stub: The service stub for the gRPC communication, which could be for Arm, Head, 52 Hand, or MobileBase. 53 """ 54 self._grpc_channel = grpc_channel 55 self._stub = stub 56 self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name) 57 self._logger = logging.getLogger(__name__) 58 59 self._actuators: Dict[str, Any] = {} 60 61 def turn_on(self) -> None: 62 """Turn on the part. 63 64 This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits 65 to maximum. It waits for a brief period to ensure the operation is complete. 66 """ 67 self._set_speed_limits(1) 68 time.sleep(0.05) 69 self._turn_on() 70 time.sleep(0.05) 71 self._set_speed_limits(100) 72 time.sleep(0.4) 73 74 def turn_off(self) -> None: 75 """Turn off the part. 76 77 This method turns off all motors of the part and waits for a brief period to ensure the operation is complete. 78 """ 79 self._turn_off() 80 time.sleep(0.5) 81 82 def _turn_on(self) -> None: 83 """Send a command to turn on immediately the part.""" 84 self._stub.TurnOn(self._part_id) 85 86 def _turn_off(self) -> None: 87 """Send a command to turn off immediately the part.""" 88 self._stub.TurnOff(self._part_id) 89 90 def is_on(self) -> bool: 91 """Check if all actuators of the part are currently on. 92 93 Returns: 94 True if all actuators are on, otherwise False. 95 """ 96 for actuator in self._actuators.values(): 97 if not actuator.is_on(): 98 return False 99 return True 100 101 def is_off(self) -> bool: 102 """Check if all actuators of the part are currently off. 103 104 Returns: 105 True if all actuators are off, otherwise False. 106 """ 107 for actuator in self._actuators.values(): 108 if actuator.is_on(): 109 return False 110 return True 111 112 @abstractmethod 113 def _update_with(self, state: ArmState | HeadState | HandState | MobileBaseState) -> None: 114 """Update the part's state with newly received data. 115 116 This method must be implemented by subclasses to update the state of the part based on 117 specific state data types such as ArmState, HeadState, HandState, or MobileBaseState. 118 119 Args: 120 state: The state data used to update the part, which can be an ArmState, HeadState, 121 HandState, or MobileBaseState. 122 """ 123 pass # pragma: no cover 124 125 @abstractmethod 126 def _update_audit_status(self, state: ArmStatus | HeadStatus | HandStatus | MobileBaseStatus) -> None: 127 """Update the audit status of the part. 128 129 This method must be implemented by subclasses to update the audit status of the part based on 130 specific status data types such as ArmStatus, HeadStatus, HandStatus, or MobileBaseStatus. 131 132 Args: 133 state: The status data used to update the audit status, which can be an ArmStatus, 134 HeadStatus, HandStatus, or MobileBaseStatus. 135 """ 136 pass # pragma: no cover 137 138 @abstractmethod 139 def _set_speed_limits(self, value: int) -> None: 140 """Set the speed limits for the part. 141 142 This method must be implemented by subclasses to set speed limits. 143 144 Args: 145 value: The speed limit value to be set, as a percentage of the maximum speed allowed (0-100). 146 """ 147 pass # pragma: no cover 148 149 @property 150 def audit(self) -> Dict[str, str]: 151 """Get the audit status of all actuators of the part. 152 153 Returns: 154 A dictionary where each key is the name of an actuator and the value is its audit status. 155 If an error is detected in any actuator, a warning is logged. Otherwise, an informational 156 message indicating no errors is logged. 157 """ 158 error_dict: Dict[str, str] = {} 159 error_detected = False 160 for act_name, actuator in self._actuators.items(): 161 error_dict[act_name] = actuator.status 162 if actuator.status is not None and actuator.status != "Ok": 163 self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"') 164 error_detected = True 165 if not error_detected: 166 self._logger.info(f"No error detected on {self._part_id.name}") 167 return error_dict
28class Part(ABC): 29 """The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase. 30 31 This class provides a common interface for managing robot components, including turning them on or off, checking their 32 status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different 33 types of robot parts. 34 """ 35 36 def __init__( 37 self, 38 proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto, 39 grpc_channel: grpc.Channel, 40 stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub, 41 ) -> None: 42 """Initialize the Part with common attributes for gRPC communication. 43 44 This sets up the communication channel and service stubs for the specified part, 45 configures the part's unique identifier. It provides the foundation for specific parts of the robot 46 (Arm, Head, Hand, MobileBase) to be derived from this class. 47 48 Args: 49 proto_msg: The protobuf message containing configuration details for the part 50 (Arm, Head, Hand, or MobileBase). 51 grpc_channel: The gRPC channel used to communicate with the service. 52 stub: The service stub for the gRPC communication, which could be for Arm, Head, 53 Hand, or MobileBase. 54 """ 55 self._grpc_channel = grpc_channel 56 self._stub = stub 57 self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name) 58 self._logger = logging.getLogger(__name__) 59 60 self._actuators: Dict[str, Any] = {} 61 62 def turn_on(self) -> None: 63 """Turn on the part. 64 65 This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits 66 to maximum. It waits for a brief period to ensure the operation is complete. 67 """ 68 self._set_speed_limits(1) 69 time.sleep(0.05) 70 self._turn_on() 71 time.sleep(0.05) 72 self._set_speed_limits(100) 73 time.sleep(0.4) 74 75 def turn_off(self) -> None: 76 """Turn off the part. 77 78 This method turns off all motors of the part and waits for a brief period to ensure the operation is complete. 79 """ 80 self._turn_off() 81 time.sleep(0.5) 82 83 def _turn_on(self) -> None: 84 """Send a command to turn on immediately the part.""" 85 self._stub.TurnOn(self._part_id) 86 87 def _turn_off(self) -> None: 88 """Send a command to turn off immediately the part.""" 89 self._stub.TurnOff(self._part_id) 90 91 def is_on(self) -> bool: 92 """Check if all actuators of the part are currently on. 93 94 Returns: 95 True if all actuators are on, otherwise False. 96 """ 97 for actuator in self._actuators.values(): 98 if not actuator.is_on(): 99 return False 100 return True 101 102 def is_off(self) -> bool: 103 """Check if all actuators of the part are currently off. 104 105 Returns: 106 True if all actuators are off, otherwise False. 107 """ 108 for actuator in self._actuators.values(): 109 if actuator.is_on(): 110 return False 111 return True 112 113 @abstractmethod 114 def _update_with(self, state: ArmState | HeadState | HandState | MobileBaseState) -> None: 115 """Update the part's state with newly received data. 116 117 This method must be implemented by subclasses to update the state of the part based on 118 specific state data types such as ArmState, HeadState, HandState, or MobileBaseState. 119 120 Args: 121 state: The state data used to update the part, which can be an ArmState, HeadState, 122 HandState, or MobileBaseState. 123 """ 124 pass # pragma: no cover 125 126 @abstractmethod 127 def _update_audit_status(self, state: ArmStatus | HeadStatus | HandStatus | MobileBaseStatus) -> None: 128 """Update the audit status of the part. 129 130 This method must be implemented by subclasses to update the audit status of the part based on 131 specific status data types such as ArmStatus, HeadStatus, HandStatus, or MobileBaseStatus. 132 133 Args: 134 state: The status data used to update the audit status, which can be an ArmStatus, 135 HeadStatus, HandStatus, or MobileBaseStatus. 136 """ 137 pass # pragma: no cover 138 139 @abstractmethod 140 def _set_speed_limits(self, value: int) -> None: 141 """Set the speed limits for the part. 142 143 This method must be implemented by subclasses to set speed limits. 144 145 Args: 146 value: The speed limit value to be set, as a percentage of the maximum speed allowed (0-100). 147 """ 148 pass # pragma: no cover 149 150 @property 151 def audit(self) -> Dict[str, str]: 152 """Get the audit status of all actuators of the part. 153 154 Returns: 155 A dictionary where each key is the name of an actuator and the value is its audit status. 156 If an error is detected in any actuator, a warning is logged. Otherwise, an informational 157 message indicating no errors is logged. 158 """ 159 error_dict: Dict[str, str] = {} 160 error_detected = False 161 for act_name, actuator in self._actuators.items(): 162 error_dict[act_name] = actuator.status 163 if actuator.status is not None and actuator.status != "Ok": 164 self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"') 165 error_detected = True 166 if not error_detected: 167 self._logger.info(f"No error detected on {self._part_id.name}") 168 return error_dict
The Part class serves as an abstract base class representing parts of a robot, such as Arm, Hand, Head, or MobileBase.
This class provides a common interface for managing robot components, including turning them on or off, checking their status, and updating their states. The class is intended to be subclassed to implement specific behaviors for different types of robot parts.
36 def __init__( 37 self, 38 proto_msg: Arm_proto | Head_proto | Hand_proto | MobileBase_proto, 39 grpc_channel: grpc.Channel, 40 stub: ArmServiceStub | HeadServiceStub | HandServiceStub | MobileBaseUtilityServiceStub, 41 ) -> None: 42 """Initialize the Part with common attributes for gRPC communication. 43 44 This sets up the communication channel and service stubs for the specified part, 45 configures the part's unique identifier. It provides the foundation for specific parts of the robot 46 (Arm, Head, Hand, MobileBase) to be derived from this class. 47 48 Args: 49 proto_msg: The protobuf message containing configuration details for the part 50 (Arm, Head, Hand, or MobileBase). 51 grpc_channel: The gRPC channel used to communicate with the service. 52 stub: The service stub for the gRPC communication, which could be for Arm, Head, 53 Hand, or MobileBase. 54 """ 55 self._grpc_channel = grpc_channel 56 self._stub = stub 57 self._part_id = PartId(id=proto_msg.part_id.id, name=proto_msg.part_id.name) 58 self._logger = logging.getLogger(__name__) 59 60 self._actuators: Dict[str, Any] = {}
Initialize the Part with common attributes for gRPC communication.
This sets up the communication channel and service stubs for the specified part, configures the part's unique identifier. It provides the foundation for specific parts of the robot (Arm, Head, Hand, MobileBase) to be derived from this class.
Arguments:
- proto_msg: The protobuf message containing configuration details for the part (Arm, Head, Hand, or MobileBase).
- grpc_channel: The gRPC channel used to communicate with the service.
- stub: The service stub for the gRPC communication, which could be for Arm, Head, Hand, or MobileBase.
62 def turn_on(self) -> None: 63 """Turn on the part. 64 65 This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits 66 to maximum. It waits for a brief period to ensure the operation is complete. 67 """ 68 self._set_speed_limits(1) 69 time.sleep(0.05) 70 self._turn_on() 71 time.sleep(0.05) 72 self._set_speed_limits(100) 73 time.sleep(0.4)
Turn on the part.
This method sets the speed limits to a low value, turns on all motors of the part, and then restores the speed limits to maximum. It waits for a brief period to ensure the operation is complete.
75 def turn_off(self) -> None: 76 """Turn off the part. 77 78 This method turns off all motors of the part and waits for a brief period to ensure the operation is complete. 79 """ 80 self._turn_off() 81 time.sleep(0.5)
Turn off the part.
This method turns off all motors of the part and waits for a brief period to ensure the operation is complete.
91 def is_on(self) -> bool: 92 """Check if all actuators of the part are currently on. 93 94 Returns: 95 True if all actuators are on, otherwise False. 96 """ 97 for actuator in self._actuators.values(): 98 if not actuator.is_on(): 99 return False 100 return True
Check if all actuators of the part are currently on.
Returns:
True if all actuators are on, otherwise False.
102 def is_off(self) -> bool: 103 """Check if all actuators of the part are currently off. 104 105 Returns: 106 True if all actuators are off, otherwise False. 107 """ 108 for actuator in self._actuators.values(): 109 if actuator.is_on(): 110 return False 111 return True
Check if all actuators of the part are currently off.
Returns:
True if all actuators are off, otherwise False.
150 @property 151 def audit(self) -> Dict[str, str]: 152 """Get the audit status of all actuators of the part. 153 154 Returns: 155 A dictionary where each key is the name of an actuator and the value is its audit status. 156 If an error is detected in any actuator, a warning is logged. Otherwise, an informational 157 message indicating no errors is logged. 158 """ 159 error_dict: Dict[str, str] = {} 160 error_detected = False 161 for act_name, actuator in self._actuators.items(): 162 error_dict[act_name] = actuator.status 163 if actuator.status is not None and actuator.status != "Ok": 164 self._logger.warning(f'Error detected on {self._part_id.name}_{act_name}: "{actuator.status}"') 165 error_detected = True 166 if not error_detected: 167 self._logger.info(f"No error detected on {self._part_id.name}") 168 return error_dict
Get the audit status of all actuators of the part.
Returns:
A dictionary where each key is the name of an actuator and the value is its audit status. If an error is detected in any actuator, a warning is logged. Otherwise, an informational message indicating no errors is logged.