reachy2_sdk.parts.head

Reachy Head module.

Handles all specific methods to a Head.

  1"""Reachy Head module.
  2
  3Handles all specific methods to a Head.
  4"""
  5
  6from typing import Any, Dict, List, Optional, overload
  7
  8import grpc
  9import numpy as np
 10from google.protobuf.wrappers_pb2 import FloatValue
 11from pyquaternion import Quaternion as pyQuat
 12from reachy2_sdk_api.goto_pb2 import (
 13    CartesianGoal,
 14    CustomJointGoal,
 15    GoToId,
 16    GoToRequest,
 17    JointsGoal,
 18)
 19from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 20from reachy2_sdk_api.head_pb2 import CustomNeckJoints
 21from reachy2_sdk_api.head_pb2 import Head as Head_proto
 22from reachy2_sdk_api.head_pb2 import (
 23    HeadComponentsCommands,
 24    HeadState,
 25    HeadStatus,
 26    NeckCartesianGoal,
 27    NeckJointGoal,
 28    NeckJoints,
 29    NeckOrientation,
 30)
 31from reachy2_sdk_api.head_pb2_grpc import HeadServiceStub
 32from reachy2_sdk_api.kinematics_pb2 import ExtEulerAngles, Point, Quaternion, Rotation3d
 33
 34from ..components.antenna import Antenna
 35from ..orbita.orbita3d import Orbita3d
 36from ..utils.utils import get_grpc_interpolation_mode, quaternion_from_euler_angles
 37from .goto_based_part import IGoToBasedPart
 38from .joints_based_part import JointsBasedPart
 39
 40
 41class Head(JointsBasedPart, IGoToBasedPart):
 42    """Head class for controlling the head of Reachy.
 43
 44    The `Head` class manages the neck actuator and provides utilities for controlling the orientation
 45    of the head, such as moving to a specific posture or looking at a Cartesian point in Reachy's
 46    coordinate system.
 47
 48    Attributes:
 49        neck: An instance of `Orbita3d` representing the neck actuator of the head.
 50    """
 51
 52    def __init__(
 53        self,
 54        head_msg: Head_proto,
 55        initial_state: HeadState,
 56        grpc_channel: grpc.Channel,
 57        goto_stub: GoToServiceStub,
 58    ) -> None:
 59        """Initialize the Head component with its actuators.
 60
 61        Sets up the necessary attributes and configuration for the head, including the gRPC
 62        stubs and initial state.
 63
 64        Args:
 65            head_msg: The Head_proto object containing the configuration details for the head.
 66            initial_state: The initial state of the head, represented as a HeadState object.
 67            grpc_channel: The gRPC channel used to communicate with the head's gRPC service.
 68            goto_stub: The GoToServiceStub used to handle goto-based movements for the head.
 69        """
 70        JointsBasedPart.__init__(self, head_msg, grpc_channel, HeadServiceStub(grpc_channel))
 71        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
 72
 73        self._neck: Optional[Orbita3d] = None
 74        self._l_antenna: Optional[Antenna] = None
 75        self._r_antenna: Optional[Antenna] = None
 76        self._actuators: Dict[str, Orbita3d | Antenna] = {}
 77
 78        self._setup_head(head_msg, initial_state)
 79
 80    def _setup_head(self, head: Head_proto, initial_state: HeadState) -> None:
 81        """Set up the head with its actuators.
 82
 83        This method initializes the neck and antenna actuators for the head and sets their initial state.
 84
 85        Args:
 86            head: A Head_proto object containing the configuration details for the head.
 87            initial_state: A HeadState object representing the initial state of the head's actuators.
 88        """
 89        description = head.description
 90        if description.HasField("neck"):
 91            self._neck = Orbita3d(
 92                uid=description.neck.id.id,
 93                name=description.neck.id.name,
 94                initial_state=initial_state.neck_state,
 95                grpc_channel=self._grpc_channel,
 96                part=self,
 97                joints_position_order=[NeckJoints.ROLL, NeckJoints.PITCH, NeckJoints.YAW],
 98            )
 99            self._actuators["neck"] = self._neck
100        if description.HasField("l_antenna"):
101            self._l_antenna = Antenna(
102                uid=description.l_antenna.id.id,
103                name=description.l_antenna.id.name,
104                initial_state=initial_state.l_antenna_state,
105                grpc_channel=self._grpc_channel,
106                goto_stub=self._goto_stub,
107                part=self,
108            )
109            self._actuators["l_antenna"] = self._l_antenna
110        if description.HasField("r_antenna"):
111            self._r_antenna = Antenna(
112                uid=description.r_antenna.id.id,
113                name=description.r_antenna.id.name,
114                initial_state=initial_state.r_antenna_state,
115                grpc_channel=self._grpc_channel,
116                goto_stub=self._goto_stub,
117                part=self,
118            )
119            self._actuators["r_antenna"] = self._r_antenna
120
121    def __repr__(self) -> str:
122        """Clean representation of an Head."""
123        s = "\n\t".join([act_name + ": " + str(actuator) for act_name, actuator in self._actuators.items()])
124        return f"""<Head on={self.is_on()} actuators=\n\t{
125            s
126        }\n>"""
127
128    @property
129    def neck(self) -> Optional[Orbita3d]:
130        """Get the neck actuator of the head."""
131        return self._neck
132
133    @property
134    def l_antenna(self) -> Optional[Antenna]:
135        """Get the left antenna actuator of the head."""
136        return self._l_antenna
137
138    @property
139    def r_antenna(self) -> Optional[Antenna]:
140        """Get the right antenna actuator of the head."""
141        return self._r_antenna
142
143    def get_current_orientation(self) -> pyQuat:
144        """Get the current orientation of the head.
145
146        Returns:
147            The orientation of the head as a quaternion (w, x, y, z).
148        """
149        quat = self._stub.GetOrientation(self._part_id).q
150        return pyQuat(w=quat.w, x=quat.x, y=quat.y, z=quat.z)
151
152    def get_current_positions(self, degrees: bool = True) -> List[float]:
153        """Return the current joint positions of the neck.
154
155        Returns:
156            A list of the current neck joint positions in the order [roll, pitch, yaw].
157        """
158        if self.neck is None:
159            return []
160        roll = self.neck._joints["roll"].present_position
161        pitch = self.neck._joints["pitch"].present_position
162        yaw = self.neck._joints["yaw"].present_position
163        if degrees:
164            return [roll, pitch, yaw]
165        return [np.deg2rad(roll), np.deg2rad(pitch), np.deg2rad(yaw)]
166
167    @overload
168    def goto(
169        self,
170        target: List[float],
171        duration: float = 2.0,
172        wait: bool = False,
173        interpolation_mode: str = "minimum_jerk",
174        degrees: bool = True,
175    ) -> GoToId:
176        ...  # pragma: no cover
177
178    @overload
179    def goto(
180        self,
181        target: pyQuat,
182        duration: float = 2.0,
183        wait: bool = False,
184        interpolation_mode: str = "minimum_jerk",
185        degrees: bool = True,
186    ) -> GoToId:
187        ...  # pragma: no cover
188
189    def goto(
190        self,
191        target: Any,
192        duration: float = 2.0,
193        wait: bool = False,
194        interpolation_mode: str = "minimum_jerk",
195        degrees: bool = True,
196    ) -> GoToId:
197        """Send the neck to a specified orientation.
198
199        This method moves the neck either to a given roll-pitch-yaw (RPY) position or to a quaternion orientation.
200
201        Args:
202            target: The desired orientation for the neck. Can either be:
203                - A list of three floats [roll, pitch, yaw] representing the RPY orientation (in degrees if `degrees=True`).
204                - A pyQuat object representing a quaternion.
205            duration: The time in seconds for the movement to be completed. Defaults to 2.
206            wait: If True, the function waits until the movement is completed before returning.
207                    Defaults to False.
208            interpolation_mode: The interpolation method to be used. It can be either "minimum_jerk"
209                    or "linear". Defaults to "minimum_jerk".
210            degrees: If True, the RPY values in the `target` argument are treated as degrees.
211                    Defaults to True.
212
213        Raises:
214            TypeError : If the input type for `target` is invalid
215            ValueError: If the `duration` is set to 0.
216
217        Returns:
218            GoToId: The unique identifier for the movement command.
219        """
220        if self.neck is not None and not self.neck.is_on():
221            self._logger.warning("head.neck is off. No command sent.")
222            return GoToId(id=-1)
223
224        self._check_goto_parameters(target, duration)
225
226        if isinstance(target, list):
227            if degrees:
228                target = np.deg2rad(target).tolist()
229            joints_goal = NeckOrientation(
230                rotation=Rotation3d(
231                    rpy=ExtEulerAngles(
232                        roll=FloatValue(value=target[0]),
233                        pitch=FloatValue(value=target[1]),
234                        yaw=FloatValue(value=target[2]),
235                    )
236                )
237            )
238        elif isinstance(target, pyQuat):
239            joints_goal = NeckOrientation(rotation=Rotation3d(q=Quaternion(w=target.w, x=target.x, y=target.y, z=target.z)))
240
241        request = GoToRequest(
242            joints_goal=JointsGoal(
243                neck_joint_goal=NeckJointGoal(
244                    id=self._part_id,
245                    joints_goal=joints_goal,
246                    duration=FloatValue(value=duration),
247                )
248            ),
249            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
250        )
251
252        response = self._goto_stub.GoToJoints(request)
253
254        if response.id == -1:
255            if isinstance(target, list):
256                self._logger.error(f"Position {target} was not reachable. No command sent.")
257            elif isinstance(target, pyQuat):
258                self._logger.error(f"Orientation {target} was not reachable. No command sent.")
259        elif wait:
260            self._wait_goto(response, duration)
261        return response
262
263    def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None:
264        """Check the validity of the parameters for the `goto` method.
265
266        Args:
267            duration: The time in seconds for the movement to be completed.
268            target: The target position, either a list of joint positions or a quaternion.
269
270        Raises:
271            TypeError: If the target is not a list or a quaternion.
272            ValueError: If the target list has a length other than 3.
273            ValueError: If the duration is set to 0.
274        """
275        if not (isinstance(target, pyQuat) or isinstance(target, list)):
276            raise TypeError(f"Invalid orientation: must be either a list or a quaternion, got {type(target)}.")
277
278        elif isinstance(target, list) and len(target) != 3:
279            raise ValueError(f"The joints list should be of length 3, got {len(target)}.")
280
281        elif duration == 0:
282            raise ValueError("duration cannot be set to 0.")
283
284    def _goto_single_joint(
285        self,
286        neck_joint: int,
287        goal_position: float,
288        duration: float = 2,
289        wait: bool = False,
290        interpolation_mode: str = "minimum_jerk",
291        degrees: bool = True,
292    ) -> GoToId:
293        """Move a single joint of the neck to a specified goal position.
294
295        Args:
296            neck_joint: The index of the neck joint to move (0 for roll, 1 for pitch, 2 for yaw).
297            goal_position: The target position for the joint.
298            duration: The time in seconds for the joint to reach the goal position. Defaults to 2.
299            wait: Whether to wait for the movement to complete before returning. Defaults to False.
300            interpolation_mode: The interpolation mode for the movement, either "minimum_jerk" or "linear".
301                Defaults to "minimum_jerk".
302            degrees: Whether the goal position is provided in degrees. If True, the position will be converted to radians.
303                Defaults to True.
304
305        Returns:
306            The GoToId associated with the movement command.
307
308        Raises:
309            ValueError: If the duration is set to 0.
310        """
311        if duration == 0:
312            raise ValueError("duration cannot be set to 0.")
313        if degrees:
314            goal_position = np.deg2rad(goal_position)
315        request = GoToRequest(
316            joints_goal=JointsGoal(
317                custom_joint_goal=CustomJointGoal(
318                    id=self._part_id,
319                    neck_joints=CustomNeckJoints(joints=[neck_joint]),
320                    joints_goals=[FloatValue(value=goal_position)],
321                    duration=FloatValue(value=duration),
322                )
323            ),
324            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
325        )
326        response = self._goto_stub.GoToJoints(request)
327        if response.id == -1:
328            self._logger.error(f"Position {goal_position} was not reachable. No command sent.")
329        elif wait:
330            self._wait_goto(response, duration)
331        return response
332
333    def look_at(
334        self, x: float, y: float, z: float, duration: float = 2.0, wait: bool = False, interpolation_mode: str = "minimum_jerk"
335    ) -> GoToId:
336        """Compute and send a neck position to look at a specified point in Reachy's Cartesian space (torso frame).
337
338        The (x, y, z) coordinates are expressed in meters, where x is forward, y is left, and z is upward.
339
340        Args:
341            x: The x-coordinate of the target point.
342            y: The y-coordinate of the target point.
343            z: The z-coordinate of the target point.
344            duration: The time in seconds for the head to look at the point. Defaults to 2.0.
345            wait: Whether to wait for the movement to complete before returning. Defaults to False.
346            interpolation_mode: The interpolation mode for the movement, either "minimum_jerk" or "linear".
347                Defaults to "minimum_jerk".
348
349        Returns:
350            The unique GoToId associated with the movement command.
351
352        Raises:
353            ValueError: If the duration is set to 0.
354        """
355        if duration == 0:
356            raise ValueError("duration cannot be set to 0.")
357        if self.neck is not None and self.neck.is_off():
358            self._logger.warning("head.neck is off. No command sent.")
359            return GoToId(id=-1)
360
361        request = GoToRequest(
362            cartesian_goal=CartesianGoal(
363                neck_cartesian_goal=NeckCartesianGoal(
364                    id=self._part_id,
365                    point=Point(x=x, y=y, z=z),
366                    duration=FloatValue(value=duration),
367                )
368            ),
369            interpolation_mode=get_grpc_interpolation_mode(interpolation_mode),
370        )
371        response = self._goto_stub.GoToCartesian(request)
372
373        if response.id == -1:
374            self._logger.error(f"Position {x}, {y}, {z} was not reachable. No command sent.")
375        elif wait:
376            self._wait_goto(response, duration)
377        return response
378
379    def rotate_by(
380        self,
381        roll: float = 0,
382        pitch: float = 0,
383        yaw: float = 0,
384        duration: float = 2,
385        wait: bool = False,
386        degrees: bool = True,
387        frame: str = "robot",
388        interpolation_mode: str = "minimum_jerk",
389    ) -> GoToId:
390        """Rotate the neck by the specified angles.
391
392        Args:
393            roll: The angle in degrees to rotate around the x-axis (roll). Defaults to 0.
394            pitch: The angle in degrees to rotate around the y-axis (pitch). Defaults to 0.
395            yaw: The angle in degrees to rotate around the z-axis (yaw). Defaults to 0.
396            duration: The time in seconds for the neck to reach the target posture. Defaults to 2.
397            wait: Whether to wait for the movement to complete before returning. Defaults to False.
398            degrees: Whether the angles are provided in degrees. If True, the angles will be converted to radians.
399                Defaults to True.
400            frame: The frame of reference for the rotation. Can be either "robot" or "head". Defaults to "robot".
401            interpolation_mode: The interpolation mode for the movement, either "minimum_jerk" or "linear".
402                Defaults to "minimum_jerk".
403
404
405        Raises:
406            ValueError: If the frame is not "robot" or "head".
407            ValueError: If the duration is set to 0.
408            ValueError: If the interpolation mode is not "minimum_jerk" or "linear".
409        """
410        if frame not in ["robot", "head"]:
411            raise ValueError(f"Unknown frame {frame}! Should be 'robot' or 'head'")
412        if not degrees:
413            roll, pitch, yaw = np.rad2deg([roll, pitch, yaw])
414
415        try:
416            goto = self.get_goto_queue()[-1]
417        except IndexError:
418            goto = self.get_goto_playing()
419
420        if goto.id != -1:
421            joints_request = self._get_goto_request(goto)
422        else:
423            joints_request = None
424
425        if joints_request is not None:
426            initial_orientation = joints_request.request.target.joints
427
428            # as there is a 10° offset between the joint space
429            # and the zero position in cartesian space in Reachy's frame for the yaw joint :
430            initial_orientation[1] += 10
431            initial_quaternion = quaternion_from_euler_angles(
432                initial_orientation[0], initial_orientation[1], initial_orientation[2], degrees=True
433            )
434        else:
435            initial_quaternion = self.get_current_orientation()
436
437        additional_quaternion = quaternion_from_euler_angles(roll, pitch, yaw, degrees=True)
438
439        if frame == "head":
440            target_quaternion = initial_quaternion * additional_quaternion
441        elif frame == "robot":
442            target_quaternion = additional_quaternion * initial_quaternion
443
444        return self.goto(target_quaternion, duration, wait, interpolation_mode)
445
446    def goto_posture(
447        self,
448        common_posture: str = "default",
449        duration: float = 2,
450        wait: bool = False,
451        wait_for_goto_end: bool = True,
452        interpolation_mode: str = "minimum_jerk",
453    ) -> GoToId:
454        """Send all neck joints to standard positions within the specified duration.
455
456        The default posture sets the neck joints to [0, -10, 0] (roll, pitch, yaw).
457
458        Args:
459            common_posture: The standard positions to which all joints will be sent.
460                It can be 'default' or 'elbow_90'. Defaults to 'default'.
461            duration: The time in seconds for the neck to reach the target posture. Defaults to 2.
462            wait: Whether to wait for the movement to complete before returning. Defaults to False.
463            wait_for_goto_end: Whether to wait for all previous goto commands to finish before executing
464                the current command. If False, it cancels all ongoing commands. Defaults to True.
465            interpolation_mode: The interpolation mode for the movement, either "minimum_jerk" or "linear".
466                Defaults to "minimum_jerk".
467
468        Returns:
469            The unique GoToId associated with the movement command.
470        """
471        if not wait_for_goto_end:
472            self.cancel_all_goto()
473        if self.l_antenna is not None and self.l_antenna.is_on():
474            self.l_antenna.goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
475        if self.r_antenna is not None and self.r_antenna.is_on():
476            self.r_antenna.goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
477        if self.neck is not None and self.neck.is_on():
478            return self.goto([0, -10, 0], duration, wait, interpolation_mode)
479        else:
480            self._logger.warning("Head is off. No command sent.")
481        return GoToId(id=-1)
482
483    def _get_goal_positions_message(self) -> HeadComponentsCommands:
484        """Get the HeadComponentsCommands message to send the goal positions to the actuator."""
485        commands = {}
486        for actuator_name, actuator in self._actuators.items():
487            actuator_command = actuator._get_goal_positions_message()
488            if actuator_command is not None:
489                commands[f"{actuator_name}_command"] = actuator_command
490        return HeadComponentsCommands(**commands)
491
492    def _clean_outgoing_goal_positions(self) -> None:
493        """Clean the outgoing goal positions."""
494        for actuator in self._actuators.values():
495            actuator._clean_outgoing_goal_positions()
496
497    def _post_send_goal_positions(self) -> None:
498        """Monitor the joint positions to check if they reach the specified goals."""
499        for actuator in self._actuators.values():
500            actuator._post_send_goal_positions()
501
502    def _update_with(self, new_state: HeadState) -> None:
503        """Update the head with a newly received (partial) state from the gRPC server.
504
505        Args:
506            new_state: A HeadState object representing the new state of the head's actuators.
507        """
508        for actuator_name, actuator in self._actuators.items():
509            actuator._update_with(getattr(new_state, f"{actuator_name}_state"))
510
511    def _update_audit_status(self, new_status: HeadStatus) -> None:
512        """Update the audit status of the neck with the new status from the gRPC server.
513
514        Args:
515            new_status: A HeadStatus object representing the new status of the neck.
516        """
517        for actuator_name, actuator in self._actuators.items():
518            actuator._update_audit_status(getattr(new_status, f"{actuator_name}_status"))