reachy2_sdk.utils.goto_based_element

Reachy IGoToBasedElement interface.

Handles common interface for elements (parts or components) performing movements using goto mechanism.

  1"""Reachy IGoToBasedElement interface.
  2
  3Handles common interface for elements (parts or components) performing movements using goto mechanism.
  4"""
  5
  6import logging
  7import time
  8from abc import ABC, abstractmethod
  9from typing import Any, Dict, List, Optional
 10
 11import numpy as np
 12from reachy2_sdk_api.component_pb2 import ComponentId
 13from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToId, GoToRequest
 14from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 15from reachy2_sdk_api.part_pb2 import PartId
 16
 17from ..utils.utils import (
 18    EllipticalParameters,
 19    JointsRequest,
 20    OdometryRequest,
 21    SimplifiedRequest,
 22    TargetJointsRequest,
 23    arm_position_to_list,
 24    ext_euler_angles_to_list,
 25    get_arc_direction,
 26    get_interpolation_mode,
 27    get_interpolation_space,
 28)
 29
 30
 31class IGoToBasedElement(ABC):
 32    """Interface for elements (parts or components) of Reachy that use goto functions.
 33
 34    The `IGoToBasedElements` class defines a common interface for handling goto-based movements. It is
 35    designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism.
 36    """
 37
 38    def __init__(
 39        self,
 40        element_id: ComponentId | PartId,
 41        goto_stub: GoToServiceStub,
 42    ) -> None:
 43        """Initialize the IGoToBasedElement interface.
 44
 45        Sets up the common attributes needed for handling goto-based movements. This includes
 46        associating the component with the interface and setting up the gRPC stub for performing
 47        goto commands.
 48
 49        Args:
 50            element_id: The robot component or part that uses this interface.
 51            goto_stub: The gRPC stub used to send goto commands to the robot element.
 52        """
 53        self._element_id = element_id
 54        self._goto_stub = goto_stub
 55        self._logger_goto = logging.getLogger(__name__)  # not using self._logger to avoid name conflict in multiple inheritance
 56
 57    @abstractmethod
 58    def get_goto_playing(self) -> GoToId:
 59        """Return the GoToId of the currently playing goto movement on a specific element."""
 60        pass
 61
 62    @abstractmethod
 63    def get_goto_queue(self) -> List[GoToId]:
 64        """Return a list of all GoToIds waiting to be played on a specific element."""
 65        pass
 66
 67    @abstractmethod
 68    def cancel_all_goto(self) -> GoToAck:
 69        """Request the cancellation of all playing and waiting goto commands for a specific element.
 70
 71        Returns:
 72            A GoToAck acknowledging the cancellation of all goto commands.
 73        """
 74        pass
 75
 76    def _get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]:
 77        """Retrieve the details of a goto command based on its GoToId.
 78
 79        Args:
 80            goto_id: The ID of the goto command for which details are requested.
 81
 82        Returns:
 83            A `SimplifiedRequest` object containing the element name, joint goal positions
 84            (in degrees), movement duration, and interpolation mode.
 85            Returns `None` if the robot is not connected or if the `goto_id` is invalid.
 86
 87        Raises:
 88            TypeError: If `goto_id` is not an instance of `GoToId`.
 89            ValueError: If `goto_id` is -1, indicating an invalid command.
 90        """
 91        if not isinstance(goto_id, GoToId):
 92            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
 93        if goto_id.id == -1:
 94            raise ValueError("No answer was found for given move, goto_id is -1")
 95
 96        response = self._goto_stub.GetGoToRequest(goto_id)
 97
 98        full_request = process_goto_request(response)
 99
100        return full_request
101
102    def _is_goto_finished(self, id: GoToId) -> bool:
103        """Check if the goto movement has been completed or cancelled.
104
105        Returns:
106           `True` if the goto has been played or cancelled, `False` otherwise.
107        """
108        state = self._goto_stub.GetGoToState(id)
109        result = bool(
110            state.goal_status == GoalStatus.STATUS_ABORTED
111            or state.goal_status == GoalStatus.STATUS_CANCELED
112            or state.goal_status == GoalStatus.STATUS_SUCCEEDED
113        )
114        return result
115
116    def _wait_goto(self, id: GoToId, duration: float) -> None:
117        """Wait for a goto to finish. timeout is in seconds."""
118        self._logger_goto.info(f"Waiting for movement with {id}.")
119
120        if not self._is_goto_already_over(id, duration):
121            info_gotos = [self._get_goto_request(id)]
122            ids_queue = self.get_goto_queue()
123            for goto_id in ids_queue:
124                info_gotos.append(self._get_goto_request(goto_id))
125
126            timeout = 1  # adding one more sec
127            for igoto in info_gotos:
128                if igoto is not None:
129                    if type(igoto.request) is JointsRequest:
130                        timeout += igoto.request.duration
131                    elif type(igoto.request) is OdometryRequest:
132                        timeout += igoto.request.timeout
133
134            self._logger_goto.debug(f"timeout is set to {timeout}")
135
136            t_start = time.time()  # timeout for others
137            while not self._is_goto_finished(id):
138                time.sleep(0.1)
139
140                if time.time() - t_start > timeout:
141                    self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.")
142                    return
143
144            self._logger_goto.info(f"Movement with {id} finished.")
145
146    def _is_goto_already_over(self, id: GoToId, timeout: float) -> bool:
147        """Check if the goto movement is already over."""
148        t0 = time.time()
149        id_playing = self.get_goto_playing()
150        while id_playing.id == -1:
151            time.sleep(0.005)
152            id_playing = self.get_goto_playing()
153
154            if self._is_goto_finished(id):
155                return True
156
157            # manage an id_playing staying at -1
158            if time.time() - t0 > timeout:
159                self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.")
160                return True
161        return False
162
163    @abstractmethod
164    def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None:
165        """Check the validity of the parameters for a goto movement."""
166        pass  # pragma: no cover
167
168    @abstractmethod
169    def goto_posture(
170        self,
171        common_posture: str = "default",
172        duration: float = 2,
173        wait: bool = False,
174        wait_for_goto_end: bool = True,
175        interpolation_mode: str = "minimum_jerk",
176    ) -> GoToId:
177        """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode."""
178        pass  # pragma: no cover
179
180
181def process_goto_request(response: GoToRequest) -> Optional[SimplifiedRequest]:
182    """Process the response from a goto request and return a SimplifiedRequest object."""
183    if response.HasField("cartesian_goal"):
184        request_dict = process_arm_cartesian_goal(response)
185        return build_simplified_joints_request(request_dict)
186
187    elif response.HasField("joints_goal"):
188        match response.joints_goal.WhichOneof("joints_goal"):
189            case "arm_joint_goal":
190                request_dict = process_arm_joint_goal(response)
191            case "neck_joint_goal":
192                request_dict = process_neck_joint_goal(response)
193            case "antenna_joint_goal":
194                request_dict = process_antenna_joint_goal(response)
195            case "hand_joint_goal":
196                request_dict = process_hand_joint_goal(response)
197            case _:
198                raise ValueError("No valid joint goal found in the response")
199        return build_simplified_joints_request(request_dict)
200
201    elif response.HasField("odometry_goal"):
202        request_dict = process_odometry_goal(response)
203        return build_simplified_odometry_request(request_dict)
204
205    else:
206        raise ValueError("No valid request found in the response")
207
208
209def process_arm_cartesian_goal(response: GoToRequest) -> Dict[str, Any]:
210    """Process the response from a goto request containing an arm cartesian goal."""
211    request_dict = {}
212    request_dict["part"] = response.cartesian_goal.arm_cartesian_goal.id.name
213    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
214    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
215    request_dict["duration"] = response.cartesian_goal.arm_cartesian_goal.duration.value
216    request_dict["target_pose"] = np.reshape(response.cartesian_goal.arm_cartesian_goal.goal_pose.data, (4, 4))
217    if request_dict["mode"] == "elliptical":
218        arc_direction = get_arc_direction(response.elliptical_parameters.arc_direction)
219        secondary_radius = response.elliptical_parameters.secondary_radius.value
220        request_dict["elliptical_params"] = EllipticalParameters(arc_direction, secondary_radius)
221    else:
222        request_dict["elliptical_params"] = None
223    return request_dict
224
225
226def process_arm_joint_goal(response: GoToRequest) -> Dict[str, Any]:
227    """Process the response from a goto request containing an arm joint goal."""
228    request_dict = {}
229    request_dict["part"] = response.joints_goal.arm_joint_goal.id.name
230    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
231    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
232    request_dict["duration"] = response.joints_goal.arm_joint_goal.duration.value
233    request_dict["target_joints"] = arm_position_to_list(response.joints_goal.arm_joint_goal.joints_goal, degrees=True)
234    request_dict["elliptical_params"] = None
235    return request_dict
236
237
238def process_neck_joint_goal(response: GoToRequest) -> Dict[str, Any]:
239    """Process the response from a goto request containing a neck joint goal."""
240    request_dict = {}
241    request_dict["part"] = response.joints_goal.neck_joint_goal.id.name
242    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
243    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
244    request_dict["duration"] = response.joints_goal.neck_joint_goal.duration.value
245    request_dict["target_joints"] = ext_euler_angles_to_list(
246        response.joints_goal.neck_joint_goal.joints_goal.rotation.rpy, degrees=True
247    )
248    request_dict["elliptical_params"] = None
249    return request_dict
250
251
252def process_antenna_joint_goal(response: GoToRequest) -> Dict[str, Any]:
253    """Process the response from a goto request containing an antenna joint goal."""
254    request_dict = {}
255    request_dict["part"] = response.joints_goal.antenna_joint_goal.antenna.id.name
256    if request_dict["part"] == "antenna_right":
257        request_dict["part"] = "r_antenna"
258    elif request_dict["part"] == "antenna_left":
259        request_dict["part"] = "l_antenna"
260    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
261    request_dict["interpolation_space"] = "joint_space"
262    request_dict["duration"] = response.joints_goal.antenna_joint_goal.duration.value
263    request_dict["target_joints"] = np.rad2deg(response.joints_goal.antenna_joint_goal.joint_goal.value)
264    request_dict["elliptical_params"] = None
265    return request_dict
266
267
268def process_hand_joint_goal(response: GoToRequest) -> Dict[str, Any]:
269    """Process the response from a goto request containing a hand joint goal."""
270    request_dict = {}
271    request_dict["part"] = response.joints_goal.hand_joint_goal.goal_request.id.name
272    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
273    request_dict["interpolation_space"] = "joint_space"
274    request_dict["duration"] = response.joints_goal.hand_joint_goal.duration.value
275    request_dict["target_joints"] = np.rad2deg(
276        response.joints_goal.hand_joint_goal.goal_request.position.parallel_gripper.position.value
277    )
278    request_dict["elliptical_params"] = None
279    return request_dict
280
281
282def process_odometry_goal(response: GoToRequest) -> Dict[str, Any]:
283    """Process the response from a goto request containing an odometry goal."""
284    request_dict = {}
285    request_dict["part"] = response.odometry_goal.odometry_goal.id.name
286    request_dict["timeout"] = response.odometry_goal.timeout.value
287    request_dict["distance_tolerance"] = response.odometry_goal.distance_tolerance.value
288    request_dict["angle_tolerance"] = np.rad2deg(response.odometry_goal.angle_tolerance.value)
289    request_dict["target"] = {
290        "x": response.odometry_goal.odometry_goal.direction.x.value,
291        "y": response.odometry_goal.odometry_goal.direction.y.value,
292        "theta": np.rad2deg(response.odometry_goal.odometry_goal.direction.theta.value),
293    }
294    return request_dict
295
296
297def build_simplified_joints_request(request_dict: Dict[str, Any]) -> SimplifiedRequest:
298    """Build a SimplifiedRequest object from a dictionary of joints request details."""
299    target = TargetJointsRequest(
300        joints=request_dict.get("target_joints", None),
301        pose=request_dict.get("target_pose", None),
302    )
303
304    joints_request = JointsRequest(
305        target=target,
306        duration=request_dict["duration"],
307        mode=request_dict["mode"],
308        interpolation_space=request_dict["interpolation_space"],
309        elliptical_parameters=request_dict["elliptical_params"],
310    )
311
312    full_request = SimplifiedRequest(
313        part=request_dict["part"],
314        request=joints_request,
315    )
316
317    return full_request
318
319
320def build_simplified_odometry_request(request_dict: Dict[str, Any]) -> SimplifiedRequest:
321    """Build a SimplifiedRequest object from a dictionary of odomztry request details."""
322    odometry_request = OdometryRequest(
323        target=request_dict["target"],
324        timeout=request_dict["timeout"],
325        distance_tolerance=request_dict["distance_tolerance"],
326        angle_tolerance=request_dict["angle_tolerance"],
327    )
328
329    full_request = SimplifiedRequest(
330        part=request_dict["part"],
331        request=odometry_request,
332    )
333
334    return full_request
class IGoToBasedElement(abc.ABC):
 32class IGoToBasedElement(ABC):
 33    """Interface for elements (parts or components) of Reachy that use goto functions.
 34
 35    The `IGoToBasedElements` class defines a common interface for handling goto-based movements. It is
 36    designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism.
 37    """
 38
 39    def __init__(
 40        self,
 41        element_id: ComponentId | PartId,
 42        goto_stub: GoToServiceStub,
 43    ) -> None:
 44        """Initialize the IGoToBasedElement interface.
 45
 46        Sets up the common attributes needed for handling goto-based movements. This includes
 47        associating the component with the interface and setting up the gRPC stub for performing
 48        goto commands.
 49
 50        Args:
 51            element_id: The robot component or part that uses this interface.
 52            goto_stub: The gRPC stub used to send goto commands to the robot element.
 53        """
 54        self._element_id = element_id
 55        self._goto_stub = goto_stub
 56        self._logger_goto = logging.getLogger(__name__)  # not using self._logger to avoid name conflict in multiple inheritance
 57
 58    @abstractmethod
 59    def get_goto_playing(self) -> GoToId:
 60        """Return the GoToId of the currently playing goto movement on a specific element."""
 61        pass
 62
 63    @abstractmethod
 64    def get_goto_queue(self) -> List[GoToId]:
 65        """Return a list of all GoToIds waiting to be played on a specific element."""
 66        pass
 67
 68    @abstractmethod
 69    def cancel_all_goto(self) -> GoToAck:
 70        """Request the cancellation of all playing and waiting goto commands for a specific element.
 71
 72        Returns:
 73            A GoToAck acknowledging the cancellation of all goto commands.
 74        """
 75        pass
 76
 77    def _get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]:
 78        """Retrieve the details of a goto command based on its GoToId.
 79
 80        Args:
 81            goto_id: The ID of the goto command for which details are requested.
 82
 83        Returns:
 84            A `SimplifiedRequest` object containing the element name, joint goal positions
 85            (in degrees), movement duration, and interpolation mode.
 86            Returns `None` if the robot is not connected or if the `goto_id` is invalid.
 87
 88        Raises:
 89            TypeError: If `goto_id` is not an instance of `GoToId`.
 90            ValueError: If `goto_id` is -1, indicating an invalid command.
 91        """
 92        if not isinstance(goto_id, GoToId):
 93            raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}")
 94        if goto_id.id == -1:
 95            raise ValueError("No answer was found for given move, goto_id is -1")
 96
 97        response = self._goto_stub.GetGoToRequest(goto_id)
 98
 99        full_request = process_goto_request(response)
100
101        return full_request
102
103    def _is_goto_finished(self, id: GoToId) -> bool:
104        """Check if the goto movement has been completed or cancelled.
105
106        Returns:
107           `True` if the goto has been played or cancelled, `False` otherwise.
108        """
109        state = self._goto_stub.GetGoToState(id)
110        result = bool(
111            state.goal_status == GoalStatus.STATUS_ABORTED
112            or state.goal_status == GoalStatus.STATUS_CANCELED
113            or state.goal_status == GoalStatus.STATUS_SUCCEEDED
114        )
115        return result
116
117    def _wait_goto(self, id: GoToId, duration: float) -> None:
118        """Wait for a goto to finish. timeout is in seconds."""
119        self._logger_goto.info(f"Waiting for movement with {id}.")
120
121        if not self._is_goto_already_over(id, duration):
122            info_gotos = [self._get_goto_request(id)]
123            ids_queue = self.get_goto_queue()
124            for goto_id in ids_queue:
125                info_gotos.append(self._get_goto_request(goto_id))
126
127            timeout = 1  # adding one more sec
128            for igoto in info_gotos:
129                if igoto is not None:
130                    if type(igoto.request) is JointsRequest:
131                        timeout += igoto.request.duration
132                    elif type(igoto.request) is OdometryRequest:
133                        timeout += igoto.request.timeout
134
135            self._logger_goto.debug(f"timeout is set to {timeout}")
136
137            t_start = time.time()  # timeout for others
138            while not self._is_goto_finished(id):
139                time.sleep(0.1)
140
141                if time.time() - t_start > timeout:
142                    self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.")
143                    return
144
145            self._logger_goto.info(f"Movement with {id} finished.")
146
147    def _is_goto_already_over(self, id: GoToId, timeout: float) -> bool:
148        """Check if the goto movement is already over."""
149        t0 = time.time()
150        id_playing = self.get_goto_playing()
151        while id_playing.id == -1:
152            time.sleep(0.005)
153            id_playing = self.get_goto_playing()
154
155            if self._is_goto_finished(id):
156                return True
157
158            # manage an id_playing staying at -1
159            if time.time() - t0 > timeout:
160                self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.")
161                return True
162        return False
163
164    @abstractmethod
165    def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None:
166        """Check the validity of the parameters for a goto movement."""
167        pass  # pragma: no cover
168
169    @abstractmethod
170    def goto_posture(
171        self,
172        common_posture: str = "default",
173        duration: float = 2,
174        wait: bool = False,
175        wait_for_goto_end: bool = True,
176        interpolation_mode: str = "minimum_jerk",
177    ) -> GoToId:
178        """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode."""
179        pass  # pragma: no cover

Interface for elements (parts or components) of Reachy that use goto functions.

The IGoToBasedElements class defines a common interface for handling goto-based movements. It is designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism.

IGoToBasedElement( element_id: component_pb2.ComponentId | part_pb2.PartId, goto_stub: reachy2_sdk_api.goto_pb2_grpc.GoToServiceStub)
39    def __init__(
40        self,
41        element_id: ComponentId | PartId,
42        goto_stub: GoToServiceStub,
43    ) -> None:
44        """Initialize the IGoToBasedElement interface.
45
46        Sets up the common attributes needed for handling goto-based movements. This includes
47        associating the component with the interface and setting up the gRPC stub for performing
48        goto commands.
49
50        Args:
51            element_id: The robot component or part that uses this interface.
52            goto_stub: The gRPC stub used to send goto commands to the robot element.
53        """
54        self._element_id = element_id
55        self._goto_stub = goto_stub
56        self._logger_goto = logging.getLogger(__name__)  # not using self._logger to avoid name conflict in multiple inheritance

Initialize the IGoToBasedElement interface.

Sets up the common attributes needed for handling goto-based movements. This includes associating the component with the interface and setting up the gRPC stub for performing goto commands.

Arguments:
  • element_id: The robot component or part that uses this interface.
  • goto_stub: The gRPC stub used to send goto commands to the robot element.
@abstractmethod
def get_goto_playing(self) -> goto_pb2.GoToId:
58    @abstractmethod
59    def get_goto_playing(self) -> GoToId:
60        """Return the GoToId of the currently playing goto movement on a specific element."""
61        pass

Return the GoToId of the currently playing goto movement on a specific element.

@abstractmethod
def get_goto_queue(self) -> List[goto_pb2.GoToId]:
63    @abstractmethod
64    def get_goto_queue(self) -> List[GoToId]:
65        """Return a list of all GoToIds waiting to be played on a specific element."""
66        pass

Return a list of all GoToIds waiting to be played on a specific element.

@abstractmethod
def cancel_all_goto(self) -> goto_pb2.GoToAck:
68    @abstractmethod
69    def cancel_all_goto(self) -> GoToAck:
70        """Request the cancellation of all playing and waiting goto commands for a specific element.
71
72        Returns:
73            A GoToAck acknowledging the cancellation of all goto commands.
74        """
75        pass

Request the cancellation of all playing and waiting goto commands for a specific element.

Returns:

A GoToAck acknowledging the cancellation of all goto commands.

@abstractmethod
def goto_posture( self, common_posture: str = 'default', duration: float = 2, wait: bool = False, wait_for_goto_end: bool = True, interpolation_mode: str = 'minimum_jerk') -> goto_pb2.GoToId:
169    @abstractmethod
170    def goto_posture(
171        self,
172        common_posture: str = "default",
173        duration: float = 2,
174        wait: bool = False,
175        wait_for_goto_end: bool = True,
176        interpolation_mode: str = "minimum_jerk",
177    ) -> GoToId:
178        """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode."""
179        pass  # pragma: no cover

Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode.

def process_goto_request( response: goto_pb2.GoToRequest) -> Optional[reachy2_sdk.utils.utils.SimplifiedRequest]:
182def process_goto_request(response: GoToRequest) -> Optional[SimplifiedRequest]:
183    """Process the response from a goto request and return a SimplifiedRequest object."""
184    if response.HasField("cartesian_goal"):
185        request_dict = process_arm_cartesian_goal(response)
186        return build_simplified_joints_request(request_dict)
187
188    elif response.HasField("joints_goal"):
189        match response.joints_goal.WhichOneof("joints_goal"):
190            case "arm_joint_goal":
191                request_dict = process_arm_joint_goal(response)
192            case "neck_joint_goal":
193                request_dict = process_neck_joint_goal(response)
194            case "antenna_joint_goal":
195                request_dict = process_antenna_joint_goal(response)
196            case "hand_joint_goal":
197                request_dict = process_hand_joint_goal(response)
198            case _:
199                raise ValueError("No valid joint goal found in the response")
200        return build_simplified_joints_request(request_dict)
201
202    elif response.HasField("odometry_goal"):
203        request_dict = process_odometry_goal(response)
204        return build_simplified_odometry_request(request_dict)
205
206    else:
207        raise ValueError("No valid request found in the response")

Process the response from a goto request and return a SimplifiedRequest object.

def process_arm_cartesian_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
210def process_arm_cartesian_goal(response: GoToRequest) -> Dict[str, Any]:
211    """Process the response from a goto request containing an arm cartesian goal."""
212    request_dict = {}
213    request_dict["part"] = response.cartesian_goal.arm_cartesian_goal.id.name
214    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
215    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
216    request_dict["duration"] = response.cartesian_goal.arm_cartesian_goal.duration.value
217    request_dict["target_pose"] = np.reshape(response.cartesian_goal.arm_cartesian_goal.goal_pose.data, (4, 4))
218    if request_dict["mode"] == "elliptical":
219        arc_direction = get_arc_direction(response.elliptical_parameters.arc_direction)
220        secondary_radius = response.elliptical_parameters.secondary_radius.value
221        request_dict["elliptical_params"] = EllipticalParameters(arc_direction, secondary_radius)
222    else:
223        request_dict["elliptical_params"] = None
224    return request_dict

Process the response from a goto request containing an arm cartesian goal.

def process_arm_joint_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
227def process_arm_joint_goal(response: GoToRequest) -> Dict[str, Any]:
228    """Process the response from a goto request containing an arm joint goal."""
229    request_dict = {}
230    request_dict["part"] = response.joints_goal.arm_joint_goal.id.name
231    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
232    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
233    request_dict["duration"] = response.joints_goal.arm_joint_goal.duration.value
234    request_dict["target_joints"] = arm_position_to_list(response.joints_goal.arm_joint_goal.joints_goal, degrees=True)
235    request_dict["elliptical_params"] = None
236    return request_dict

Process the response from a goto request containing an arm joint goal.

def process_neck_joint_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
239def process_neck_joint_goal(response: GoToRequest) -> Dict[str, Any]:
240    """Process the response from a goto request containing a neck joint goal."""
241    request_dict = {}
242    request_dict["part"] = response.joints_goal.neck_joint_goal.id.name
243    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
244    request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space)
245    request_dict["duration"] = response.joints_goal.neck_joint_goal.duration.value
246    request_dict["target_joints"] = ext_euler_angles_to_list(
247        response.joints_goal.neck_joint_goal.joints_goal.rotation.rpy, degrees=True
248    )
249    request_dict["elliptical_params"] = None
250    return request_dict

Process the response from a goto request containing a neck joint goal.

def process_antenna_joint_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
253def process_antenna_joint_goal(response: GoToRequest) -> Dict[str, Any]:
254    """Process the response from a goto request containing an antenna joint goal."""
255    request_dict = {}
256    request_dict["part"] = response.joints_goal.antenna_joint_goal.antenna.id.name
257    if request_dict["part"] == "antenna_right":
258        request_dict["part"] = "r_antenna"
259    elif request_dict["part"] == "antenna_left":
260        request_dict["part"] = "l_antenna"
261    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
262    request_dict["interpolation_space"] = "joint_space"
263    request_dict["duration"] = response.joints_goal.antenna_joint_goal.duration.value
264    request_dict["target_joints"] = np.rad2deg(response.joints_goal.antenna_joint_goal.joint_goal.value)
265    request_dict["elliptical_params"] = None
266    return request_dict

Process the response from a goto request containing an antenna joint goal.

def process_hand_joint_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
269def process_hand_joint_goal(response: GoToRequest) -> Dict[str, Any]:
270    """Process the response from a goto request containing a hand joint goal."""
271    request_dict = {}
272    request_dict["part"] = response.joints_goal.hand_joint_goal.goal_request.id.name
273    request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type)
274    request_dict["interpolation_space"] = "joint_space"
275    request_dict["duration"] = response.joints_goal.hand_joint_goal.duration.value
276    request_dict["target_joints"] = np.rad2deg(
277        response.joints_goal.hand_joint_goal.goal_request.position.parallel_gripper.position.value
278    )
279    request_dict["elliptical_params"] = None
280    return request_dict

Process the response from a goto request containing a hand joint goal.

def process_odometry_goal(response: goto_pb2.GoToRequest) -> Dict[str, Any]:
283def process_odometry_goal(response: GoToRequest) -> Dict[str, Any]:
284    """Process the response from a goto request containing an odometry goal."""
285    request_dict = {}
286    request_dict["part"] = response.odometry_goal.odometry_goal.id.name
287    request_dict["timeout"] = response.odometry_goal.timeout.value
288    request_dict["distance_tolerance"] = response.odometry_goal.distance_tolerance.value
289    request_dict["angle_tolerance"] = np.rad2deg(response.odometry_goal.angle_tolerance.value)
290    request_dict["target"] = {
291        "x": response.odometry_goal.odometry_goal.direction.x.value,
292        "y": response.odometry_goal.odometry_goal.direction.y.value,
293        "theta": np.rad2deg(response.odometry_goal.odometry_goal.direction.theta.value),
294    }
295    return request_dict

Process the response from a goto request containing an odometry goal.

def build_simplified_joints_request( request_dict: Dict[str, Any]) -> reachy2_sdk.utils.utils.SimplifiedRequest:
298def build_simplified_joints_request(request_dict: Dict[str, Any]) -> SimplifiedRequest:
299    """Build a SimplifiedRequest object from a dictionary of joints request details."""
300    target = TargetJointsRequest(
301        joints=request_dict.get("target_joints", None),
302        pose=request_dict.get("target_pose", None),
303    )
304
305    joints_request = JointsRequest(
306        target=target,
307        duration=request_dict["duration"],
308        mode=request_dict["mode"],
309        interpolation_space=request_dict["interpolation_space"],
310        elliptical_parameters=request_dict["elliptical_params"],
311    )
312
313    full_request = SimplifiedRequest(
314        part=request_dict["part"],
315        request=joints_request,
316    )
317
318    return full_request

Build a SimplifiedRequest object from a dictionary of joints request details.

def build_simplified_odometry_request( request_dict: Dict[str, Any]) -> reachy2_sdk.utils.utils.SimplifiedRequest:
321def build_simplified_odometry_request(request_dict: Dict[str, Any]) -> SimplifiedRequest:
322    """Build a SimplifiedRequest object from a dictionary of odomztry request details."""
323    odometry_request = OdometryRequest(
324        target=request_dict["target"],
325        timeout=request_dict["timeout"],
326        distance_tolerance=request_dict["distance_tolerance"],
327        angle_tolerance=request_dict["angle_tolerance"],
328    )
329
330    full_request = SimplifiedRequest(
331        part=request_dict["part"],
332        request=odometry_request,
333    )
334
335    return full_request

Build a SimplifiedRequest object from a dictionary of odomztry request details.