reachy2_sdk.utils.goto_based_element
Reachy IGoToBasedElement interface.
Handles common interface for elements (parts or components) performing movements using goto mechanism.
1"""Reachy IGoToBasedElement interface. 2 3Handles common interface for elements (parts or components) performing movements using goto mechanism. 4""" 5 6import logging 7import time 8from abc import ABC, abstractmethod 9from typing import Any, Dict, List, Optional 10 11import numpy as np 12from reachy2_sdk_api.component_pb2 import ComponentId 13from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToId, GoToRequest 14from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub 15from reachy2_sdk_api.part_pb2 import PartId 16 17from ..utils.utils import ( 18 EllipticalParameters, 19 JointsRequest, 20 OdometryRequest, 21 SimplifiedRequest, 22 TargetJointsRequest, 23 arm_position_to_list, 24 ext_euler_angles_to_list, 25 get_arc_direction, 26 get_interpolation_mode, 27 get_interpolation_space, 28) 29 30 31class IGoToBasedElement(ABC): 32 """Interface for elements (parts or components) of Reachy that use goto functions. 33 34 The `IGoToBasedElements` class defines a common interface for handling goto-based movements. It is 35 designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism. 36 """ 37 38 def __init__( 39 self, 40 element_id: ComponentId | PartId, 41 goto_stub: GoToServiceStub, 42 ) -> None: 43 """Initialize the IGoToBasedElement interface. 44 45 Sets up the common attributes needed for handling goto-based movements. This includes 46 associating the component with the interface and setting up the gRPC stub for performing 47 goto commands. 48 49 Args: 50 element_id: The robot component or part that uses this interface. 51 goto_stub: The gRPC stub used to send goto commands to the robot element. 52 """ 53 self._element_id = element_id 54 self._goto_stub = goto_stub 55 self._logger_goto = logging.getLogger(__name__) # not using self._logger to avoid name conflict in multiple inheritance 56 57 @abstractmethod 58 def get_goto_playing(self) -> GoToId: 59 """Return the GoToId of the currently playing goto movement on a specific element.""" 60 pass 61 62 @abstractmethod 63 def get_goto_queue(self) -> List[GoToId]: 64 """Return a list of all GoToIds waiting to be played on a specific element.""" 65 pass 66 67 @abstractmethod 68 def cancel_all_goto(self) -> GoToAck: 69 """Request the cancellation of all playing and waiting goto commands for a specific element. 70 71 Returns: 72 A GoToAck acknowledging the cancellation of all goto commands. 73 """ 74 pass 75 76 def _get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]: 77 """Retrieve the details of a goto command based on its GoToId. 78 79 Args: 80 goto_id: The ID of the goto command for which details are requested. 81 82 Returns: 83 A `SimplifiedRequest` object containing the element name, joint goal positions 84 (in degrees), movement duration, and interpolation mode. 85 Returns `None` if the robot is not connected or if the `goto_id` is invalid. 86 87 Raises: 88 TypeError: If `goto_id` is not an instance of `GoToId`. 89 ValueError: If `goto_id` is -1, indicating an invalid command. 90 """ 91 if not isinstance(goto_id, GoToId): 92 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 93 if goto_id.id == -1: 94 raise ValueError("No answer was found for given move, goto_id is -1") 95 96 response = self._goto_stub.GetGoToRequest(goto_id) 97 98 full_request = process_goto_request(response) 99 100 return full_request 101 102 def _is_goto_finished(self, id: GoToId) -> bool: 103 """Check if the goto movement has been completed or cancelled. 104 105 Returns: 106 `True` if the goto has been played or cancelled, `False` otherwise. 107 """ 108 state = self._goto_stub.GetGoToState(id) 109 result = bool( 110 state.goal_status == GoalStatus.STATUS_ABORTED 111 or state.goal_status == GoalStatus.STATUS_CANCELED 112 or state.goal_status == GoalStatus.STATUS_SUCCEEDED 113 ) 114 return result 115 116 def _wait_goto(self, id: GoToId, duration: float) -> None: 117 """Wait for a goto to finish. timeout is in seconds.""" 118 self._logger_goto.info(f"Waiting for movement with {id}.") 119 120 if not self._is_goto_already_over(id, duration): 121 info_gotos = [self._get_goto_request(id)] 122 ids_queue = self.get_goto_queue() 123 for goto_id in ids_queue: 124 info_gotos.append(self._get_goto_request(goto_id)) 125 126 timeout = 1 # adding one more sec 127 for igoto in info_gotos: 128 if igoto is not None: 129 if type(igoto.request) is JointsRequest: 130 timeout += igoto.request.duration 131 elif type(igoto.request) is OdometryRequest: 132 timeout += igoto.request.timeout 133 134 self._logger_goto.debug(f"timeout is set to {timeout}") 135 136 t_start = time.time() # timeout for others 137 while not self._is_goto_finished(id): 138 time.sleep(0.1) 139 140 if time.time() - t_start > timeout: 141 self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.") 142 return 143 144 self._logger_goto.info(f"Movement with {id} finished.") 145 146 def _is_goto_already_over(self, id: GoToId, timeout: float) -> bool: 147 """Check if the goto movement is already over.""" 148 t0 = time.time() 149 id_playing = self.get_goto_playing() 150 while id_playing.id == -1: 151 time.sleep(0.005) 152 id_playing = self.get_goto_playing() 153 154 if self._is_goto_finished(id): 155 return True 156 157 # manage an id_playing staying at -1 158 if time.time() - t0 > timeout: 159 self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.") 160 return True 161 return False 162 163 @abstractmethod 164 def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None: 165 """Check the validity of the parameters for a goto movement.""" 166 pass # pragma: no cover 167 168 @abstractmethod 169 def goto_posture( 170 self, 171 common_posture: str = "default", 172 duration: float = 2, 173 wait: bool = False, 174 wait_for_goto_end: bool = True, 175 interpolation_mode: str = "minimum_jerk", 176 ) -> GoToId: 177 """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode.""" 178 pass # pragma: no cover 179 180 181def process_goto_request(response: GoToRequest) -> Optional[SimplifiedRequest]: 182 """Process the response from a goto request and return a SimplifiedRequest object.""" 183 if response.HasField("cartesian_goal"): 184 request_dict = process_arm_cartesian_goal(response) 185 return build_simplified_joints_request(request_dict) 186 187 elif response.HasField("joints_goal"): 188 match response.joints_goal.WhichOneof("joints_goal"): 189 case "arm_joint_goal": 190 request_dict = process_arm_joint_goal(response) 191 case "neck_joint_goal": 192 request_dict = process_neck_joint_goal(response) 193 case "antenna_joint_goal": 194 request_dict = process_antenna_joint_goal(response) 195 case "hand_joint_goal": 196 request_dict = process_hand_joint_goal(response) 197 case _: 198 raise ValueError("No valid joint goal found in the response") 199 return build_simplified_joints_request(request_dict) 200 201 elif response.HasField("odometry_goal"): 202 request_dict = process_odometry_goal(response) 203 return build_simplified_odometry_request(request_dict) 204 205 else: 206 raise ValueError("No valid request found in the response") 207 208 209def process_arm_cartesian_goal(response: GoToRequest) -> Dict[str, Any]: 210 """Process the response from a goto request containing an arm cartesian goal.""" 211 request_dict = {} 212 request_dict["part"] = response.cartesian_goal.arm_cartesian_goal.id.name 213 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 214 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 215 request_dict["duration"] = response.cartesian_goal.arm_cartesian_goal.duration.value 216 request_dict["target_pose"] = np.reshape(response.cartesian_goal.arm_cartesian_goal.goal_pose.data, (4, 4)) 217 if request_dict["mode"] == "elliptical": 218 arc_direction = get_arc_direction(response.elliptical_parameters.arc_direction) 219 secondary_radius = response.elliptical_parameters.secondary_radius.value 220 request_dict["elliptical_params"] = EllipticalParameters(arc_direction, secondary_radius) 221 else: 222 request_dict["elliptical_params"] = None 223 return request_dict 224 225 226def process_arm_joint_goal(response: GoToRequest) -> Dict[str, Any]: 227 """Process the response from a goto request containing an arm joint goal.""" 228 request_dict = {} 229 request_dict["part"] = response.joints_goal.arm_joint_goal.id.name 230 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 231 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 232 request_dict["duration"] = response.joints_goal.arm_joint_goal.duration.value 233 request_dict["target_joints"] = arm_position_to_list(response.joints_goal.arm_joint_goal.joints_goal, degrees=True) 234 request_dict["elliptical_params"] = None 235 return request_dict 236 237 238def process_neck_joint_goal(response: GoToRequest) -> Dict[str, Any]: 239 """Process the response from a goto request containing a neck joint goal.""" 240 request_dict = {} 241 request_dict["part"] = response.joints_goal.neck_joint_goal.id.name 242 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 243 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 244 request_dict["duration"] = response.joints_goal.neck_joint_goal.duration.value 245 request_dict["target_joints"] = ext_euler_angles_to_list( 246 response.joints_goal.neck_joint_goal.joints_goal.rotation.rpy, degrees=True 247 ) 248 request_dict["elliptical_params"] = None 249 return request_dict 250 251 252def process_antenna_joint_goal(response: GoToRequest) -> Dict[str, Any]: 253 """Process the response from a goto request containing an antenna joint goal.""" 254 request_dict = {} 255 request_dict["part"] = response.joints_goal.antenna_joint_goal.antenna.id.name 256 if request_dict["part"] == "antenna_right": 257 request_dict["part"] = "r_antenna" 258 elif request_dict["part"] == "antenna_left": 259 request_dict["part"] = "l_antenna" 260 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 261 request_dict["interpolation_space"] = "joint_space" 262 request_dict["duration"] = response.joints_goal.antenna_joint_goal.duration.value 263 request_dict["target_joints"] = np.rad2deg(response.joints_goal.antenna_joint_goal.joint_goal.value) 264 request_dict["elliptical_params"] = None 265 return request_dict 266 267 268def process_hand_joint_goal(response: GoToRequest) -> Dict[str, Any]: 269 """Process the response from a goto request containing a hand joint goal.""" 270 request_dict = {} 271 request_dict["part"] = response.joints_goal.hand_joint_goal.goal_request.id.name 272 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 273 request_dict["interpolation_space"] = "joint_space" 274 request_dict["duration"] = response.joints_goal.hand_joint_goal.duration.value 275 request_dict["target_joints"] = np.rad2deg( 276 response.joints_goal.hand_joint_goal.goal_request.position.parallel_gripper.position.value 277 ) 278 request_dict["elliptical_params"] = None 279 return request_dict 280 281 282def process_odometry_goal(response: GoToRequest) -> Dict[str, Any]: 283 """Process the response from a goto request containing an odometry goal.""" 284 request_dict = {} 285 request_dict["part"] = response.odometry_goal.odometry_goal.id.name 286 request_dict["timeout"] = response.odometry_goal.timeout.value 287 request_dict["distance_tolerance"] = response.odometry_goal.distance_tolerance.value 288 request_dict["angle_tolerance"] = np.rad2deg(response.odometry_goal.angle_tolerance.value) 289 request_dict["target"] = { 290 "x": response.odometry_goal.odometry_goal.direction.x.value, 291 "y": response.odometry_goal.odometry_goal.direction.y.value, 292 "theta": np.rad2deg(response.odometry_goal.odometry_goal.direction.theta.value), 293 } 294 return request_dict 295 296 297def build_simplified_joints_request(request_dict: Dict[str, Any]) -> SimplifiedRequest: 298 """Build a SimplifiedRequest object from a dictionary of joints request details.""" 299 target = TargetJointsRequest( 300 joints=request_dict.get("target_joints", None), 301 pose=request_dict.get("target_pose", None), 302 ) 303 304 joints_request = JointsRequest( 305 target=target, 306 duration=request_dict["duration"], 307 mode=request_dict["mode"], 308 interpolation_space=request_dict["interpolation_space"], 309 elliptical_parameters=request_dict["elliptical_params"], 310 ) 311 312 full_request = SimplifiedRequest( 313 part=request_dict["part"], 314 request=joints_request, 315 ) 316 317 return full_request 318 319 320def build_simplified_odometry_request(request_dict: Dict[str, Any]) -> SimplifiedRequest: 321 """Build a SimplifiedRequest object from a dictionary of odomztry request details.""" 322 odometry_request = OdometryRequest( 323 target=request_dict["target"], 324 timeout=request_dict["timeout"], 325 distance_tolerance=request_dict["distance_tolerance"], 326 angle_tolerance=request_dict["angle_tolerance"], 327 ) 328 329 full_request = SimplifiedRequest( 330 part=request_dict["part"], 331 request=odometry_request, 332 ) 333 334 return full_request
32class IGoToBasedElement(ABC): 33 """Interface for elements (parts or components) of Reachy that use goto functions. 34 35 The `IGoToBasedElements` class defines a common interface for handling goto-based movements. It is 36 designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism. 37 """ 38 39 def __init__( 40 self, 41 element_id: ComponentId | PartId, 42 goto_stub: GoToServiceStub, 43 ) -> None: 44 """Initialize the IGoToBasedElement interface. 45 46 Sets up the common attributes needed for handling goto-based movements. This includes 47 associating the component with the interface and setting up the gRPC stub for performing 48 goto commands. 49 50 Args: 51 element_id: The robot component or part that uses this interface. 52 goto_stub: The gRPC stub used to send goto commands to the robot element. 53 """ 54 self._element_id = element_id 55 self._goto_stub = goto_stub 56 self._logger_goto = logging.getLogger(__name__) # not using self._logger to avoid name conflict in multiple inheritance 57 58 @abstractmethod 59 def get_goto_playing(self) -> GoToId: 60 """Return the GoToId of the currently playing goto movement on a specific element.""" 61 pass 62 63 @abstractmethod 64 def get_goto_queue(self) -> List[GoToId]: 65 """Return a list of all GoToIds waiting to be played on a specific element.""" 66 pass 67 68 @abstractmethod 69 def cancel_all_goto(self) -> GoToAck: 70 """Request the cancellation of all playing and waiting goto commands for a specific element. 71 72 Returns: 73 A GoToAck acknowledging the cancellation of all goto commands. 74 """ 75 pass 76 77 def _get_goto_request(self, goto_id: GoToId) -> Optional[SimplifiedRequest]: 78 """Retrieve the details of a goto command based on its GoToId. 79 80 Args: 81 goto_id: The ID of the goto command for which details are requested. 82 83 Returns: 84 A `SimplifiedRequest` object containing the element name, joint goal positions 85 (in degrees), movement duration, and interpolation mode. 86 Returns `None` if the robot is not connected or if the `goto_id` is invalid. 87 88 Raises: 89 TypeError: If `goto_id` is not an instance of `GoToId`. 90 ValueError: If `goto_id` is -1, indicating an invalid command. 91 """ 92 if not isinstance(goto_id, GoToId): 93 raise TypeError(f"goto_id must be a GoToId, got {type(goto_id).__name__}") 94 if goto_id.id == -1: 95 raise ValueError("No answer was found for given move, goto_id is -1") 96 97 response = self._goto_stub.GetGoToRequest(goto_id) 98 99 full_request = process_goto_request(response) 100 101 return full_request 102 103 def _is_goto_finished(self, id: GoToId) -> bool: 104 """Check if the goto movement has been completed or cancelled. 105 106 Returns: 107 `True` if the goto has been played or cancelled, `False` otherwise. 108 """ 109 state = self._goto_stub.GetGoToState(id) 110 result = bool( 111 state.goal_status == GoalStatus.STATUS_ABORTED 112 or state.goal_status == GoalStatus.STATUS_CANCELED 113 or state.goal_status == GoalStatus.STATUS_SUCCEEDED 114 ) 115 return result 116 117 def _wait_goto(self, id: GoToId, duration: float) -> None: 118 """Wait for a goto to finish. timeout is in seconds.""" 119 self._logger_goto.info(f"Waiting for movement with {id}.") 120 121 if not self._is_goto_already_over(id, duration): 122 info_gotos = [self._get_goto_request(id)] 123 ids_queue = self.get_goto_queue() 124 for goto_id in ids_queue: 125 info_gotos.append(self._get_goto_request(goto_id)) 126 127 timeout = 1 # adding one more sec 128 for igoto in info_gotos: 129 if igoto is not None: 130 if type(igoto.request) is JointsRequest: 131 timeout += igoto.request.duration 132 elif type(igoto.request) is OdometryRequest: 133 timeout += igoto.request.timeout 134 135 self._logger_goto.debug(f"timeout is set to {timeout}") 136 137 t_start = time.time() # timeout for others 138 while not self._is_goto_finished(id): 139 time.sleep(0.1) 140 141 if time.time() - t_start > timeout: 142 self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.") 143 return 144 145 self._logger_goto.info(f"Movement with {id} finished.") 146 147 def _is_goto_already_over(self, id: GoToId, timeout: float) -> bool: 148 """Check if the goto movement is already over.""" 149 t0 = time.time() 150 id_playing = self.get_goto_playing() 151 while id_playing.id == -1: 152 time.sleep(0.005) 153 id_playing = self.get_goto_playing() 154 155 if self._is_goto_finished(id): 156 return True 157 158 # manage an id_playing staying at -1 159 if time.time() - t0 > timeout: 160 self._logger_goto.warning(f"Waiting time for movement with {id} is timeout.") 161 return True 162 return False 163 164 @abstractmethod 165 def _check_goto_parameters(self, target: Any, duration: Optional[float], q0: Optional[List[float]] = None) -> None: 166 """Check the validity of the parameters for a goto movement.""" 167 pass # pragma: no cover 168 169 @abstractmethod 170 def goto_posture( 171 self, 172 common_posture: str = "default", 173 duration: float = 2, 174 wait: bool = False, 175 wait_for_goto_end: bool = True, 176 interpolation_mode: str = "minimum_jerk", 177 ) -> GoToId: 178 """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode.""" 179 pass # pragma: no cover
Interface for elements (parts or components) of Reachy that use goto functions.
The IGoToBasedElements
class defines a common interface for handling goto-based movements. It is
designed to be implemented by any parts or components of the robot that perform movements via the goto mechanism.
39 def __init__( 40 self, 41 element_id: ComponentId | PartId, 42 goto_stub: GoToServiceStub, 43 ) -> None: 44 """Initialize the IGoToBasedElement interface. 45 46 Sets up the common attributes needed for handling goto-based movements. This includes 47 associating the component with the interface and setting up the gRPC stub for performing 48 goto commands. 49 50 Args: 51 element_id: The robot component or part that uses this interface. 52 goto_stub: The gRPC stub used to send goto commands to the robot element. 53 """ 54 self._element_id = element_id 55 self._goto_stub = goto_stub 56 self._logger_goto = logging.getLogger(__name__) # not using self._logger to avoid name conflict in multiple inheritance
Initialize the IGoToBasedElement interface.
Sets up the common attributes needed for handling goto-based movements. This includes associating the component with the interface and setting up the gRPC stub for performing goto commands.
Arguments:
- element_id: The robot component or part that uses this interface.
- goto_stub: The gRPC stub used to send goto commands to the robot element.
58 @abstractmethod 59 def get_goto_playing(self) -> GoToId: 60 """Return the GoToId of the currently playing goto movement on a specific element.""" 61 pass
Return the GoToId of the currently playing goto movement on a specific element.
63 @abstractmethod 64 def get_goto_queue(self) -> List[GoToId]: 65 """Return a list of all GoToIds waiting to be played on a specific element.""" 66 pass
Return a list of all GoToIds waiting to be played on a specific element.
68 @abstractmethod 69 def cancel_all_goto(self) -> GoToAck: 70 """Request the cancellation of all playing and waiting goto commands for a specific element. 71 72 Returns: 73 A GoToAck acknowledging the cancellation of all goto commands. 74 """ 75 pass
Request the cancellation of all playing and waiting goto commands for a specific element.
Returns:
A GoToAck acknowledging the cancellation of all goto commands.
169 @abstractmethod 170 def goto_posture( 171 self, 172 common_posture: str = "default", 173 duration: float = 2, 174 wait: bool = False, 175 wait_for_goto_end: bool = True, 176 interpolation_mode: str = "minimum_jerk", 177 ) -> GoToId: 178 """Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode.""" 179 pass # pragma: no cover
Send all joints to standard positions with optional parameters for duration, waiting, and interpolation mode.
182def process_goto_request(response: GoToRequest) -> Optional[SimplifiedRequest]: 183 """Process the response from a goto request and return a SimplifiedRequest object.""" 184 if response.HasField("cartesian_goal"): 185 request_dict = process_arm_cartesian_goal(response) 186 return build_simplified_joints_request(request_dict) 187 188 elif response.HasField("joints_goal"): 189 match response.joints_goal.WhichOneof("joints_goal"): 190 case "arm_joint_goal": 191 request_dict = process_arm_joint_goal(response) 192 case "neck_joint_goal": 193 request_dict = process_neck_joint_goal(response) 194 case "antenna_joint_goal": 195 request_dict = process_antenna_joint_goal(response) 196 case "hand_joint_goal": 197 request_dict = process_hand_joint_goal(response) 198 case _: 199 raise ValueError("No valid joint goal found in the response") 200 return build_simplified_joints_request(request_dict) 201 202 elif response.HasField("odometry_goal"): 203 request_dict = process_odometry_goal(response) 204 return build_simplified_odometry_request(request_dict) 205 206 else: 207 raise ValueError("No valid request found in the response")
Process the response from a goto request and return a SimplifiedRequest object.
210def process_arm_cartesian_goal(response: GoToRequest) -> Dict[str, Any]: 211 """Process the response from a goto request containing an arm cartesian goal.""" 212 request_dict = {} 213 request_dict["part"] = response.cartesian_goal.arm_cartesian_goal.id.name 214 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 215 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 216 request_dict["duration"] = response.cartesian_goal.arm_cartesian_goal.duration.value 217 request_dict["target_pose"] = np.reshape(response.cartesian_goal.arm_cartesian_goal.goal_pose.data, (4, 4)) 218 if request_dict["mode"] == "elliptical": 219 arc_direction = get_arc_direction(response.elliptical_parameters.arc_direction) 220 secondary_radius = response.elliptical_parameters.secondary_radius.value 221 request_dict["elliptical_params"] = EllipticalParameters(arc_direction, secondary_radius) 222 else: 223 request_dict["elliptical_params"] = None 224 return request_dict
Process the response from a goto request containing an arm cartesian goal.
227def process_arm_joint_goal(response: GoToRequest) -> Dict[str, Any]: 228 """Process the response from a goto request containing an arm joint goal.""" 229 request_dict = {} 230 request_dict["part"] = response.joints_goal.arm_joint_goal.id.name 231 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 232 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 233 request_dict["duration"] = response.joints_goal.arm_joint_goal.duration.value 234 request_dict["target_joints"] = arm_position_to_list(response.joints_goal.arm_joint_goal.joints_goal, degrees=True) 235 request_dict["elliptical_params"] = None 236 return request_dict
Process the response from a goto request containing an arm joint goal.
239def process_neck_joint_goal(response: GoToRequest) -> Dict[str, Any]: 240 """Process the response from a goto request containing a neck joint goal.""" 241 request_dict = {} 242 request_dict["part"] = response.joints_goal.neck_joint_goal.id.name 243 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 244 request_dict["interpolation_space"] = get_interpolation_space(response.interpolation_space.interpolation_space) 245 request_dict["duration"] = response.joints_goal.neck_joint_goal.duration.value 246 request_dict["target_joints"] = ext_euler_angles_to_list( 247 response.joints_goal.neck_joint_goal.joints_goal.rotation.rpy, degrees=True 248 ) 249 request_dict["elliptical_params"] = None 250 return request_dict
Process the response from a goto request containing a neck joint goal.
253def process_antenna_joint_goal(response: GoToRequest) -> Dict[str, Any]: 254 """Process the response from a goto request containing an antenna joint goal.""" 255 request_dict = {} 256 request_dict["part"] = response.joints_goal.antenna_joint_goal.antenna.id.name 257 if request_dict["part"] == "antenna_right": 258 request_dict["part"] = "r_antenna" 259 elif request_dict["part"] == "antenna_left": 260 request_dict["part"] = "l_antenna" 261 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 262 request_dict["interpolation_space"] = "joint_space" 263 request_dict["duration"] = response.joints_goal.antenna_joint_goal.duration.value 264 request_dict["target_joints"] = np.rad2deg(response.joints_goal.antenna_joint_goal.joint_goal.value) 265 request_dict["elliptical_params"] = None 266 return request_dict
Process the response from a goto request containing an antenna joint goal.
269def process_hand_joint_goal(response: GoToRequest) -> Dict[str, Any]: 270 """Process the response from a goto request containing a hand joint goal.""" 271 request_dict = {} 272 request_dict["part"] = response.joints_goal.hand_joint_goal.goal_request.id.name 273 request_dict["mode"] = get_interpolation_mode(response.interpolation_mode.interpolation_type) 274 request_dict["interpolation_space"] = "joint_space" 275 request_dict["duration"] = response.joints_goal.hand_joint_goal.duration.value 276 request_dict["target_joints"] = np.rad2deg( 277 response.joints_goal.hand_joint_goal.goal_request.position.parallel_gripper.position.value 278 ) 279 request_dict["elliptical_params"] = None 280 return request_dict
Process the response from a goto request containing a hand joint goal.
283def process_odometry_goal(response: GoToRequest) -> Dict[str, Any]: 284 """Process the response from a goto request containing an odometry goal.""" 285 request_dict = {} 286 request_dict["part"] = response.odometry_goal.odometry_goal.id.name 287 request_dict["timeout"] = response.odometry_goal.timeout.value 288 request_dict["distance_tolerance"] = response.odometry_goal.distance_tolerance.value 289 request_dict["angle_tolerance"] = np.rad2deg(response.odometry_goal.angle_tolerance.value) 290 request_dict["target"] = { 291 "x": response.odometry_goal.odometry_goal.direction.x.value, 292 "y": response.odometry_goal.odometry_goal.direction.y.value, 293 "theta": np.rad2deg(response.odometry_goal.odometry_goal.direction.theta.value), 294 } 295 return request_dict
Process the response from a goto request containing an odometry goal.
298def build_simplified_joints_request(request_dict: Dict[str, Any]) -> SimplifiedRequest: 299 """Build a SimplifiedRequest object from a dictionary of joints request details.""" 300 target = TargetJointsRequest( 301 joints=request_dict.get("target_joints", None), 302 pose=request_dict.get("target_pose", None), 303 ) 304 305 joints_request = JointsRequest( 306 target=target, 307 duration=request_dict["duration"], 308 mode=request_dict["mode"], 309 interpolation_space=request_dict["interpolation_space"], 310 elliptical_parameters=request_dict["elliptical_params"], 311 ) 312 313 full_request = SimplifiedRequest( 314 part=request_dict["part"], 315 request=joints_request, 316 ) 317 318 return full_request
Build a SimplifiedRequest object from a dictionary of joints request details.
321def build_simplified_odometry_request(request_dict: Dict[str, Any]) -> SimplifiedRequest: 322 """Build a SimplifiedRequest object from a dictionary of odomztry request details.""" 323 odometry_request = OdometryRequest( 324 target=request_dict["target"], 325 timeout=request_dict["timeout"], 326 distance_tolerance=request_dict["distance_tolerance"], 327 angle_tolerance=request_dict["angle_tolerance"], 328 ) 329 330 full_request = SimplifiedRequest( 331 part=request_dict["part"], 332 request=odometry_request, 333 ) 334 335 return full_request
Build a SimplifiedRequest object from a dictionary of odomztry request details.