reachy2_sdk.orbita.orbita
Reachy Orbita module.
Handles all specific methods commmon to all Orbita2d and Orbita3d.
1"""Reachy Orbita module. 2 3Handles all specific methods commmon to all Orbita2d and Orbita3d. 4""" 5 6import logging 7import time 8from abc import ABC, abstractmethod 9from threading import Thread 10from typing import Any, Dict, Optional, Tuple 11 12import numpy as np 13from google.protobuf.wrappers_pb2 import BoolValue, FloatValue 14from reachy2_sdk_api.component_pb2 import ComponentId 15from reachy2_sdk_api.orbita2d_pb2 import ( 16 Orbita2dCommand, 17 Orbita2dsCommand, 18 Orbita2dState, 19 Orbita2dStatus, 20) 21from reachy2_sdk_api.orbita2d_pb2_grpc import Orbita2dServiceStub 22from reachy2_sdk_api.orbita3d_pb2 import Orbita3dState, Orbita3dStatus 23from reachy2_sdk_api.orbita3d_pb2_grpc import Orbita3dServiceStub 24 25from ..parts.part import Part 26from .orbita_axis import OrbitaAxis 27from .orbita_motor import OrbitaMotor 28 29 30class Orbita(ABC): 31 """The Orbita class is an abstract class to represent any Orbita actuator. 32 33 The Orbita class is used to store the up-to-date state of the actuator, especially: 34 - its compliancy 35 - its joints state 36 - its motors state 37 - its axis state 38 39 And apply speed, torque, pid and compliancy to all motors of the actuator. 40 41 This class is meant to be derived by Orbita2d and Orbita3d 42 """ 43 44 def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part): 45 """Initialize the Orbita actuator with its common attributes. 46 47 Args: 48 uid: The unique identifier for the actuator. 49 name: The name of the actuator. 50 orbita_type: Specifies the type of Orbita, either "2d" or "3d". 51 stub: The gRPC stub used for communicating with the actuator, which can be an 52 instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`. 53 part: The parent part to which the Orbita belongs, used for referencing the 54 part's attributes. 55 """ 56 self._logger = logging.getLogger(__name__) 57 self._name = name 58 self._id = uid 59 self._orbita_type = orbita_type 60 self._stub = stub 61 self._part = part 62 63 self._compliant: bool 64 65 self._joints: Dict[str, Any] = {} 66 self._axis_name_by_joint: Dict[Any, str] = {} 67 self._motors: Dict[str, OrbitaMotor] = {} 68 self._outgoing_goal_positions: Dict[str, float] = {} 69 self._axis: Dict[str, OrbitaAxis] = {} 70 71 self._error_status: Optional[str] = None 72 73 self._thread_check_position: Optional[Thread] = None 74 self._cancel_check = False 75 76 @abstractmethod 77 def _create_dict_state(self, initial_state: Orbita2dState | Orbita3dState) -> Dict[str, Dict[str, FloatValue]]: 78 """Create a dictionary representation of the joint states. 79 80 This method must be implemented by subclasses to create the dict state of the Orbita. 81 82 Args: 83 initial_state: The initial state of the Orbita, provided as an instance of 84 `Orbita2dState` or `Orbita3dState`. 85 86 Returns: 87 A dictionary where each key corresponds to a joint attribute, and each value 88 is another dictionary of state information with string keys and `FloatValue` values. 89 """ 90 pass # pragma: no cover 91 92 def __repr__(self) -> str: 93 """Clean representation of an Orbita.""" 94 s = "\n\t".join([str(joint) for joint in self._joints.values()]) 95 return f"""<Orbita{self._orbita_type} on={self.is_on()} joints=\n\t{ 96 s 97 }\n>""" 98 99 @abstractmethod 100 def set_speed_limits(self, speed_limit: float | int) -> None: 101 """Set the speed limits for the Orbita actuator. 102 103 This method defines the maximum speed for the joints, specified as a percentage 104 of the maximum speed capability. 105 106 Args: 107 speed_limit: The desired speed limit as a percentage (0-100). 108 109 Raises: 110 TypeError: If the provided speed_limit is not a float or int. 111 ValueError: If the provided speed_limit is outside the range [0, 100]. 112 """ 113 if not isinstance(speed_limit, float | int): 114 raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}") 115 if not (0 <= speed_limit <= 100): 116 raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.") 117 118 @abstractmethod 119 def set_torque_limits(self, torque_limit: float | int) -> None: 120 """Set the torque limits for the Orbita actuator. 121 122 This method defines the maximum torque for the joints, specified as a percentage 123 of the maximum torque capability. 124 125 Args: 126 torque_limit: The desired torque limit as a percentage (0-100). 127 128 Raises: 129 TypeError: If the provided torque_limit is not a float or int. 130 ValueError: If the provided torque_limit is outside the range [0, 100]. 131 """ 132 if not isinstance(torque_limit, float | int): 133 raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}") 134 if not (0 <= torque_limit <= 100): 135 raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.") 136 137 # def set_pid(self, pid: Tuple[float, float, float]) -> None: 138 # """Set a pid value on all motors of the actuator""" 139 # if isinstance(pid, tuple) and len(pid) == 3 and all(isinstance(n, float | int) for n in pid): 140 # for m in self._motors.values(): 141 # m._tmp_pid = pid 142 # self._update_loop("pid") 143 # else: 144 # raise ValueError("pid should be of type Tuple[float, float, float]") 145 146 def get_speed_limits(self) -> Dict[str, float]: 147 """Get the speed limits for all motors of the actuator. 148 149 The speed limits are expressed as percentages of the maximum speed for each motor. 150 151 Returns: 152 A dictionary where each key is the motor name and the value is the speed limit 153 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 154 """ 155 return {motor_name: m.speed_limit for motor_name, m in self._motors.items()} 156 157 def get_torque_limits(self) -> Dict[str, float]: 158 """Get the torque limits for all motors of the actuator. 159 160 The torque limits are expressed as percentages of the maximum torque for each motor. 161 162 Returns: 163 A dictionary where each key is the motor name and the value is the torque limit 164 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 165 """ 166 return {motor_name: m.torque_limit for motor_name, m in self._motors.items()} 167 168 def get_pids(self) -> Dict[str, Tuple[float, float, float]]: 169 """Get the PID values for all motors of the actuator. 170 171 Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned. 172 173 Returns: 174 A dictionary where each key is the motor name and the value is a tuple containing 175 the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}". 176 """ 177 return {motor_name: m.pid for motor_name, m in self._motors.items()} 178 179 def turn_on(self) -> None: 180 """Turn on all motors of the actuator.""" 181 self._set_compliant(False) 182 183 def turn_off(self) -> None: 184 """Turn off all motors of the actuator.""" 185 self._set_compliant(True) 186 187 def is_on(self) -> bool: 188 """Check if the actuator is currently stiff. 189 190 Returns: 191 `True` if the actuator is stiff (not compliant), `False` otherwise. 192 """ 193 return not self._compliant 194 195 def is_off(self) -> bool: 196 """Check if the actuator is currently compliant. 197 198 Returns: 199 `True` if the actuator is compliant (not stiff), `False` otherwise. 200 """ 201 return self._compliant 202 203 @property 204 def temperatures(self) -> Dict[str, float]: 205 """Get the current temperatures of all the motors in the actuator. 206 207 Returns: 208 A dictionary where each key is the motor name and the value is the 209 current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}". 210 """ 211 return {motor_name: m.temperature for motor_name, m in self._motors.items()} 212 213 def _set_compliant(self, compliant: bool) -> None: 214 """Set the compliance mode of the actuator's motors. 215 216 Compliance mode determines whether the motors are stiff or compliant. 217 218 Args: 219 compliant: A boolean value indicating whether to set the motors to 220 compliant (`True`) or stiff (`False`). 221 """ 222 command = Orbita2dsCommand( 223 cmd=[ 224 Orbita2dCommand( 225 id=ComponentId(id=self._id), 226 compliant=BoolValue(value=compliant), 227 ) 228 ] 229 ) 230 self._stub.SendCommand(command) 231 232 def _set_outgoing_goal_position(self, axis_name: str, goal_position: float) -> None: 233 """Set the goal position for a specified axis. 234 235 This method sets the target position for an axis, preparing it to be sent as a goal position. 236 237 Args: 238 axis_name: The name of the axis for which to set the goal position. Could be "roll", "pitch", "yaw" for Orbita3d 239 or "axis_1", "axis_2" for Orbita2d. 240 goal_position: The desired goal position converted in radians for the specified axis. 241 """ 242 joint = getattr(self, axis_name) 243 axis = self._axis_name_by_joint[joint] 244 self._outgoing_goal_positions[axis] = goal_position 245 246 @abstractmethod 247 def send_goal_positions(self, check_positions: bool = False) -> None: 248 """Send the goal positions to the actuator. 249 250 This method is abstract and should be implemented in derived classes to 251 send the specified goal positions to the actuator's joints. 252 253 Args: 254 check_positions: A boolean value indicating whether to check the positions of the joints 255 after sending the goal positions. If `True`, a background thread is started to monitor 256 the joint positions relative to their last goal positions. 257 Default is `True`. 258 """ 259 pass # pragma: no cover 260 261 def _post_send_goal_positions(self) -> None: 262 """Start a background thread to check the goal positions after sending them. 263 264 This method stops any ongoing position check thread and starts a new thread 265 to monitor the current positions of the joints relative to their last goal positions. 266 """ 267 self._cancel_check = True 268 if self._thread_check_position is not None and self._thread_check_position.is_alive(): 269 self._thread_check_position.join() 270 self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True) 271 self._thread_check_position.start() 272 273 def _check_goal_positions(self) -> None: 274 """Monitor the joint positions to check if they reach the specified goals. 275 276 This method checks the current positions of the joints and compares them to 277 the goal positions. If a position is significantly different from the goal after 1 second, 278 a warning is logged indicating that the position may be unreachable. 279 """ 280 self._cancel_check = False 281 t1 = time.time() 282 while time.time() - t1 < 1: 283 time.sleep(0.0001) 284 if self._cancel_check: 285 # in case of multiple send_goal_positions we'll check the next call 286 return 287 288 for joint, orbitajoint in self._joints.items(): 289 # precision is low we are looking for unreachable positions 290 if not np.isclose(orbitajoint.present_position, orbitajoint.goal_position, atol=1): 291 self._logger.warning( 292 f"Required goal position ({round(orbitajoint.goal_position, 2)}) for {self._name}.{joint} is unreachable." 293 f"\nCurrent position is ({round(orbitajoint.present_position, 2)})." 294 ) 295 296 def _update_with(self, new_state: Orbita2dState | Orbita3dState) -> None: 297 """Update the actuator's state with new data. 298 299 This method updates the internal state of the motors, axes, and joints based on 300 the new state data received. 301 302 Args: 303 new_state: The new state of the actuator, either as an Orbita2dState or 304 Orbita3dState object. 305 """ 306 state: Dict[str, Dict[str, FloatValue]] = self._create_dict_state(new_state) 307 308 for name, motor in self._motors.items(): 309 motor._update_with(state[name]) 310 311 for name, axis in self._axis.items(): 312 axis._update_with(state[name]) 313 314 for name, joints in self._joints.items(): 315 joints._update_with(state[name]) 316 317 @property 318 def status(self) -> Optional[str]: 319 """Get the current audit status of the actuator. 320 321 Returns: 322 The audit status as a string, representing the latest error or status 323 message, or `None` if there is no error. 324 """ 325 return self._error_status 326 327 def _update_audit_status(self, new_status: Orbita2dStatus | Orbita3dStatus) -> None: 328 """Update the audit status based on the new status data. 329 330 Args: 331 new_status: The new status data, either as an Orbita2dStatus or 332 Orbita3dStatus object, containing error details. 333 """ 334 self._error_status = new_status.errors[0].details
31class Orbita(ABC): 32 """The Orbita class is an abstract class to represent any Orbita actuator. 33 34 The Orbita class is used to store the up-to-date state of the actuator, especially: 35 - its compliancy 36 - its joints state 37 - its motors state 38 - its axis state 39 40 And apply speed, torque, pid and compliancy to all motors of the actuator. 41 42 This class is meant to be derived by Orbita2d and Orbita3d 43 """ 44 45 def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part): 46 """Initialize the Orbita actuator with its common attributes. 47 48 Args: 49 uid: The unique identifier for the actuator. 50 name: The name of the actuator. 51 orbita_type: Specifies the type of Orbita, either "2d" or "3d". 52 stub: The gRPC stub used for communicating with the actuator, which can be an 53 instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`. 54 part: The parent part to which the Orbita belongs, used for referencing the 55 part's attributes. 56 """ 57 self._logger = logging.getLogger(__name__) 58 self._name = name 59 self._id = uid 60 self._orbita_type = orbita_type 61 self._stub = stub 62 self._part = part 63 64 self._compliant: bool 65 66 self._joints: Dict[str, Any] = {} 67 self._axis_name_by_joint: Dict[Any, str] = {} 68 self._motors: Dict[str, OrbitaMotor] = {} 69 self._outgoing_goal_positions: Dict[str, float] = {} 70 self._axis: Dict[str, OrbitaAxis] = {} 71 72 self._error_status: Optional[str] = None 73 74 self._thread_check_position: Optional[Thread] = None 75 self._cancel_check = False 76 77 @abstractmethod 78 def _create_dict_state(self, initial_state: Orbita2dState | Orbita3dState) -> Dict[str, Dict[str, FloatValue]]: 79 """Create a dictionary representation of the joint states. 80 81 This method must be implemented by subclasses to create the dict state of the Orbita. 82 83 Args: 84 initial_state: The initial state of the Orbita, provided as an instance of 85 `Orbita2dState` or `Orbita3dState`. 86 87 Returns: 88 A dictionary where each key corresponds to a joint attribute, and each value 89 is another dictionary of state information with string keys and `FloatValue` values. 90 """ 91 pass # pragma: no cover 92 93 def __repr__(self) -> str: 94 """Clean representation of an Orbita.""" 95 s = "\n\t".join([str(joint) for joint in self._joints.values()]) 96 return f"""<Orbita{self._orbita_type} on={self.is_on()} joints=\n\t{ 97 s 98 }\n>""" 99 100 @abstractmethod 101 def set_speed_limits(self, speed_limit: float | int) -> None: 102 """Set the speed limits for the Orbita actuator. 103 104 This method defines the maximum speed for the joints, specified as a percentage 105 of the maximum speed capability. 106 107 Args: 108 speed_limit: The desired speed limit as a percentage (0-100). 109 110 Raises: 111 TypeError: If the provided speed_limit is not a float or int. 112 ValueError: If the provided speed_limit is outside the range [0, 100]. 113 """ 114 if not isinstance(speed_limit, float | int): 115 raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}") 116 if not (0 <= speed_limit <= 100): 117 raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.") 118 119 @abstractmethod 120 def set_torque_limits(self, torque_limit: float | int) -> None: 121 """Set the torque limits for the Orbita actuator. 122 123 This method defines the maximum torque for the joints, specified as a percentage 124 of the maximum torque capability. 125 126 Args: 127 torque_limit: The desired torque limit as a percentage (0-100). 128 129 Raises: 130 TypeError: If the provided torque_limit is not a float or int. 131 ValueError: If the provided torque_limit is outside the range [0, 100]. 132 """ 133 if not isinstance(torque_limit, float | int): 134 raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}") 135 if not (0 <= torque_limit <= 100): 136 raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.") 137 138 # def set_pid(self, pid: Tuple[float, float, float]) -> None: 139 # """Set a pid value on all motors of the actuator""" 140 # if isinstance(pid, tuple) and len(pid) == 3 and all(isinstance(n, float | int) for n in pid): 141 # for m in self._motors.values(): 142 # m._tmp_pid = pid 143 # self._update_loop("pid") 144 # else: 145 # raise ValueError("pid should be of type Tuple[float, float, float]") 146 147 def get_speed_limits(self) -> Dict[str, float]: 148 """Get the speed limits for all motors of the actuator. 149 150 The speed limits are expressed as percentages of the maximum speed for each motor. 151 152 Returns: 153 A dictionary where each key is the motor name and the value is the speed limit 154 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 155 """ 156 return {motor_name: m.speed_limit for motor_name, m in self._motors.items()} 157 158 def get_torque_limits(self) -> Dict[str, float]: 159 """Get the torque limits for all motors of the actuator. 160 161 The torque limits are expressed as percentages of the maximum torque for each motor. 162 163 Returns: 164 A dictionary where each key is the motor name and the value is the torque limit 165 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 166 """ 167 return {motor_name: m.torque_limit for motor_name, m in self._motors.items()} 168 169 def get_pids(self) -> Dict[str, Tuple[float, float, float]]: 170 """Get the PID values for all motors of the actuator. 171 172 Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned. 173 174 Returns: 175 A dictionary where each key is the motor name and the value is a tuple containing 176 the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}". 177 """ 178 return {motor_name: m.pid for motor_name, m in self._motors.items()} 179 180 def turn_on(self) -> None: 181 """Turn on all motors of the actuator.""" 182 self._set_compliant(False) 183 184 def turn_off(self) -> None: 185 """Turn off all motors of the actuator.""" 186 self._set_compliant(True) 187 188 def is_on(self) -> bool: 189 """Check if the actuator is currently stiff. 190 191 Returns: 192 `True` if the actuator is stiff (not compliant), `False` otherwise. 193 """ 194 return not self._compliant 195 196 def is_off(self) -> bool: 197 """Check if the actuator is currently compliant. 198 199 Returns: 200 `True` if the actuator is compliant (not stiff), `False` otherwise. 201 """ 202 return self._compliant 203 204 @property 205 def temperatures(self) -> Dict[str, float]: 206 """Get the current temperatures of all the motors in the actuator. 207 208 Returns: 209 A dictionary where each key is the motor name and the value is the 210 current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}". 211 """ 212 return {motor_name: m.temperature for motor_name, m in self._motors.items()} 213 214 def _set_compliant(self, compliant: bool) -> None: 215 """Set the compliance mode of the actuator's motors. 216 217 Compliance mode determines whether the motors are stiff or compliant. 218 219 Args: 220 compliant: A boolean value indicating whether to set the motors to 221 compliant (`True`) or stiff (`False`). 222 """ 223 command = Orbita2dsCommand( 224 cmd=[ 225 Orbita2dCommand( 226 id=ComponentId(id=self._id), 227 compliant=BoolValue(value=compliant), 228 ) 229 ] 230 ) 231 self._stub.SendCommand(command) 232 233 def _set_outgoing_goal_position(self, axis_name: str, goal_position: float) -> None: 234 """Set the goal position for a specified axis. 235 236 This method sets the target position for an axis, preparing it to be sent as a goal position. 237 238 Args: 239 axis_name: The name of the axis for which to set the goal position. Could be "roll", "pitch", "yaw" for Orbita3d 240 or "axis_1", "axis_2" for Orbita2d. 241 goal_position: The desired goal position converted in radians for the specified axis. 242 """ 243 joint = getattr(self, axis_name) 244 axis = self._axis_name_by_joint[joint] 245 self._outgoing_goal_positions[axis] = goal_position 246 247 @abstractmethod 248 def send_goal_positions(self, check_positions: bool = False) -> None: 249 """Send the goal positions to the actuator. 250 251 This method is abstract and should be implemented in derived classes to 252 send the specified goal positions to the actuator's joints. 253 254 Args: 255 check_positions: A boolean value indicating whether to check the positions of the joints 256 after sending the goal positions. If `True`, a background thread is started to monitor 257 the joint positions relative to their last goal positions. 258 Default is `True`. 259 """ 260 pass # pragma: no cover 261 262 def _post_send_goal_positions(self) -> None: 263 """Start a background thread to check the goal positions after sending them. 264 265 This method stops any ongoing position check thread and starts a new thread 266 to monitor the current positions of the joints relative to their last goal positions. 267 """ 268 self._cancel_check = True 269 if self._thread_check_position is not None and self._thread_check_position.is_alive(): 270 self._thread_check_position.join() 271 self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True) 272 self._thread_check_position.start() 273 274 def _check_goal_positions(self) -> None: 275 """Monitor the joint positions to check if they reach the specified goals. 276 277 This method checks the current positions of the joints and compares them to 278 the goal positions. If a position is significantly different from the goal after 1 second, 279 a warning is logged indicating that the position may be unreachable. 280 """ 281 self._cancel_check = False 282 t1 = time.time() 283 while time.time() - t1 < 1: 284 time.sleep(0.0001) 285 if self._cancel_check: 286 # in case of multiple send_goal_positions we'll check the next call 287 return 288 289 for joint, orbitajoint in self._joints.items(): 290 # precision is low we are looking for unreachable positions 291 if not np.isclose(orbitajoint.present_position, orbitajoint.goal_position, atol=1): 292 self._logger.warning( 293 f"Required goal position ({round(orbitajoint.goal_position, 2)}) for {self._name}.{joint} is unreachable." 294 f"\nCurrent position is ({round(orbitajoint.present_position, 2)})." 295 ) 296 297 def _update_with(self, new_state: Orbita2dState | Orbita3dState) -> None: 298 """Update the actuator's state with new data. 299 300 This method updates the internal state of the motors, axes, and joints based on 301 the new state data received. 302 303 Args: 304 new_state: The new state of the actuator, either as an Orbita2dState or 305 Orbita3dState object. 306 """ 307 state: Dict[str, Dict[str, FloatValue]] = self._create_dict_state(new_state) 308 309 for name, motor in self._motors.items(): 310 motor._update_with(state[name]) 311 312 for name, axis in self._axis.items(): 313 axis._update_with(state[name]) 314 315 for name, joints in self._joints.items(): 316 joints._update_with(state[name]) 317 318 @property 319 def status(self) -> Optional[str]: 320 """Get the current audit status of the actuator. 321 322 Returns: 323 The audit status as a string, representing the latest error or status 324 message, or `None` if there is no error. 325 """ 326 return self._error_status 327 328 def _update_audit_status(self, new_status: Orbita2dStatus | Orbita3dStatus) -> None: 329 """Update the audit status based on the new status data. 330 331 Args: 332 new_status: The new status data, either as an Orbita2dStatus or 333 Orbita3dStatus object, containing error details. 334 """ 335 self._error_status = new_status.errors[0].details
The Orbita class is an abstract class to represent any Orbita actuator.
The Orbita class is used to store the up-to-date state of the actuator, especially:
- its compliancy
- its joints state
- its motors state
- its axis state
And apply speed, torque, pid and compliancy to all motors of the actuator.
This class is meant to be derived by Orbita2d and Orbita3d
45 def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part): 46 """Initialize the Orbita actuator with its common attributes. 47 48 Args: 49 uid: The unique identifier for the actuator. 50 name: The name of the actuator. 51 orbita_type: Specifies the type of Orbita, either "2d" or "3d". 52 stub: The gRPC stub used for communicating with the actuator, which can be an 53 instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`. 54 part: The parent part to which the Orbita belongs, used for referencing the 55 part's attributes. 56 """ 57 self._logger = logging.getLogger(__name__) 58 self._name = name 59 self._id = uid 60 self._orbita_type = orbita_type 61 self._stub = stub 62 self._part = part 63 64 self._compliant: bool 65 66 self._joints: Dict[str, Any] = {} 67 self._axis_name_by_joint: Dict[Any, str] = {} 68 self._motors: Dict[str, OrbitaMotor] = {} 69 self._outgoing_goal_positions: Dict[str, float] = {} 70 self._axis: Dict[str, OrbitaAxis] = {} 71 72 self._error_status: Optional[str] = None 73 74 self._thread_check_position: Optional[Thread] = None 75 self._cancel_check = False
Initialize the Orbita actuator with its common attributes.
Arguments:
- uid: The unique identifier for the actuator.
- name: The name of the actuator.
- orbita_type: Specifies the type of Orbita, either "2d" or "3d".
- stub: The gRPC stub used for communicating with the actuator, which can be an
instance of either
Orbita2dServiceStub
orOrbita3dServiceStub
. - part: The parent part to which the Orbita belongs, used for referencing the part's attributes.
100 @abstractmethod 101 def set_speed_limits(self, speed_limit: float | int) -> None: 102 """Set the speed limits for the Orbita actuator. 103 104 This method defines the maximum speed for the joints, specified as a percentage 105 of the maximum speed capability. 106 107 Args: 108 speed_limit: The desired speed limit as a percentage (0-100). 109 110 Raises: 111 TypeError: If the provided speed_limit is not a float or int. 112 ValueError: If the provided speed_limit is outside the range [0, 100]. 113 """ 114 if not isinstance(speed_limit, float | int): 115 raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}") 116 if not (0 <= speed_limit <= 100): 117 raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
Set the speed limits for the Orbita actuator.
This method defines the maximum speed for the joints, specified as a percentage of the maximum speed capability.
Arguments:
- speed_limit: The desired speed limit as a percentage (0-100).
Raises:
- TypeError: If the provided speed_limit is not a float or int.
- ValueError: If the provided speed_limit is outside the range [0, 100].
119 @abstractmethod 120 def set_torque_limits(self, torque_limit: float | int) -> None: 121 """Set the torque limits for the Orbita actuator. 122 123 This method defines the maximum torque for the joints, specified as a percentage 124 of the maximum torque capability. 125 126 Args: 127 torque_limit: The desired torque limit as a percentage (0-100). 128 129 Raises: 130 TypeError: If the provided torque_limit is not a float or int. 131 ValueError: If the provided torque_limit is outside the range [0, 100]. 132 """ 133 if not isinstance(torque_limit, float | int): 134 raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}") 135 if not (0 <= torque_limit <= 100): 136 raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
Set the torque limits for the Orbita actuator.
This method defines the maximum torque for the joints, specified as a percentage of the maximum torque capability.
Arguments:
- torque_limit: The desired torque limit as a percentage (0-100).
Raises:
- TypeError: If the provided torque_limit is not a float or int.
- ValueError: If the provided torque_limit is outside the range [0, 100].
147 def get_speed_limits(self) -> Dict[str, float]: 148 """Get the speed limits for all motors of the actuator. 149 150 The speed limits are expressed as percentages of the maximum speed for each motor. 151 152 Returns: 153 A dictionary where each key is the motor name and the value is the speed limit 154 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 155 """ 156 return {motor_name: m.speed_limit for motor_name, m in self._motors.items()}
Get the speed limits for all motors of the actuator.
The speed limits are expressed as percentages of the maximum speed for each motor.
Returns:
A dictionary where each key is the motor name and the value is the speed limit percentage (0-100) for that motor. Motor names are of format "motor_{n}".
158 def get_torque_limits(self) -> Dict[str, float]: 159 """Get the torque limits for all motors of the actuator. 160 161 The torque limits are expressed as percentages of the maximum torque for each motor. 162 163 Returns: 164 A dictionary where each key is the motor name and the value is the torque limit 165 percentage (0-100) for that motor. Motor names are of format "motor_{n}". 166 """ 167 return {motor_name: m.torque_limit for motor_name, m in self._motors.items()}
Get the torque limits for all motors of the actuator.
The torque limits are expressed as percentages of the maximum torque for each motor.
Returns:
A dictionary where each key is the motor name and the value is the torque limit percentage (0-100) for that motor. Motor names are of format "motor_{n}".
169 def get_pids(self) -> Dict[str, Tuple[float, float, float]]: 170 """Get the PID values for all motors of the actuator. 171 172 Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned. 173 174 Returns: 175 A dictionary where each key is the motor name and the value is a tuple containing 176 the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}". 177 """ 178 return {motor_name: m.pid for motor_name, m in self._motors.items()}
Get the PID values for all motors of the actuator.
Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned.
Returns:
A dictionary where each key is the motor name and the value is a tuple containing the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}".
180 def turn_on(self) -> None: 181 """Turn on all motors of the actuator.""" 182 self._set_compliant(False)
Turn on all motors of the actuator.
184 def turn_off(self) -> None: 185 """Turn off all motors of the actuator.""" 186 self._set_compliant(True)
Turn off all motors of the actuator.
188 def is_on(self) -> bool: 189 """Check if the actuator is currently stiff. 190 191 Returns: 192 `True` if the actuator is stiff (not compliant), `False` otherwise. 193 """ 194 return not self._compliant
Check if the actuator is currently stiff.
Returns:
True
if the actuator is stiff (not compliant),False
otherwise.
196 def is_off(self) -> bool: 197 """Check if the actuator is currently compliant. 198 199 Returns: 200 `True` if the actuator is compliant (not stiff), `False` otherwise. 201 """ 202 return self._compliant
Check if the actuator is currently compliant.
Returns:
True
if the actuator is compliant (not stiff),False
otherwise.
204 @property 205 def temperatures(self) -> Dict[str, float]: 206 """Get the current temperatures of all the motors in the actuator. 207 208 Returns: 209 A dictionary where each key is the motor name and the value is the 210 current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}". 211 """ 212 return {motor_name: m.temperature for motor_name, m in self._motors.items()}
Get the current temperatures of all the motors in the actuator.
Returns:
A dictionary where each key is the motor name and the value is the current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}".
247 @abstractmethod 248 def send_goal_positions(self, check_positions: bool = False) -> None: 249 """Send the goal positions to the actuator. 250 251 This method is abstract and should be implemented in derived classes to 252 send the specified goal positions to the actuator's joints. 253 254 Args: 255 check_positions: A boolean value indicating whether to check the positions of the joints 256 after sending the goal positions. If `True`, a background thread is started to monitor 257 the joint positions relative to their last goal positions. 258 Default is `True`. 259 """ 260 pass # pragma: no cover
Send the goal positions to the actuator.
This method is abstract and should be implemented in derived classes to send the specified goal positions to the actuator's joints.
Arguments:
- check_positions: A boolean value indicating whether to check the positions of the joints
after sending the goal positions. If
True
, a background thread is started to monitor the joint positions relative to their last goal positions. Default isTrue
.
318 @property 319 def status(self) -> Optional[str]: 320 """Get the current audit status of the actuator. 321 322 Returns: 323 The audit status as a string, representing the latest error or status 324 message, or `None` if there is no error. 325 """ 326 return self._error_status
Get the current audit status of the actuator.
Returns:
The audit status as a string, representing the latest error or status message, or
None
if there is no error.