reachy2_sdk.components.antenna

Reachy Antenna module.

Handles all specific methods to Antennas.

  1"""Reachy Antenna module.
  2
  3Handles all specific methods to Antennas.
  4"""
  5
  6import logging
  7import time
  8from threading import Thread
  9from typing import Any, Dict, List, Optional
 10
 11import numpy as np
 12from google.protobuf.wrappers_pb2 import FloatValue
 13from grpc import Channel
 14from reachy2_sdk_api.component_pb2 import ComponentId
 15from reachy2_sdk_api.dynamixel_motor_pb2 import DynamixelMotor as DynamixelMotor_proto
 16from reachy2_sdk_api.dynamixel_motor_pb2 import (
 17    DynamixelMotorsCommand,
 18    DynamixelMotorState,
 19    DynamixelMotorStatus,
 20)
 21from reachy2_sdk_api.goto_pb2 import GoToId, GoToRequest, JointsGoal
 22from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 23from reachy2_sdk_api.head_pb2 import AntennaJointGoal
 24
 25from ..dynamixel.dynamixel_motor import DynamixelMotor
 26from ..parts.part import Part
 27from ..utils.utils import get_grpc_interpolation_mode
 28from .goto_based_component import IGoToBasedComponent
 29
 30
 31class Antenna(IGoToBasedComponent):
 32    """The Antenna class represents any antenna of the robot's head."""
 33
 34    def __init__(
 35        self,
 36        uid: int,
 37        name: str,
 38        initial_state: DynamixelMotorState,
 39        grpc_channel: Channel,
 40        goto_stub: GoToServiceStub,
 41        part: Part,
 42    ):
 43        """Initialize the Antenna with its initial state and configuration.
 44
 45        Args:
 46            uid: The unique identifier of the component.
 47            name: The name of the joint.
 48            initial_state: A dictionary containing the initial state of the joint, with
 49                each entry representing a specific parameter of the joint (e.g., present position).
 50            grpc_channel: The gRPC channel used to communicate with the DynamixelMotor service.
 51            goto_stub: The gRPC stub for controlling goto movements.
 52            part: The part to which this joint belongs.
 53        """
 54        self._logger = logging.getLogger(__name__)
 55        IGoToBasedComponent.__init__(self, ComponentId(id=uid, name=name), goto_stub)
 56        self._part = part
 57        self._error_status: Optional[str] = None
 58        self._joints: Dict[str, DynamixelMotor] = {}
 59        if name == "antenna_left":
 60            self._name = "l_antenna"
 61        else:
 62            self._name = "r_antenna"
 63        self._joints[self._name] = DynamixelMotor(uid, name, initial_state, grpc_channel)
 64
 65        self._thread_check_position: Optional[Thread] = None
 66        self._cancel_check = False
 67
 68    def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None:
 69        """Check the validity of the parameters for the `goto` method.
 70
 71        Args:
 72            duration: The time in seconds for the movement to be completed.
 73            target: The target position, either a float or int.
 74
 75        Raises:
 76            TypeError: If the target is not a list or a quaternion.
 77            ValueError: If the target list has a length other than 3.
 78            ValueError: If the duration is set to 0.
 79        """
 80        if not (isinstance(target, float) or isinstance(target, int)):
 81            raise TypeError(f"Antenna's goto target must be either a float or int, got {type(target)}.")
 82
 83        elif duration == 0:
 84            raise ValueError("duration cannot be set to 0.")
 85        elif duration is not None and duration < 0:
 86            raise ValueError("duration cannot be negative.")
 87
 88    def goto_posture(
 89        self,
 90        common_posture: str = "default",
 91        duration: float = 2,
 92        wait: bool = False,
 93        wait_for_goto_end: bool = True,
 94        interpolation_mode: str = "minimum_jerk",
 95    ) -> GoToId:
 96        """Send the antenna to standard positions within the specified duration.
 97
 98        The default posture sets the antenna is 0.0.
 99
100        Args:
101            common_posture: The standard positions to which all joints will be sent.
102                It can be 'default' or 'elbow_90'. Defaults to 'default'.
103            duration: The time duration in seconds for the robot to move to the specified posture.
104                Defaults to 2.
105            wait: Determines whether the program should wait for the movement to finish before
106                returning. If set to `True`, the program waits for the movement to complete before continuing
107                execution. Defaults to `False`.
108            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
109                only after all previous commands in the queue have been executed. If set to `False`, the program
110                will cancel all executing moves and queues. Defaults to `True`.
111            interpolation_mode: The type of interpolation used when moving the arm's joints.
112                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
113
114        Returns:
115            The unique GoToId associated with the movement command.
116        """
117        if not wait_for_goto_end:
118            self.cancel_all_goto()
119        if self.is_on():
120            return self.goto(0, duration, wait, interpolation_mode)
121        else:
122            self._logger.warning(f"{self._name} is off. No command sent.")
123        return GoToId(id=-1)
124
125    def goto(
126        self,
127        target: float,
128        duration: float = 2.0,
129        wait: bool = False,
130        interpolation_mode: str = "minimum_jerk",
131        degrees: bool = True,
132    ) -> GoToId:
133        """Send the antenna to a specified goal position.
134
135        Args:
136            target: The desired goal position for the antenna.
137            duration: The time in seconds for the movement to be completed. Defaults to 2.
138            wait: If True, the function waits until the movement is completed before returning.
139                    Defaults to False.
140            interpolation_mode: The interpolation method to be used. It can be either "minimum_jerk"
141                    or "linear". Defaults to "minimum_jerk".
142            degrees: If True, the joint value in the `target` argument is treated as degrees.
143                    Defaults to True.
144
145        Raises:
146            TypeError : If the input type for `target` is invalid
147            ValueError: If the `duration` is set to 0.
148
149        Returns:
150            GoToId: The unique identifier for the movement command.
151        """
152        if not self.is_on():
153            self._logger.warning(f"head.{self._name} is off. No command sent.")
154            return GoToId(id=-1)
155
156        self._check_goto_parameters(target, duration)
157
158        if degrees:
159            target = np.deg2rad(target)
160
161        request = GoToRequest(
162            joints_goal=JointsGoal(
163                antenna_joint_goal=AntennaJointGoal(
164                    id=self._part._part_id,
165                    antenna=DynamixelMotor_proto(
166                        id=ComponentId(id=self._joints[self._name]._id, name=self._joints[self._name]._name),
167                    ),
168                    joint_goal=FloatValue(value=target),
169                    duration=FloatValue(value=duration),
170                )
171            ),
172            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
173        )
174
175        response = self._goto_stub.GoToJoints(request)
176
177        if response.id == -1:
178            self._logger.error(f"Position {target} was not reachable. No command sent.")
179        elif wait:
180            self._wait_goto(response, duration)
181        return response
182
183    def __repr__(self) -> str:
184        """Clean representation of the Antenna only joint (DynamixelMotor)."""
185        s = "\n\t".join([str(joint) for joint in self._joints.values()])
186        return f"""<Antenna on={self.is_on()} joints=\n\t{
187            s
188        }\n>"""
189
190    def turn_on(self) -> None:
191        """Turn on the antenna's motor."""
192        self._joints[self._name].turn_on()
193
194    def turn_off(self) -> None:
195        """Turn off the antenna's motor."""
196        self._joints[self._name].turn_off()
197
198    def is_on(self) -> bool:
199        """Check if the antenna is currently stiff.
200
201        Returns:
202            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
203        """
204        return bool(self._joints[self._name].is_on())
205
206    def is_off(self) -> bool:
207        """Check if the antenna is currently stiff.
208
209        Returns:
210            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
211        """
212        return not bool(self._joints[self._name].is_on())
213
214    @property
215    def present_position(self) -> float:
216        """Get the present position of the joint in degrees."""
217        return float(self._joints[self._name].present_position)
218
219    @property
220    def goal_position(self) -> float:
221        """Get the goal position of the joint in degrees."""
222        return float(self._joints[self._name].goal_position)
223
224    @goal_position.setter
225    def goal_position(self, value: float | int) -> None:
226        """Set the goal position of the joint in degrees.
227
228        The goal position is not send to the joint immediately, it is stored locally until the `send_goal_positions` method
229        is called.
230
231        Args:
232            value: The goal position to set, specified as a float or int.
233
234        Raises:
235            TypeError: If the provided value is not a float or int.
236        """
237        self._joints[self._name].goal_position = value
238
239    def _get_goal_positions_message(self) -> Optional[DynamixelMotorsCommand]:
240        """Get the Orbita2dsCommand message to send the goal positions to the actuator."""
241        return self._joints[self._name]._get_goal_positions_message()
242
243    def _clean_outgoing_goal_positions(self) -> None:
244        """Clean the outgoing goal positions."""
245        self._joints[self._name]._clean_outgoing_goal_positions()
246
247    def _post_send_goal_positions(self) -> None:
248        """Start a background thread to check the goal positions after sending them.
249
250        This method stops any ongoing position check thread and starts a new thread
251        to monitor the current positions of the joints relative to their last goal positions.
252        """
253        self._cancel_check = True
254        if self._thread_check_position is not None and self._thread_check_position.is_alive():
255            self._thread_check_position.join()
256        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
257        self._thread_check_position.start()
258
259    def _check_goal_positions(self) -> None:
260        """Monitor the joint positions to check if they reach the specified goals.
261
262        This method checks the current positions of the joints and compares them to
263        the goal positions. If a position is significantly different from the goal after 1 second,
264        a warning is logged indicating that the position may be unreachable.
265        """
266        self._cancel_check = False
267        t1 = time.time()
268        while time.time() - t1 < 1:
269            time.sleep(0.0001)
270            if self._cancel_check:
271                # in case of multiple send_goal_positions we'll check the next call
272                return
273
274        # precision is low we are looking for unreachable positions
275        if not np.isclose(self._joints[self._name].present_position, self._joints[self._name].goal_position, atol=1):
276            self._logger.warning(
277                f"Required goal position ({round(self._joints[self._name].goal_position, 2)}) for {self._name} is unreachable."
278                f"\nCurrent position is ({round(self._joints[self._name].present_position, 2)})."
279            )
280
281    def send_goal_positions(self, check_positions: bool = False) -> None:
282        """Send goal positions to the motor.
283
284        If goal positions have been specified, sends them to the motor.
285        Args :
286            check_positions: A boolean indicating whether to check the positions after sending the command.
287                Defaults to True.
288        """
289        if self._joints[self._name]._outgoing_goal_position is not None:
290            if self.is_off():
291                self._logger.warning(f"{self._name} is off. Command not sent.")
292                return
293            self._joints[self._name].send_goal_positions(check_positions)
294
295    def set_speed_limits(self, speed_limit: float | int) -> None:
296        """Set the speed limit as a percentage of the maximum speed the motor.
297
298        Args:
299            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
300                specified as a float or int.
301        """
302        self._joints[self._name].set_speed_limits(speed_limit)
303
304    def _update_with(self, new_state: DynamixelMotorState) -> None:
305        """Update the present and goal positions of the joint with new state values.
306
307        Args:
308            new_state: A dictionary containing the new state values for the joint. The keys should include
309                "present_position" and "goal_position", with corresponding FloatValue objects as values.
310        """
311        self._joints[self._name]._update_with(new_state)
312
313    @property
314    def status(self) -> Optional[str]:
315        """Get the current audit status of the actuator.
316
317        Returns:
318            The audit status as a string, representing the latest error or status
319            message, or `None` if there is no error.
320        """
321        pass
322
323    def _update_audit_status(self, new_status: DynamixelMotorStatus) -> None:
324        """Update the audit status based on the new status data.
325
326        Args:
327            new_status: The new status data, as a DynamixelMotorStatus object, containing error details.
328        """
329        pass
 32class Antenna(IGoToBasedComponent):
 33    """The Antenna class represents any antenna of the robot's head."""
 34
 35    def __init__(
 36        self,
 37        uid: int,
 38        name: str,
 39        initial_state: DynamixelMotorState,
 40        grpc_channel: Channel,
 41        goto_stub: GoToServiceStub,
 42        part: Part,
 43    ):
 44        """Initialize the Antenna with its initial state and configuration.
 45
 46        Args:
 47            uid: The unique identifier of the component.
 48            name: The name of the joint.
 49            initial_state: A dictionary containing the initial state of the joint, with
 50                each entry representing a specific parameter of the joint (e.g., present position).
 51            grpc_channel: The gRPC channel used to communicate with the DynamixelMotor service.
 52            goto_stub: The gRPC stub for controlling goto movements.
 53            part: The part to which this joint belongs.
 54        """
 55        self._logger = logging.getLogger(__name__)
 56        IGoToBasedComponent.__init__(self, ComponentId(id=uid, name=name), goto_stub)
 57        self._part = part
 58        self._error_status: Optional[str] = None
 59        self._joints: Dict[str, DynamixelMotor] = {}
 60        if name == "antenna_left":
 61            self._name = "l_antenna"
 62        else:
 63            self._name = "r_antenna"
 64        self._joints[self._name] = DynamixelMotor(uid, name, initial_state, grpc_channel)
 65
 66        self._thread_check_position: Optional[Thread] = None
 67        self._cancel_check = False
 68
 69    def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None:
 70        """Check the validity of the parameters for the `goto` method.
 71
 72        Args:
 73            duration: The time in seconds for the movement to be completed.
 74            target: The target position, either a float or int.
 75
 76        Raises:
 77            TypeError: If the target is not a list or a quaternion.
 78            ValueError: If the target list has a length other than 3.
 79            ValueError: If the duration is set to 0.
 80        """
 81        if not (isinstance(target, float) or isinstance(target, int)):
 82            raise TypeError(f"Antenna's goto target must be either a float or int, got {type(target)}.")
 83
 84        elif duration == 0:
 85            raise ValueError("duration cannot be set to 0.")
 86        elif duration is not None and duration < 0:
 87            raise ValueError("duration cannot be negative.")
 88
 89    def goto_posture(
 90        self,
 91        common_posture: str = "default",
 92        duration: float = 2,
 93        wait: bool = False,
 94        wait_for_goto_end: bool = True,
 95        interpolation_mode: str = "minimum_jerk",
 96    ) -> GoToId:
 97        """Send the antenna to standard positions within the specified duration.
 98
 99        The default posture sets the antenna is 0.0.
100
101        Args:
102            common_posture: The standard positions to which all joints will be sent.
103                It can be 'default' or 'elbow_90'. Defaults to 'default'.
104            duration: The time duration in seconds for the robot to move to the specified posture.
105                Defaults to 2.
106            wait: Determines whether the program should wait for the movement to finish before
107                returning. If set to `True`, the program waits for the movement to complete before continuing
108                execution. Defaults to `False`.
109            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
110                only after all previous commands in the queue have been executed. If set to `False`, the program
111                will cancel all executing moves and queues. Defaults to `True`.
112            interpolation_mode: The type of interpolation used when moving the arm's joints.
113                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
114
115        Returns:
116            The unique GoToId associated with the movement command.
117        """
118        if not wait_for_goto_end:
119            self.cancel_all_goto()
120        if self.is_on():
121            return self.goto(0, duration, wait, interpolation_mode)
122        else:
123            self._logger.warning(f"{self._name} is off. No command sent.")
124        return GoToId(id=-1)
125
126    def goto(
127        self,
128        target: float,
129        duration: float = 2.0,
130        wait: bool = False,
131        interpolation_mode: str = "minimum_jerk",
132        degrees: bool = True,
133    ) -> GoToId:
134        """Send the antenna to a specified goal position.
135
136        Args:
137            target: The desired goal position for the antenna.
138            duration: The time in seconds for the movement to be completed. Defaults to 2.
139            wait: If True, the function waits until the movement is completed before returning.
140                    Defaults to False.
141            interpolation_mode: The interpolation method to be used. It can be either "minimum_jerk"
142                    or "linear". Defaults to "minimum_jerk".
143            degrees: If True, the joint value in the `target` argument is treated as degrees.
144                    Defaults to True.
145
146        Raises:
147            TypeError : If the input type for `target` is invalid
148            ValueError: If the `duration` is set to 0.
149
150        Returns:
151            GoToId: The unique identifier for the movement command.
152        """
153        if not self.is_on():
154            self._logger.warning(f"head.{self._name} is off. No command sent.")
155            return GoToId(id=-1)
156
157        self._check_goto_parameters(target, duration)
158
159        if degrees:
160            target = np.deg2rad(target)
161
162        request = GoToRequest(
163            joints_goal=JointsGoal(
164                antenna_joint_goal=AntennaJointGoal(
165                    id=self._part._part_id,
166                    antenna=DynamixelMotor_proto(
167                        id=ComponentId(id=self._joints[self._name]._id, name=self._joints[self._name]._name),
168                    ),
169                    joint_goal=FloatValue(value=target),
170                    duration=FloatValue(value=duration),
171                )
172            ),
173            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
174        )
175
176        response = self._goto_stub.GoToJoints(request)
177
178        if response.id == -1:
179            self._logger.error(f"Position {target} was not reachable. No command sent.")
180        elif wait:
181            self._wait_goto(response, duration)
182        return response
183
184    def __repr__(self) -> str:
185        """Clean representation of the Antenna only joint (DynamixelMotor)."""
186        s = "\n\t".join([str(joint) for joint in self._joints.values()])
187        return f"""<Antenna on={self.is_on()} joints=\n\t{
188            s
189        }\n>"""
190
191    def turn_on(self) -> None:
192        """Turn on the antenna's motor."""
193        self._joints[self._name].turn_on()
194
195    def turn_off(self) -> None:
196        """Turn off the antenna's motor."""
197        self._joints[self._name].turn_off()
198
199    def is_on(self) -> bool:
200        """Check if the antenna is currently stiff.
201
202        Returns:
203            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
204        """
205        return bool(self._joints[self._name].is_on())
206
207    def is_off(self) -> bool:
208        """Check if the antenna is currently stiff.
209
210        Returns:
211            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
212        """
213        return not bool(self._joints[self._name].is_on())
214
215    @property
216    def present_position(self) -> float:
217        """Get the present position of the joint in degrees."""
218        return float(self._joints[self._name].present_position)
219
220    @property
221    def goal_position(self) -> float:
222        """Get the goal position of the joint in degrees."""
223        return float(self._joints[self._name].goal_position)
224
225    @goal_position.setter
226    def goal_position(self, value: float | int) -> None:
227        """Set the goal position of the joint in degrees.
228
229        The goal position is not send to the joint immediately, it is stored locally until the `send_goal_positions` method
230        is called.
231
232        Args:
233            value: The goal position to set, specified as a float or int.
234
235        Raises:
236            TypeError: If the provided value is not a float or int.
237        """
238        self._joints[self._name].goal_position = value
239
240    def _get_goal_positions_message(self) -> Optional[DynamixelMotorsCommand]:
241        """Get the Orbita2dsCommand message to send the goal positions to the actuator."""
242        return self._joints[self._name]._get_goal_positions_message()
243
244    def _clean_outgoing_goal_positions(self) -> None:
245        """Clean the outgoing goal positions."""
246        self._joints[self._name]._clean_outgoing_goal_positions()
247
248    def _post_send_goal_positions(self) -> None:
249        """Start a background thread to check the goal positions after sending them.
250
251        This method stops any ongoing position check thread and starts a new thread
252        to monitor the current positions of the joints relative to their last goal positions.
253        """
254        self._cancel_check = True
255        if self._thread_check_position is not None and self._thread_check_position.is_alive():
256            self._thread_check_position.join()
257        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
258        self._thread_check_position.start()
259
260    def _check_goal_positions(self) -> None:
261        """Monitor the joint positions to check if they reach the specified goals.
262
263        This method checks the current positions of the joints and compares them to
264        the goal positions. If a position is significantly different from the goal after 1 second,
265        a warning is logged indicating that the position may be unreachable.
266        """
267        self._cancel_check = False
268        t1 = time.time()
269        while time.time() - t1 < 1:
270            time.sleep(0.0001)
271            if self._cancel_check:
272                # in case of multiple send_goal_positions we'll check the next call
273                return
274
275        # precision is low we are looking for unreachable positions
276        if not np.isclose(self._joints[self._name].present_position, self._joints[self._name].goal_position, atol=1):
277            self._logger.warning(
278                f"Required goal position ({round(self._joints[self._name].goal_position, 2)}) for {self._name} is unreachable."
279                f"\nCurrent position is ({round(self._joints[self._name].present_position, 2)})."
280            )
281
282    def send_goal_positions(self, check_positions: bool = False) -> None:
283        """Send goal positions to the motor.
284
285        If goal positions have been specified, sends them to the motor.
286        Args :
287            check_positions: A boolean indicating whether to check the positions after sending the command.
288                Defaults to True.
289        """
290        if self._joints[self._name]._outgoing_goal_position is not None:
291            if self.is_off():
292                self._logger.warning(f"{self._name} is off. Command not sent.")
293                return
294            self._joints[self._name].send_goal_positions(check_positions)
295
296    def set_speed_limits(self, speed_limit: float | int) -> None:
297        """Set the speed limit as a percentage of the maximum speed the motor.
298
299        Args:
300            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
301                specified as a float or int.
302        """
303        self._joints[self._name].set_speed_limits(speed_limit)
304
305    def _update_with(self, new_state: DynamixelMotorState) -> None:
306        """Update the present and goal positions of the joint with new state values.
307
308        Args:
309            new_state: A dictionary containing the new state values for the joint. The keys should include
310                "present_position" and "goal_position", with corresponding FloatValue objects as values.
311        """
312        self._joints[self._name]._update_with(new_state)
313
314    @property
315    def status(self) -> Optional[str]:
316        """Get the current audit status of the actuator.
317
318        Returns:
319            The audit status as a string, representing the latest error or status
320            message, or `None` if there is no error.
321        """
322        pass
323
324    def _update_audit_status(self, new_status: DynamixelMotorStatus) -> None:
325        """Update the audit status based on the new status data.
326
327        Args:
328            new_status: The new status data, as a DynamixelMotorStatus object, containing error details.
329        """
330        pass

The Antenna class represents any antenna of the robot's head.

Antenna( uid: int, name: str, initial_state: dynamixel_motor_pb2.DynamixelMotorState, grpc_channel: grpc.Channel, goto_stub: reachy2_sdk_api.goto_pb2_grpc.GoToServiceStub, part: reachy2_sdk.parts.part.Part)
35    def __init__(
36        self,
37        uid: int,
38        name: str,
39        initial_state: DynamixelMotorState,
40        grpc_channel: Channel,
41        goto_stub: GoToServiceStub,
42        part: Part,
43    ):
44        """Initialize the Antenna with its initial state and configuration.
45
46        Args:
47            uid: The unique identifier of the component.
48            name: The name of the joint.
49            initial_state: A dictionary containing the initial state of the joint, with
50                each entry representing a specific parameter of the joint (e.g., present position).
51            grpc_channel: The gRPC channel used to communicate with the DynamixelMotor service.
52            goto_stub: The gRPC stub for controlling goto movements.
53            part: The part to which this joint belongs.
54        """
55        self._logger = logging.getLogger(__name__)
56        IGoToBasedComponent.__init__(self, ComponentId(id=uid, name=name), goto_stub)
57        self._part = part
58        self._error_status: Optional[str] = None
59        self._joints: Dict[str, DynamixelMotor] = {}
60        if name == "antenna_left":
61            self._name = "l_antenna"
62        else:
63            self._name = "r_antenna"
64        self._joints[self._name] = DynamixelMotor(uid, name, initial_state, grpc_channel)
65
66        self._thread_check_position: Optional[Thread] = None
67        self._cancel_check = False

Initialize the Antenna with its initial state and configuration.

Arguments:
  • uid: The unique identifier of the component.
  • name: The name of the joint.
  • initial_state: A dictionary containing the initial state of the joint, with each entry representing a specific parameter of the joint (e.g., present position).
  • grpc_channel: The gRPC channel used to communicate with the DynamixelMotor service.
  • goto_stub: The gRPC stub for controlling goto movements.
  • part: The part to which this joint belongs.
def goto_posture( self, common_posture: str = 'default', duration: float = 2, wait: bool = False, wait_for_goto_end: bool = True, interpolation_mode: str = 'minimum_jerk') -> goto_pb2.GoToId:
 89    def goto_posture(
 90        self,
 91        common_posture: str = "default",
 92        duration: float = 2,
 93        wait: bool = False,
 94        wait_for_goto_end: bool = True,
 95        interpolation_mode: str = "minimum_jerk",
 96    ) -> GoToId:
 97        """Send the antenna to standard positions within the specified duration.
 98
 99        The default posture sets the antenna is 0.0.
100
101        Args:
102            common_posture: The standard positions to which all joints will be sent.
103                It can be 'default' or 'elbow_90'. Defaults to 'default'.
104            duration: The time duration in seconds for the robot to move to the specified posture.
105                Defaults to 2.
106            wait: Determines whether the program should wait for the movement to finish before
107                returning. If set to `True`, the program waits for the movement to complete before continuing
108                execution. Defaults to `False`.
109            wait_for_goto_end: Specifies whether commands will be sent to a part immediately or
110                only after all previous commands in the queue have been executed. If set to `False`, the program
111                will cancel all executing moves and queues. Defaults to `True`.
112            interpolation_mode: The type of interpolation used when moving the arm's joints.
113                Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
114
115        Returns:
116            The unique GoToId associated with the movement command.
117        """
118        if not wait_for_goto_end:
119            self.cancel_all_goto()
120        if self.is_on():
121            return self.goto(0, duration, wait, interpolation_mode)
122        else:
123            self._logger.warning(f"{self._name} is off. No command sent.")
124        return GoToId(id=-1)

Send the antenna to standard positions within the specified duration.

The default posture sets the antenna is 0.0.

Arguments:
  • common_posture: The standard positions to which all joints will be sent. It can be 'default' or 'elbow_90'. Defaults to 'default'.
  • duration: The time duration in seconds for the robot to move to the specified posture. Defaults to 2.
  • wait: Determines whether the program should wait for the movement to finish before returning. If set to True, the program waits for the movement to complete before continuing execution. Defaults to False.
  • wait_for_goto_end: Specifies whether commands will be sent to a part immediately or only after all previous commands in the queue have been executed. If set to False, the program will cancel all executing moves and queues. Defaults to True.
  • interpolation_mode: The type of interpolation used when moving the arm's joints. Can be 'minimum_jerk' or 'linear'. Defaults to 'minimum_jerk'.
Returns:

The unique GoToId associated with the movement command.

def goto( self, target: float, duration: float = 2.0, wait: bool = False, interpolation_mode: str = 'minimum_jerk', degrees: bool = True) -> goto_pb2.GoToId:
126    def goto(
127        self,
128        target: float,
129        duration: float = 2.0,
130        wait: bool = False,
131        interpolation_mode: str = "minimum_jerk",
132        degrees: bool = True,
133    ) -> GoToId:
134        """Send the antenna to a specified goal position.
135
136        Args:
137            target: The desired goal position for the antenna.
138            duration: The time in seconds for the movement to be completed. Defaults to 2.
139            wait: If True, the function waits until the movement is completed before returning.
140                    Defaults to False.
141            interpolation_mode: The interpolation method to be used. It can be either "minimum_jerk"
142                    or "linear". Defaults to "minimum_jerk".
143            degrees: If True, the joint value in the `target` argument is treated as degrees.
144                    Defaults to True.
145
146        Raises:
147            TypeError : If the input type for `target` is invalid
148            ValueError: If the `duration` is set to 0.
149
150        Returns:
151            GoToId: The unique identifier for the movement command.
152        """
153        if not self.is_on():
154            self._logger.warning(f"head.{self._name} is off. No command sent.")
155            return GoToId(id=-1)
156
157        self._check_goto_parameters(target, duration)
158
159        if degrees:
160            target = np.deg2rad(target)
161
162        request = GoToRequest(
163            joints_goal=JointsGoal(
164                antenna_joint_goal=AntennaJointGoal(
165                    id=self._part._part_id,
166                    antenna=DynamixelMotor_proto(
167                        id=ComponentId(id=self._joints[self._name]._id, name=self._joints[self._name]._name),
168                    ),
169                    joint_goal=FloatValue(value=target),
170                    duration=FloatValue(value=duration),
171                )
172            ),
173            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
174        )
175
176        response = self._goto_stub.GoToJoints(request)
177
178        if response.id == -1:
179            self._logger.error(f"Position {target} was not reachable. No command sent.")
180        elif wait:
181            self._wait_goto(response, duration)
182        return response

Send the antenna to a specified goal position.

Arguments:
  • target: The desired goal position for the antenna.
  • duration: The time in seconds for the movement to be completed. Defaults to 2.
  • wait: If True, the function waits until the movement is completed before returning. Defaults to False.
  • interpolation_mode: The interpolation method to be used. It can be either "minimum_jerk" or "linear". Defaults to "minimum_jerk".
  • degrees: If True, the joint value in the target argument is treated as degrees. Defaults to True.
Raises:
  • TypeError : If the input type for target is invalid
  • ValueError: If the duration is set to 0.
Returns:

GoToId: The unique identifier for the movement command.

def turn_on(self) -> None:
191    def turn_on(self) -> None:
192        """Turn on the antenna's motor."""
193        self._joints[self._name].turn_on()

Turn on the antenna's motor.

def turn_off(self) -> None:
195    def turn_off(self) -> None:
196        """Turn off the antenna's motor."""
197        self._joints[self._name].turn_off()

Turn off the antenna's motor.

def is_on(self) -> bool:
199    def is_on(self) -> bool:
200        """Check if the antenna is currently stiff.
201
202        Returns:
203            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
204        """
205        return bool(self._joints[self._name].is_on())

Check if the antenna is currently stiff.

Returns:

True if the antenna's motor is stiff (not compliant), False otherwise.

def is_off(self) -> bool:
207    def is_off(self) -> bool:
208        """Check if the antenna is currently stiff.
209
210        Returns:
211            `True` if the antenna's motor is stiff (not compliant), `False` otherwise.
212        """
213        return not bool(self._joints[self._name].is_on())

Check if the antenna is currently stiff.

Returns:

True if the antenna's motor is stiff (not compliant), False otherwise.

present_position: float
215    @property
216    def present_position(self) -> float:
217        """Get the present position of the joint in degrees."""
218        return float(self._joints[self._name].present_position)

Get the present position of the joint in degrees.

goal_position: float
220    @property
221    def goal_position(self) -> float:
222        """Get the goal position of the joint in degrees."""
223        return float(self._joints[self._name].goal_position)

Get the goal position of the joint in degrees.

def send_goal_positions(self, check_positions: bool = False) -> None:
282    def send_goal_positions(self, check_positions: bool = False) -> None:
283        """Send goal positions to the motor.
284
285        If goal positions have been specified, sends them to the motor.
286        Args :
287            check_positions: A boolean indicating whether to check the positions after sending the command.
288                Defaults to True.
289        """
290        if self._joints[self._name]._outgoing_goal_position is not None:
291            if self.is_off():
292                self._logger.warning(f"{self._name} is off. Command not sent.")
293                return
294            self._joints[self._name].send_goal_positions(check_positions)

Send goal positions to the motor.

If goal positions have been specified, sends them to the motor.

Args :

check_positions: A boolean indicating whether to check the positions after sending the command. Defaults to True.

def set_speed_limits(self, speed_limit: float | int) -> None:
296    def set_speed_limits(self, speed_limit: float | int) -> None:
297        """Set the speed limit as a percentage of the maximum speed the motor.
298
299        Args:
300            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
301                specified as a float or int.
302        """
303        self._joints[self._name].set_speed_limits(speed_limit)

Set the speed limit as a percentage of the maximum speed the motor.

Arguments:
  • speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be specified as a float or int.
status: Optional[str]
314    @property
315    def status(self) -> Optional[str]:
316        """Get the current audit status of the actuator.
317
318        Returns:
319            The audit status as a string, representing the latest error or status
320            message, or `None` if there is no error.
321        """
322        pass

Get the current audit status of the actuator.

Returns:

The audit status as a string, representing the latest error or status message, or None if there is no error.