reachy2_sdk.orbita.orbita

Reachy Orbita module.

Handles all specific methods commmon to all Orbita2d and Orbita3d.

  1"""Reachy Orbita module.
  2
  3Handles all specific methods commmon to all Orbita2d and Orbita3d.
  4"""
  5
  6import logging
  7import time
  8from abc import ABC, abstractmethod
  9from threading import Thread
 10from typing import Any, Dict, Optional, Tuple
 11
 12import numpy as np
 13from google.protobuf.wrappers_pb2 import BoolValue, FloatValue
 14from reachy2_sdk_api.component_pb2 import ComponentId
 15from reachy2_sdk_api.orbita2d_pb2 import (
 16    Orbita2dCommand,
 17    Orbita2dsCommand,
 18    Orbita2dState,
 19    Orbita2dStatus,
 20)
 21from reachy2_sdk_api.orbita2d_pb2_grpc import Orbita2dServiceStub
 22from reachy2_sdk_api.orbita3d_pb2 import Orbita3dState, Orbita3dStatus
 23from reachy2_sdk_api.orbita3d_pb2_grpc import Orbita3dServiceStub
 24
 25from ..parts.part import Part
 26from .orbita_axis import OrbitaAxis
 27from .orbita_motor import OrbitaMotor
 28
 29
 30class Orbita(ABC):
 31    """The Orbita class is an abstract class to represent any Orbita actuator.
 32
 33    The Orbita class is used to store the up-to-date state of the actuator, especially:
 34    - its compliancy
 35    - its joints state
 36    - its motors state
 37    - its axis state
 38
 39    And apply speed, torque, pid and compliancy to all motors of the actuator.
 40
 41    This class is meant to be derived by Orbita2d and Orbita3d
 42    """
 43
 44    def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part):
 45        """Initialize the Orbita actuator with its common attributes.
 46
 47        Args:
 48            uid: The unique identifier for the actuator.
 49            name: The name of the actuator.
 50            orbita_type: Specifies the type of Orbita, either "2d" or "3d".
 51            stub: The gRPC stub used for communicating with the actuator, which can be an
 52                instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`.
 53            part: The parent part to which the Orbita belongs, used for referencing the
 54                part's attributes.
 55        """
 56        self._logger = logging.getLogger(__name__)
 57        self._name = name
 58        self._id = uid
 59        self._orbita_type = orbita_type
 60        self._stub = stub
 61        self._part = part
 62
 63        self._compliant: bool
 64
 65        self._joints: Dict[str, Any] = {}
 66        self._axis_name_by_joint: Dict[Any, str] = {}
 67        self._motors: Dict[str, OrbitaMotor] = {}
 68        self._outgoing_goal_positions: Dict[str, float] = {}
 69        self._axis: Dict[str, OrbitaAxis] = {}
 70
 71        self._error_status: Optional[str] = None
 72
 73        self._thread_check_position: Optional[Thread] = None
 74        self._cancel_check = False
 75
 76    @abstractmethod
 77    def _create_dict_state(self, initial_state: Orbita2dState | Orbita3dState) -> Dict[str, Dict[str, FloatValue]]:
 78        """Create a dictionary representation of the joint states.
 79
 80        This method must be implemented by subclasses to create the dict state of the Orbita.
 81
 82        Args:
 83            initial_state: The initial state of the Orbita, provided as an instance of
 84                `Orbita2dState` or `Orbita3dState`.
 85
 86        Returns:
 87            A dictionary where each key corresponds to a joint attribute, and each value
 88            is another dictionary of state information with string keys and `FloatValue` values.
 89        """
 90        pass  # pragma: no cover
 91
 92    def __repr__(self) -> str:
 93        """Clean representation of an Orbita."""
 94        s = "\n\t".join([str(joint) for joint in self._joints.values()])
 95        return f"""<Orbita{self._orbita_type} on={self.is_on()} joints=\n\t{
 96            s
 97        }\n>"""
 98
 99    @abstractmethod
100    def set_speed_limits(self, speed_limit: float | int) -> None:
101        """Set the speed limits for the Orbita actuator.
102
103        This method defines the maximum speed for the joints, specified as a percentage
104        of the maximum speed capability.
105
106        Args:
107            speed_limit: The desired speed limit as a percentage (0-100).
108
109        Raises:
110            TypeError: If the provided speed_limit is not a float or int.
111            ValueError: If the provided speed_limit is outside the range [0, 100].
112        """
113        if not isinstance(speed_limit, float | int):
114            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
115        if not (0 <= speed_limit <= 100):
116            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
117
118    @abstractmethod
119    def set_torque_limits(self, torque_limit: float | int) -> None:
120        """Set the torque limits for the Orbita actuator.
121
122        This method defines the maximum torque for the joints, specified as a percentage
123        of the maximum torque capability.
124
125        Args:
126            torque_limit: The desired torque limit as a percentage (0-100).
127
128        Raises:
129            TypeError: If the provided torque_limit is not a float or int.
130            ValueError: If the provided torque_limit is outside the range [0, 100].
131        """
132        if not isinstance(torque_limit, float | int):
133            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
134        if not (0 <= torque_limit <= 100):
135            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
136
137    # def set_pid(self, pid: Tuple[float, float, float]) -> None:
138    #     """Set a pid value on all motors of the actuator"""
139    #     if isinstance(pid, tuple) and len(pid) == 3 and all(isinstance(n, float | int) for n in pid):
140    #         for m in self._motors.values():
141    #             m._tmp_pid = pid
142    #         self._update_loop("pid")
143    #     else:
144    #         raise ValueError("pid should be of type Tuple[float, float, float]")
145
146    def get_speed_limits(self) -> Dict[str, float]:
147        """Get the speed limits for all motors of the actuator.
148
149        The speed limits are expressed as percentages of the maximum speed for each motor.
150
151        Returns:
152            A dictionary where each key is the motor name and the value is the speed limit
153            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
154        """
155        return {motor_name: m.speed_limit for motor_name, m in self._motors.items()}
156
157    def get_torque_limits(self) -> Dict[str, float]:
158        """Get the torque limits for all motors of the actuator.
159
160        The torque limits are expressed as percentages of the maximum torque for each motor.
161
162        Returns:
163            A dictionary where each key is the motor name and the value is the torque limit
164            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
165        """
166        return {motor_name: m.torque_limit for motor_name, m in self._motors.items()}
167
168    def get_pids(self) -> Dict[str, Tuple[float, float, float]]:
169        """Get the PID values for all motors of the actuator.
170
171        Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned.
172
173        Returns:
174            A dictionary where each key is the motor name and the value is a tuple containing
175            the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}".
176        """
177        return {motor_name: m.pid for motor_name, m in self._motors.items()}
178
179    def turn_on(self) -> None:
180        """Turn on all motors of the actuator."""
181        self._set_compliant(False)
182
183    def turn_off(self) -> None:
184        """Turn off all motors of the actuator."""
185        self._set_compliant(True)
186
187    def is_on(self) -> bool:
188        """Check if the actuator is currently stiff.
189
190        Returns:
191            `True` if the actuator is stiff (not compliant), `False` otherwise.
192        """
193        return not self._compliant
194
195    def is_off(self) -> bool:
196        """Check if the actuator is currently compliant.
197
198        Returns:
199            `True` if the actuator is compliant (not stiff), `False` otherwise.
200        """
201        return self._compliant
202
203    @property
204    def temperatures(self) -> Dict[str, float]:
205        """Get the current temperatures of all the motors in the actuator.
206
207        Returns:
208            A dictionary where each key is the motor name and the value is the
209            current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}".
210        """
211        return {motor_name: m.temperature for motor_name, m in self._motors.items()}
212
213    def _set_compliant(self, compliant: bool) -> None:
214        """Set the compliance mode of the actuator's motors.
215
216        Compliance mode determines whether the motors are stiff or compliant.
217
218        Args:
219            compliant: A boolean value indicating whether to set the motors to
220                compliant (`True`) or stiff (`False`).
221        """
222        command = Orbita2dsCommand(
223            cmd=[
224                Orbita2dCommand(
225                    id=ComponentId(id=self._id),
226                    compliant=BoolValue(value=compliant),
227                )
228            ]
229        )
230        self._stub.SendCommand(command)
231
232    def _set_outgoing_goal_position(self, axis_name: str, goal_position: float) -> None:
233        """Set the goal position for a specified axis.
234
235        This method sets the target position for an axis, preparing it to be sent as a goal position.
236
237        Args:
238            axis_name: The name of the axis for which to set the goal position. Could be "roll", "pitch", "yaw" for Orbita3d
239                or "axis_1", "axis_2" for Orbita2d.
240            goal_position: The desired goal position converted in radians for the specified axis.
241        """
242        joint = getattr(self, axis_name)
243        axis = self._axis_name_by_joint[joint]
244        self._outgoing_goal_positions[axis] = goal_position
245
246    @abstractmethod
247    def send_goal_positions(self, check_positions: bool = False) -> None:
248        """Send the goal positions to the actuator.
249
250        This method is abstract and should be implemented in derived classes to
251        send the specified goal positions to the actuator's joints.
252
253        Args:
254            check_positions: A boolean value indicating whether to check the positions of the joints
255                after sending the goal positions. If `True`, a background thread is started to monitor
256                the joint positions relative to their last goal positions.
257                Default is `True`.
258        """
259        pass  # pragma: no cover
260
261    def _post_send_goal_positions(self) -> None:
262        """Start a background thread to check the goal positions after sending them.
263
264        This method stops any ongoing position check thread and starts a new thread
265        to monitor the current positions of the joints relative to their last goal positions.
266        """
267        self._cancel_check = True
268        if self._thread_check_position is not None and self._thread_check_position.is_alive():
269            self._thread_check_position.join()
270        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
271        self._thread_check_position.start()
272
273    def _check_goal_positions(self) -> None:
274        """Monitor the joint positions to check if they reach the specified goals.
275
276        This method checks the current positions of the joints and compares them to
277        the goal positions. If a position is significantly different from the goal after 1 second,
278        a warning is logged indicating that the position may be unreachable.
279        """
280        self._cancel_check = False
281        t1 = time.time()
282        while time.time() - t1 < 1:
283            time.sleep(0.0001)
284            if self._cancel_check:
285                # in case of multiple send_goal_positions we'll check the next call
286                return
287
288        for joint, orbitajoint in self._joints.items():
289            # precision is low we are looking for unreachable positions
290            if not np.isclose(orbitajoint.present_position, orbitajoint.goal_position, atol=1):
291                self._logger.warning(
292                    f"Required goal position ({round(orbitajoint.goal_position, 2)}) for {self._name}.{joint} is unreachable."
293                    f"\nCurrent position is ({round(orbitajoint.present_position, 2)})."
294                )
295
296    def _update_with(self, new_state: Orbita2dState | Orbita3dState) -> None:
297        """Update the actuator's state with new data.
298
299        This method updates the internal state of the motors, axes, and joints based on
300        the new state data received.
301
302        Args:
303            new_state: The new state of the actuator, either as an Orbita2dState or
304                Orbita3dState object.
305        """
306        state: Dict[str, Dict[str, FloatValue]] = self._create_dict_state(new_state)
307
308        for name, motor in self._motors.items():
309            motor._update_with(state[name])
310
311        for name, axis in self._axis.items():
312            axis._update_with(state[name])
313
314        for name, joints in self._joints.items():
315            joints._update_with(state[name])
316
317    @property
318    def status(self) -> Optional[str]:
319        """Get the current audit status of the actuator.
320
321        Returns:
322            The audit status as a string, representing the latest error or status
323            message, or `None` if there is no error.
324        """
325        return self._error_status
326
327    def _update_audit_status(self, new_status: Orbita2dStatus | Orbita3dStatus) -> None:
328        """Update the audit status based on the new status data.
329
330        Args:
331            new_status: The new status data, either as an Orbita2dStatus or
332                Orbita3dStatus object, containing error details.
333        """
334        self._error_status = new_status.errors[0].details
class Orbita(abc.ABC):
 31class Orbita(ABC):
 32    """The Orbita class is an abstract class to represent any Orbita actuator.
 33
 34    The Orbita class is used to store the up-to-date state of the actuator, especially:
 35    - its compliancy
 36    - its joints state
 37    - its motors state
 38    - its axis state
 39
 40    And apply speed, torque, pid and compliancy to all motors of the actuator.
 41
 42    This class is meant to be derived by Orbita2d and Orbita3d
 43    """
 44
 45    def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part):
 46        """Initialize the Orbita actuator with its common attributes.
 47
 48        Args:
 49            uid: The unique identifier for the actuator.
 50            name: The name of the actuator.
 51            orbita_type: Specifies the type of Orbita, either "2d" or "3d".
 52            stub: The gRPC stub used for communicating with the actuator, which can be an
 53                instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`.
 54            part: The parent part to which the Orbita belongs, used for referencing the
 55                part's attributes.
 56        """
 57        self._logger = logging.getLogger(__name__)
 58        self._name = name
 59        self._id = uid
 60        self._orbita_type = orbita_type
 61        self._stub = stub
 62        self._part = part
 63
 64        self._compliant: bool
 65
 66        self._joints: Dict[str, Any] = {}
 67        self._axis_name_by_joint: Dict[Any, str] = {}
 68        self._motors: Dict[str, OrbitaMotor] = {}
 69        self._outgoing_goal_positions: Dict[str, float] = {}
 70        self._axis: Dict[str, OrbitaAxis] = {}
 71
 72        self._error_status: Optional[str] = None
 73
 74        self._thread_check_position: Optional[Thread] = None
 75        self._cancel_check = False
 76
 77    @abstractmethod
 78    def _create_dict_state(self, initial_state: Orbita2dState | Orbita3dState) -> Dict[str, Dict[str, FloatValue]]:
 79        """Create a dictionary representation of the joint states.
 80
 81        This method must be implemented by subclasses to create the dict state of the Orbita.
 82
 83        Args:
 84            initial_state: The initial state of the Orbita, provided as an instance of
 85                `Orbita2dState` or `Orbita3dState`.
 86
 87        Returns:
 88            A dictionary where each key corresponds to a joint attribute, and each value
 89            is another dictionary of state information with string keys and `FloatValue` values.
 90        """
 91        pass  # pragma: no cover
 92
 93    def __repr__(self) -> str:
 94        """Clean representation of an Orbita."""
 95        s = "\n\t".join([str(joint) for joint in self._joints.values()])
 96        return f"""<Orbita{self._orbita_type} on={self.is_on()} joints=\n\t{
 97            s
 98        }\n>"""
 99
100    @abstractmethod
101    def set_speed_limits(self, speed_limit: float | int) -> None:
102        """Set the speed limits for the Orbita actuator.
103
104        This method defines the maximum speed for the joints, specified as a percentage
105        of the maximum speed capability.
106
107        Args:
108            speed_limit: The desired speed limit as a percentage (0-100).
109
110        Raises:
111            TypeError: If the provided speed_limit is not a float or int.
112            ValueError: If the provided speed_limit is outside the range [0, 100].
113        """
114        if not isinstance(speed_limit, float | int):
115            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
116        if not (0 <= speed_limit <= 100):
117            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
118
119    @abstractmethod
120    def set_torque_limits(self, torque_limit: float | int) -> None:
121        """Set the torque limits for the Orbita actuator.
122
123        This method defines the maximum torque for the joints, specified as a percentage
124        of the maximum torque capability.
125
126        Args:
127            torque_limit: The desired torque limit as a percentage (0-100).
128
129        Raises:
130            TypeError: If the provided torque_limit is not a float or int.
131            ValueError: If the provided torque_limit is outside the range [0, 100].
132        """
133        if not isinstance(torque_limit, float | int):
134            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
135        if not (0 <= torque_limit <= 100):
136            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
137
138    # def set_pid(self, pid: Tuple[float, float, float]) -> None:
139    #     """Set a pid value on all motors of the actuator"""
140    #     if isinstance(pid, tuple) and len(pid) == 3 and all(isinstance(n, float | int) for n in pid):
141    #         for m in self._motors.values():
142    #             m._tmp_pid = pid
143    #         self._update_loop("pid")
144    #     else:
145    #         raise ValueError("pid should be of type Tuple[float, float, float]")
146
147    def get_speed_limits(self) -> Dict[str, float]:
148        """Get the speed limits for all motors of the actuator.
149
150        The speed limits are expressed as percentages of the maximum speed for each motor.
151
152        Returns:
153            A dictionary where each key is the motor name and the value is the speed limit
154            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
155        """
156        return {motor_name: m.speed_limit for motor_name, m in self._motors.items()}
157
158    def get_torque_limits(self) -> Dict[str, float]:
159        """Get the torque limits for all motors of the actuator.
160
161        The torque limits are expressed as percentages of the maximum torque for each motor.
162
163        Returns:
164            A dictionary where each key is the motor name and the value is the torque limit
165            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
166        """
167        return {motor_name: m.torque_limit for motor_name, m in self._motors.items()}
168
169    def get_pids(self) -> Dict[str, Tuple[float, float, float]]:
170        """Get the PID values for all motors of the actuator.
171
172        Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned.
173
174        Returns:
175            A dictionary where each key is the motor name and the value is a tuple containing
176            the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}".
177        """
178        return {motor_name: m.pid for motor_name, m in self._motors.items()}
179
180    def turn_on(self) -> None:
181        """Turn on all motors of the actuator."""
182        self._set_compliant(False)
183
184    def turn_off(self) -> None:
185        """Turn off all motors of the actuator."""
186        self._set_compliant(True)
187
188    def is_on(self) -> bool:
189        """Check if the actuator is currently stiff.
190
191        Returns:
192            `True` if the actuator is stiff (not compliant), `False` otherwise.
193        """
194        return not self._compliant
195
196    def is_off(self) -> bool:
197        """Check if the actuator is currently compliant.
198
199        Returns:
200            `True` if the actuator is compliant (not stiff), `False` otherwise.
201        """
202        return self._compliant
203
204    @property
205    def temperatures(self) -> Dict[str, float]:
206        """Get the current temperatures of all the motors in the actuator.
207
208        Returns:
209            A dictionary where each key is the motor name and the value is the
210            current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}".
211        """
212        return {motor_name: m.temperature for motor_name, m in self._motors.items()}
213
214    def _set_compliant(self, compliant: bool) -> None:
215        """Set the compliance mode of the actuator's motors.
216
217        Compliance mode determines whether the motors are stiff or compliant.
218
219        Args:
220            compliant: A boolean value indicating whether to set the motors to
221                compliant (`True`) or stiff (`False`).
222        """
223        command = Orbita2dsCommand(
224            cmd=[
225                Orbita2dCommand(
226                    id=ComponentId(id=self._id),
227                    compliant=BoolValue(value=compliant),
228                )
229            ]
230        )
231        self._stub.SendCommand(command)
232
233    def _set_outgoing_goal_position(self, axis_name: str, goal_position: float) -> None:
234        """Set the goal position for a specified axis.
235
236        This method sets the target position for an axis, preparing it to be sent as a goal position.
237
238        Args:
239            axis_name: The name of the axis for which to set the goal position. Could be "roll", "pitch", "yaw" for Orbita3d
240                or "axis_1", "axis_2" for Orbita2d.
241            goal_position: The desired goal position converted in radians for the specified axis.
242        """
243        joint = getattr(self, axis_name)
244        axis = self._axis_name_by_joint[joint]
245        self._outgoing_goal_positions[axis] = goal_position
246
247    @abstractmethod
248    def send_goal_positions(self, check_positions: bool = False) -> None:
249        """Send the goal positions to the actuator.
250
251        This method is abstract and should be implemented in derived classes to
252        send the specified goal positions to the actuator's joints.
253
254        Args:
255            check_positions: A boolean value indicating whether to check the positions of the joints
256                after sending the goal positions. If `True`, a background thread is started to monitor
257                the joint positions relative to their last goal positions.
258                Default is `True`.
259        """
260        pass  # pragma: no cover
261
262    def _post_send_goal_positions(self) -> None:
263        """Start a background thread to check the goal positions after sending them.
264
265        This method stops any ongoing position check thread and starts a new thread
266        to monitor the current positions of the joints relative to their last goal positions.
267        """
268        self._cancel_check = True
269        if self._thread_check_position is not None and self._thread_check_position.is_alive():
270            self._thread_check_position.join()
271        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
272        self._thread_check_position.start()
273
274    def _check_goal_positions(self) -> None:
275        """Monitor the joint positions to check if they reach the specified goals.
276
277        This method checks the current positions of the joints and compares them to
278        the goal positions. If a position is significantly different from the goal after 1 second,
279        a warning is logged indicating that the position may be unreachable.
280        """
281        self._cancel_check = False
282        t1 = time.time()
283        while time.time() - t1 < 1:
284            time.sleep(0.0001)
285            if self._cancel_check:
286                # in case of multiple send_goal_positions we'll check the next call
287                return
288
289        for joint, orbitajoint in self._joints.items():
290            # precision is low we are looking for unreachable positions
291            if not np.isclose(orbitajoint.present_position, orbitajoint.goal_position, atol=1):
292                self._logger.warning(
293                    f"Required goal position ({round(orbitajoint.goal_position, 2)}) for {self._name}.{joint} is unreachable."
294                    f"\nCurrent position is ({round(orbitajoint.present_position, 2)})."
295                )
296
297    def _update_with(self, new_state: Orbita2dState | Orbita3dState) -> None:
298        """Update the actuator's state with new data.
299
300        This method updates the internal state of the motors, axes, and joints based on
301        the new state data received.
302
303        Args:
304            new_state: The new state of the actuator, either as an Orbita2dState or
305                Orbita3dState object.
306        """
307        state: Dict[str, Dict[str, FloatValue]] = self._create_dict_state(new_state)
308
309        for name, motor in self._motors.items():
310            motor._update_with(state[name])
311
312        for name, axis in self._axis.items():
313            axis._update_with(state[name])
314
315        for name, joints in self._joints.items():
316            joints._update_with(state[name])
317
318    @property
319    def status(self) -> Optional[str]:
320        """Get the current audit status of the actuator.
321
322        Returns:
323            The audit status as a string, representing the latest error or status
324            message, or `None` if there is no error.
325        """
326        return self._error_status
327
328    def _update_audit_status(self, new_status: Orbita2dStatus | Orbita3dStatus) -> None:
329        """Update the audit status based on the new status data.
330
331        Args:
332            new_status: The new status data, either as an Orbita2dStatus or
333                Orbita3dStatus object, containing error details.
334        """
335        self._error_status = new_status.errors[0].details

The Orbita class is an abstract class to represent any Orbita actuator.

The Orbita class is used to store the up-to-date state of the actuator, especially:

  • its compliancy
  • its joints state
  • its motors state
  • its axis state

And apply speed, torque, pid and compliancy to all motors of the actuator.

This class is meant to be derived by Orbita2d and Orbita3d

Orbita( uid: int, name: str, orbita_type: str, stub: reachy2_sdk_api.orbita2d_pb2_grpc.Orbita2dServiceStub | reachy2_sdk_api.orbita3d_pb2_grpc.Orbita3dServiceStub, part: reachy2_sdk.parts.part.Part)
45    def __init__(self, uid: int, name: str, orbita_type: str, stub: Orbita2dServiceStub | Orbita3dServiceStub, part: Part):
46        """Initialize the Orbita actuator with its common attributes.
47
48        Args:
49            uid: The unique identifier for the actuator.
50            name: The name of the actuator.
51            orbita_type: Specifies the type of Orbita, either "2d" or "3d".
52            stub: The gRPC stub used for communicating with the actuator, which can be an
53                instance of either `Orbita2dServiceStub` or `Orbita3dServiceStub`.
54            part: The parent part to which the Orbita belongs, used for referencing the
55                part's attributes.
56        """
57        self._logger = logging.getLogger(__name__)
58        self._name = name
59        self._id = uid
60        self._orbita_type = orbita_type
61        self._stub = stub
62        self._part = part
63
64        self._compliant: bool
65
66        self._joints: Dict[str, Any] = {}
67        self._axis_name_by_joint: Dict[Any, str] = {}
68        self._motors: Dict[str, OrbitaMotor] = {}
69        self._outgoing_goal_positions: Dict[str, float] = {}
70        self._axis: Dict[str, OrbitaAxis] = {}
71
72        self._error_status: Optional[str] = None
73
74        self._thread_check_position: Optional[Thread] = None
75        self._cancel_check = False

Initialize the Orbita actuator with its common attributes.

Arguments:
  • uid: The unique identifier for the actuator.
  • name: The name of the actuator.
  • orbita_type: Specifies the type of Orbita, either "2d" or "3d".
  • stub: The gRPC stub used for communicating with the actuator, which can be an instance of either Orbita2dServiceStub or Orbita3dServiceStub.
  • part: The parent part to which the Orbita belongs, used for referencing the part's attributes.
@abstractmethod
def set_speed_limits(self, speed_limit: float | int) -> None:
100    @abstractmethod
101    def set_speed_limits(self, speed_limit: float | int) -> None:
102        """Set the speed limits for the Orbita actuator.
103
104        This method defines the maximum speed for the joints, specified as a percentage
105        of the maximum speed capability.
106
107        Args:
108            speed_limit: The desired speed limit as a percentage (0-100).
109
110        Raises:
111            TypeError: If the provided speed_limit is not a float or int.
112            ValueError: If the provided speed_limit is outside the range [0, 100].
113        """
114        if not isinstance(speed_limit, float | int):
115            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
116        if not (0 <= speed_limit <= 100):
117            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")

Set the speed limits for the Orbita actuator.

This method defines the maximum speed for the joints, specified as a percentage of the maximum speed capability.

Arguments:
  • speed_limit: The desired speed limit as a percentage (0-100).
Raises:
  • TypeError: If the provided speed_limit is not a float or int.
  • ValueError: If the provided speed_limit is outside the range [0, 100].
@abstractmethod
def set_torque_limits(self, torque_limit: float | int) -> None:
119    @abstractmethod
120    def set_torque_limits(self, torque_limit: float | int) -> None:
121        """Set the torque limits for the Orbita actuator.
122
123        This method defines the maximum torque for the joints, specified as a percentage
124        of the maximum torque capability.
125
126        Args:
127            torque_limit: The desired torque limit as a percentage (0-100).
128
129        Raises:
130            TypeError: If the provided torque_limit is not a float or int.
131            ValueError: If the provided torque_limit is outside the range [0, 100].
132        """
133        if not isinstance(torque_limit, float | int):
134            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
135        if not (0 <= torque_limit <= 100):
136            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")

Set the torque limits for the Orbita actuator.

This method defines the maximum torque for the joints, specified as a percentage of the maximum torque capability.

Arguments:
  • torque_limit: The desired torque limit as a percentage (0-100).
Raises:
  • TypeError: If the provided torque_limit is not a float or int.
  • ValueError: If the provided torque_limit is outside the range [0, 100].
def get_speed_limits(self) -> Dict[str, float]:
147    def get_speed_limits(self) -> Dict[str, float]:
148        """Get the speed limits for all motors of the actuator.
149
150        The speed limits are expressed as percentages of the maximum speed for each motor.
151
152        Returns:
153            A dictionary where each key is the motor name and the value is the speed limit
154            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
155        """
156        return {motor_name: m.speed_limit for motor_name, m in self._motors.items()}

Get the speed limits for all motors of the actuator.

The speed limits are expressed as percentages of the maximum speed for each motor.

Returns:

A dictionary where each key is the motor name and the value is the speed limit percentage (0-100) for that motor. Motor names are of format "motor_{n}".

def get_torque_limits(self) -> Dict[str, float]:
158    def get_torque_limits(self) -> Dict[str, float]:
159        """Get the torque limits for all motors of the actuator.
160
161        The torque limits are expressed as percentages of the maximum torque for each motor.
162
163        Returns:
164            A dictionary where each key is the motor name and the value is the torque limit
165            percentage (0-100) for that motor. Motor names are of format "motor_{n}".
166        """
167        return {motor_name: m.torque_limit for motor_name, m in self._motors.items()}

Get the torque limits for all motors of the actuator.

The torque limits are expressed as percentages of the maximum torque for each motor.

Returns:

A dictionary where each key is the motor name and the value is the torque limit percentage (0-100) for that motor. Motor names are of format "motor_{n}".

def get_pids(self) -> Dict[str, Tuple[float, float, float]]:
169    def get_pids(self) -> Dict[str, Tuple[float, float, float]]:
170        """Get the PID values for all motors of the actuator.
171
172        Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned.
173
174        Returns:
175            A dictionary where each key is the motor name and the value is a tuple containing
176            the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}".
177        """
178        return {motor_name: m.pid for motor_name, m in self._motors.items()}

Get the PID values for all motors of the actuator.

Each motor's PID controller parameters (Proportional, Integral, Derivative) are returned.

Returns:

A dictionary where each key is the motor name and the value is a tuple containing the PID values (P, I, D) for that motor. Motor names are of format "motor_{n}".

def turn_on(self) -> None:
180    def turn_on(self) -> None:
181        """Turn on all motors of the actuator."""
182        self._set_compliant(False)

Turn on all motors of the actuator.

def turn_off(self) -> None:
184    def turn_off(self) -> None:
185        """Turn off all motors of the actuator."""
186        self._set_compliant(True)

Turn off all motors of the actuator.

def is_on(self) -> bool:
188    def is_on(self) -> bool:
189        """Check if the actuator is currently stiff.
190
191        Returns:
192            `True` if the actuator is stiff (not compliant), `False` otherwise.
193        """
194        return not self._compliant

Check if the actuator is currently stiff.

Returns:

True if the actuator is stiff (not compliant), False otherwise.

def is_off(self) -> bool:
196    def is_off(self) -> bool:
197        """Check if the actuator is currently compliant.
198
199        Returns:
200            `True` if the actuator is compliant (not stiff), `False` otherwise.
201        """
202        return self._compliant

Check if the actuator is currently compliant.

Returns:

True if the actuator is compliant (not stiff), False otherwise.

temperatures: Dict[str, float]
204    @property
205    def temperatures(self) -> Dict[str, float]:
206        """Get the current temperatures of all the motors in the actuator.
207
208        Returns:
209            A dictionary where each key is the motor name and the value is the
210            current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}".
211        """
212        return {motor_name: m.temperature for motor_name, m in self._motors.items()}

Get the current temperatures of all the motors in the actuator.

Returns:

A dictionary where each key is the motor name and the value is the current temperature of the motor in degrees Celsius. Motor names are of format "motor_{n}".

@abstractmethod
def send_goal_positions(self, check_positions: bool = False) -> None:
247    @abstractmethod
248    def send_goal_positions(self, check_positions: bool = False) -> None:
249        """Send the goal positions to the actuator.
250
251        This method is abstract and should be implemented in derived classes to
252        send the specified goal positions to the actuator's joints.
253
254        Args:
255            check_positions: A boolean value indicating whether to check the positions of the joints
256                after sending the goal positions. If `True`, a background thread is started to monitor
257                the joint positions relative to their last goal positions.
258                Default is `True`.
259        """
260        pass  # pragma: no cover

Send the goal positions to the actuator.

This method is abstract and should be implemented in derived classes to send the specified goal positions to the actuator's joints.

Arguments:
  • check_positions: A boolean value indicating whether to check the positions of the joints after sending the goal positions. If True, a background thread is started to monitor the joint positions relative to their last goal positions. Default is True.
status: Optional[str]
318    @property
319    def status(self) -> Optional[str]:
320        """Get the current audit status of the actuator.
321
322        Returns:
323            The audit status as a string, representing the latest error or status
324            message, or `None` if there is no error.
325        """
326        return self._error_status

Get the current audit status of the actuator.

Returns:

The audit status as a string, representing the latest error or status message, or None if there is no error.