reachy2_sdk.parts.mobile_base
Reachy MobileBase module.
Handles all specific methods to a MobileBase.
1"""Reachy MobileBase module. 2 3Handles all specific methods to a MobileBase. 4""" 5 6import logging 7import time 8from typing import Any, Dict, List, Optional 9 10import grpc 11import numpy as np 12from google.protobuf.wrappers_pb2 import FloatValue 13from numpy import deg2rad, rad2deg, round 14from reachy2_sdk_api.goto_pb2 import GoToId, GoToRequest, OdometryGoal 15from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub 16from reachy2_sdk_api.mobile_base_mobility_pb2 import ( 17 DirectionVector, 18 TargetDirectionCommand, 19) 20from reachy2_sdk_api.mobile_base_mobility_pb2_grpc import MobileBaseMobilityServiceStub 21from reachy2_sdk_api.mobile_base_utility_pb2 import ( 22 ControlModeCommand, 23 ControlModePossiblities, 24) 25from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBase as MobileBase_proto 26from reachy2_sdk_api.mobile_base_utility_pb2 import ( 27 MobileBaseState, 28 MobileBaseStatus, 29 ZuuuModeCommand, 30 ZuuuModePossiblities, 31) 32from reachy2_sdk_api.mobile_base_utility_pb2_grpc import MobileBaseUtilityServiceStub 33 34from ..sensors.lidar import Lidar 35from .goto_based_part import IGoToBasedPart 36from .part import Part 37 38 39class MobileBase(Part, IGoToBasedPart): 40 """MobileBase class for controlling Reachy's mobile base. 41 42 This class provides methods to interact with and control the mobile base of a Reachy robot. It allows 43 users to access essential information such as battery voltage and odometry, as well as send commands 44 to move the base to specified positions or velocities. The class supports different drive modes and 45 control modes, and provides methods for resetting the base's odometry. 46 47 Attributes: 48 lidar: Lidar object for handling safety features. 49 """ 50 51 def __init__( 52 self, 53 mb_msg: MobileBase_proto, 54 initial_state: MobileBaseState, 55 grpc_channel: grpc.Channel, 56 goto_stub: GoToServiceStub, 57 ) -> None: 58 """Initialize the MobileBase with its gRPC communication and configuration. 59 60 This sets up the gRPC communication channel and service stubs for controlling the 61 mobile base, initializes the drive and control modes. 62 It also sets up the LIDAR safety monitoring. 63 64 Args: 65 mb_msg: A MobileBase_proto message containing the configuration details for the mobile base. 66 initial_state: The initial state of the mobile base, as a MobileBaseState object. 67 grpc_channel: The gRPC channel used to communicate with the mobile base service. 68 goto_stub: The gRPC service stub for the GoTo service. 69 """ 70 self._logger = logging.getLogger(__name__) 71 super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel)) 72 IGoToBasedPart.__init__(self, self._part_id, goto_stub) 73 74 self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel) 75 76 self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower() 77 self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower() 78 self._battery_level = 30.0 79 80 self._max_xy_vel = 0.61 81 self._max_rot_vel = 114.0 82 self._max_xy_goto = 1.0 83 84 self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self) 85 86 self._update_with(initial_state) 87 88 def __repr__(self) -> str: 89 """Clean representation of a mobile base.""" 90 repr_template = ( 91 "<MobileBase on={on} \n" " lidar_safety_enabled={lidar_safety_enabled} \n" " battery_voltage={battery_voltage}>" 92 ) 93 return repr_template.format( 94 on=self.is_on(), 95 lidar_safety_enabled=self.lidar.safety_enabled, 96 battery_voltage=self.battery_voltage, 97 ) 98 99 @property 100 def battery_voltage(self) -> float: 101 """Return the battery voltage. 102 103 The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low, 104 a warning message is logged. 105 106 Returns: 107 The current battery voltage as a float, rounded to one decimal place. 108 """ 109 battery_level = float(round(self._battery_level, 1)) 110 if battery_level < 24.5: 111 self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.") 112 return float(round(self._battery_level, 1)) 113 114 @property 115 def odometry(self) -> Dict[str, float]: 116 """Return the odometry of the base. 117 118 The odometry includes the x and y positions in meters and theta in degrees, along with the 119 velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second. 120 121 Returns: 122 A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta', 123 each rounded to three decimal places. 124 """ 125 response = self._stub.GetOdometry(self._part_id) 126 odom = { 127 "x": response.x.value, 128 "y": response.y.value, 129 "theta": rad2deg(response.theta.value), 130 "vx": response.vx.value, 131 "vy": response.vy.value, 132 "vtheta": rad2deg(response.vtheta.value), 133 } 134 return odom 135 136 @property 137 def last_cmd_vel(self) -> Dict[str, float]: 138 """Return the last command velocity sent to the base. 139 140 The velocity includes the x and y components in meters per second and the theta component in degrees per second. 141 142 Returns: 143 A dictionary containing the last command velocity with keys 'x', 'y', and 'theta', 144 each rounded to three decimal places. 145 """ 146 response = self._mobility_stub.GetLastDirection(self._part_id) 147 cmd_vel = { 148 "x": round(response.x.value, 3), 149 "y": round(response.y.value, 3), 150 "theta": round(rad2deg(response.theta.value), 3), 151 } 152 return cmd_vel 153 154 def _set_drive_mode(self, mode: str) -> None: 155 """Set the base's drive mode. 156 157 The drive mode must be one of the allowed modes, excluding 'speed' and 'goto'. If the mode is 158 valid, the base's drive mode is set accordingly. 159 160 Args: 161 mode: The desired drive mode as a string. Possible drive modes are: 162 ['cmd_vel', 'brake', 'free_wheel', 'emergency_stop', 'cmd_goto']. 163 164 Raises: 165 ValueError: If the specified drive mode is not one of the allowed modes. 166 """ 167 all_drive_modes = [mode.lower() for mode in ZuuuModePossiblities.keys()][1:] 168 possible_drive_modes = [mode for mode in all_drive_modes if mode not in ("speed", "goto")] 169 if mode in possible_drive_modes: 170 req = ZuuuModeCommand(mode=getattr(ZuuuModePossiblities, mode.upper())) 171 self._stub.SetZuuuMode(req) 172 self._drive_mode = mode 173 else: 174 raise ValueError(f"Drive mode requested should be in {possible_drive_modes}!") 175 176 def _set_control_mode(self, mode: str) -> None: 177 """Set the base's control mode. 178 179 The control mode must be one of the allowed modes. If the mode is valid, the base's control mode is set accordingly. 180 181 Args: 182 mode: The desired control mode as a string. Possible control modes are: ['open_loop', 'pid'] 183 184 Raises: 185 ValueError: If the specified control mode is not one of the allowed modes. 186 """ 187 possible_control_modes = [mode.lower() for mode in ControlModePossiblities.keys()][1:] 188 if mode in possible_control_modes: 189 req = ControlModeCommand(mode=getattr(ControlModePossiblities, mode.upper())) 190 self._stub.SetControlMode(req) 191 self._control_mode = mode 192 else: 193 raise ValueError(f"Control mode requested should be in {possible_control_modes}!") 194 195 def is_on(self) -> bool: 196 """Check if the mobile base is currently stiff (not in free-wheel mode). 197 198 Returns: 199 `True` if the mobile base is not compliant (stiff), `False` otherwise. 200 """ 201 return not self._drive_mode == "free_wheel" 202 203 def is_off(self) -> bool: 204 """Check if the mobile base is currently compliant (in free-wheel mode). 205 206 Returns: 207 True if the mobile base is compliant (in free-wheel mode), `False` otherwise. 208 """ 209 if self._drive_mode == "free_wheel": 210 return True 211 return False 212 213 def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]: 214 """Get the current odometry of the mobile base in its reference frame. 215 216 Args: 217 degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees. 218 Defaults to True. 219 220 Returns: 221 Dict[str, float]: A dictionary containing the current odometry of the mobile base with: 222 - 'x': Position along the x-axis (in meters). 223 - 'y': Position along the y-axis (in meters). 224 - 'theta': Orientation (in degrees by default, radians if `degrees=False`). 225 - 'vx': Linear velocity along the x-axis (in meters per second). 226 - 'vy': Linear velocity along the y-axis (in meters per second). 227 - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`). 228 """ 229 current_state = self.odometry.copy() 230 if not degrees: 231 current_state["theta"] = deg2rad(current_state["theta"]) 232 current_state["vtheta"] = deg2rad(current_state["vtheta"]) 233 234 return current_state 235 236 def goto( 237 self, 238 x: float, 239 y: float, 240 theta: float, 241 wait: bool = False, 242 degrees: bool = True, 243 distance_tolerance: Optional[float] = 0.05, 244 angle_tolerance: Optional[float] = None, 245 timeout: float = 100, 246 ) -> GoToId: 247 """Send the mobile base to a specified target position. 248 249 The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees. 250 The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout 251 can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for 252 reaching the target position. 253 254 Args: 255 x: The target x-coordinate in meters. 256 y: The target y-coordinate in meters. 257 theta: The target orientation in degrees. 258 wait: If True, the function waits until the movement is completed before returning. 259 Defaults to False. 260 degrees: If True, the theta value and angle_tolerance are treated as degrees. 261 Defaults to True. 262 distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters. 263 angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters. 264 timeout: Optional; the maximum time allowed to reach the target, in seconds. 265 266 Returns: 267 GoToId: The unique GoToId identifier for the movement command. 268 269 Raises: 270 TypeError: If the target is not reached and the mobile base is stopped due to an obstacle. 271 """ 272 if self.is_off(): 273 self._logger.warning("Mobile base is off. Goto not sent.") 274 return 275 276 self._check_goto_parameters(target=[x, y, theta]) 277 self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout) 278 279 if degrees: 280 theta = deg2rad(theta) 281 if angle_tolerance is not None: 282 angle_tolerance = deg2rad(angle_tolerance) 283 284 if angle_tolerance is None: 285 angle_tolerance = deg2rad(5.0) 286 287 vector_goal = TargetDirectionCommand( 288 id=self._part_id, 289 direction=DirectionVector( 290 x=FloatValue(value=x), 291 y=FloatValue(value=y), 292 theta=FloatValue(value=theta), 293 ), 294 ) 295 296 odometry_goal = OdometryGoal( 297 odometry_goal=vector_goal, 298 distance_tolerance=FloatValue(value=distance_tolerance), 299 angle_tolerance=FloatValue(value=angle_tolerance), 300 timeout=FloatValue(value=timeout), 301 ) 302 303 request = GoToRequest( 304 odometry_goal=odometry_goal, 305 ) 306 307 response = self._goto_stub.GoToOdometry(request) 308 309 if response.id == -1: 310 self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.") 311 elif wait: 312 self._wait_goto(response, timeout) 313 314 return response 315 316 def translate_by( 317 self, 318 x: float, 319 y: float, 320 wait: bool = False, 321 distance_tolerance: Optional[float] = 0.05, 322 timeout: float = 100, 323 ) -> GoToId: 324 """Send a target position relative to the current position of the mobile base. 325 326 The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space. 327 328 Args: 329 x: The desired translation along the x-axis in meters. 330 y: The desired translation along the y-axis in meters. 331 wait: If True, the function waits until the movement is completed before returning. 332 distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters. 333 timeout: An optional timeout for reaching the target position, in seconds. 334 335 Returns: 336 The GoToId of the movement command, created using the `goto` method. 337 """ 338 try: 339 goto = self.get_goto_queue()[-1] 340 except IndexError: 341 goto = self.get_goto_playing() 342 343 if goto.id != -1: 344 odom_request = self._get_goto_request(goto) 345 else: 346 odom_request = None 347 348 angle_tolerance = None 349 350 if odom_request is not None: 351 base_odom = odom_request.request.target 352 angle_tolerance = deg2rad(odom_request.request.angle_tolerance) 353 else: 354 base_odom = self.odometry 355 356 theta_goal = deg2rad(base_odom["theta"]) 357 x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal)) 358 y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal)) 359 360 return self.goto( 361 x_goal, 362 y_goal, 363 theta_goal, 364 wait=wait, 365 distance_tolerance=distance_tolerance, 366 angle_tolerance=angle_tolerance, 367 degrees=False, 368 timeout=timeout, 369 ) 370 371 def rotate_by( 372 self, 373 theta: float, 374 wait: bool = False, 375 degrees: bool = True, 376 angle_tolerance: Optional[float] = None, 377 timeout: float = 100, 378 ) -> GoToId: 379 """Send a target rotation relative to the current rotation of the mobile base. 380 381 The theta parameter defines the desired rotation in degrees. 382 383 Args: 384 theta: The desired rotation in degrees, relative to the current orientation. 385 wait: If True, the function waits until the rotation is completed before returning. 386 degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians. 387 angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished. 388 timeout: An optional timeout for completing the rotation, in seconds. 389 """ 390 try: 391 goto = self.get_goto_queue()[-1] 392 except IndexError: 393 goto = self.get_goto_playing() 394 395 if goto.id != -1: 396 odom_request = self._get_goto_request(goto) 397 else: 398 odom_request = None 399 400 distance_tolerance = 0.05 401 402 if odom_request is not None: 403 base_odom = odom_request.request.target 404 if angle_tolerance is None: 405 angle_tolerance = odom_request.request.angle_tolerance 406 distance_tolerance = odom_request.request.distance_tolerance 407 else: 408 base_odom = self.odometry 409 410 if degrees: 411 theta = base_odom["theta"] + theta 412 else: 413 theta = deg2rad(base_odom["theta"]) + theta 414 x = base_odom["x"] 415 y = base_odom["y"] 416 417 return self.goto( 418 x, 419 y, 420 theta, 421 wait=wait, 422 degrees=degrees, 423 distance_tolerance=distance_tolerance, 424 angle_tolerance=angle_tolerance, 425 timeout=timeout, 426 ) 427 428 def reset_odometry(self) -> None: 429 """Reset the odometry. 430 431 This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0). 432 If any goto is being played, stop the goto and the queued ones. 433 """ 434 if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0: 435 self._logger.warning( 436 "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos." 437 ) 438 self._stub.ResetOdometry(self._part_id) 439 time.sleep(0.05) 440 441 def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None: 442 """Set the goal speed for the mobile base. 443 444 This method sets the target velocities for the mobile base's movement along the x and y axes, as well as 445 its rotational speed. The actual movement is executed after calling `send_speed_command`. 446 447 Args: 448 vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0. 449 vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0. 450 vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0. 451 452 Raises: 453 TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`. 454 455 Notes: 456 - Use `send_speed_command` after this method to execute the movement. 457 - The velocities will be used to command the mobile base for a short duration (0.2 seconds). 458 """ 459 for vel in [vx, vy, vtheta]: 460 if not isinstance(vel, float) | isinstance(vel, int): 461 raise TypeError("goal_speed must be a float or int") 462 463 self._x_vel_goal = vx 464 self._y_vel_goal = vy 465 self._rot_vel_goal = vtheta 466 467 def send_speed_command(self) -> None: 468 """Send the speed command to the mobile base, based on previously set goal speeds. 469 470 This method sends the velocity commands for the mobile base that were set with `set_goal_speed`. 471 The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code. 472 473 Raises: 474 ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values. 475 Warning: If the mobile base is off, no command is sent, and a warning is logged. 476 477 Notes: 478 - This method is optimal for sending frequent speed instructions to the mobile base. 479 - The goal velocities must be set with `set_goal_speed` before calling this function. 480 """ 481 if self.is_off(): 482 self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.") 483 return 484 for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items(): 485 if abs(value) > self._max_xy_vel: 486 raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}") 487 488 if abs(self._rot_vel_goal) > self._max_rot_vel: 489 raise ValueError( 490 f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}" 491 ) 492 493 if self._drive_mode != "cmd_vel": 494 self._set_drive_mode("cmd_vel") 495 496 req = TargetDirectionCommand( 497 direction=DirectionVector( 498 x=FloatValue(value=self._x_vel_goal), 499 y=FloatValue(value=self._y_vel_goal), 500 theta=FloatValue(value=deg2rad(self._rot_vel_goal)), 501 ) 502 ) 503 self._mobility_stub.SendDirection(req) 504 505 def _update_with(self, new_state: MobileBaseState) -> None: 506 """Update the mobile base's state with newly received data from the gRPC server. 507 508 This method updates the battery level, LIDAR safety information, drive mode, and control mode 509 of the mobile base. 510 511 Args: 512 new_state: The new state of the mobile base, as a MobileBaseState object. 513 """ 514 self._battery_level = new_state.battery_level.level.value 515 self.lidar._update_with(new_state.lidar_safety) 516 self._drive_mode = ZuuuModePossiblities.keys()[new_state.zuuu_mode.mode].lower() 517 self._control_mode = ControlModePossiblities.keys()[new_state.control_mode.mode].lower() 518 519 def _update_audit_status(self, new_status: MobileBaseStatus) -> None: 520 """Update the audit status of the mobile base. 521 522 This is a placeholder method and does not perform any actions. 523 524 Args: 525 new_status: The new status of the mobile base, as a MobileBaseStatus object. 526 """ 527 pass # pragma: no cover 528 529 def _set_speed_limits(self, value: int) -> None: 530 """Set the speed limits for the mobile base. 531 532 This method overrides the base class implementation to set speed limits. 533 534 Args: 535 value: The speed limit value to be set, as an integer. 536 """ 537 return super()._set_speed_limits(value) 538 539 def _check_goto_parameters(self, target: Any, duration: Optional[float] = None, q0: Optional[List[float]] = None) -> None: 540 """Check the validity of the parameters for the `goto` method. 541 542 Args: 543 duration: Not required here. 544 target: The target goal, as a list [x, y, theta] in the odometry coordinate system. 545 q0: Not required here. Defaults to None. 546 547 Raises: 548 TypeError: If the x goal is not a float or int. 549 TypeError: If the y goal is not a float or int. 550 TypeError: If the theta goal is not a float or int. 551 """ 552 self._check_type_float(target[0], "x") 553 self._check_type_float(target[1], "y") 554 self._check_type_float(target[2], "theta") 555 556 try: 557 goto = self.get_goto_queue()[-1] 558 except IndexError: 559 goto = self.get_goto_playing() 560 561 if goto.id != -1: 562 odom_request = self._get_goto_request(goto) 563 else: 564 odom_request = None 565 566 if odom_request is not None: 567 base_odom = odom_request.request.target 568 x_offset = abs(target[0] - base_odom["x"]) 569 y_offset = abs(target[1] - base_odom["y"]) 570 else: 571 x_offset = abs(target[0] - self.odometry["x"]) 572 y_offset = abs(target[1] - self.odometry["y"]) 573 for pos, value in {"x": x_offset, "y": y_offset}.items(): 574 if abs(value) > self._max_xy_goto: 575 raise ValueError(f"The displacement in {pos} should not be more than {self._max_xy_goto}, got {abs(value)}") 576 577 def _check_optional_goto_parameters( 578 self, distance_tolerance: Optional[float], angle_tolerance: Optional[float], timeout: Optional[float] 579 ) -> None: 580 """Check the validity of the optional parameters for the `goto` method. 581 582 Args: 583 distance_tolerance: The distance tolerance value to be checked. 584 angle_tolerance: The angle tolerance value to be checked. 585 timeout: The timeout value to be checked. 586 587 Raises: 588 ValueError: If the distance_tolerance is negative. 589 ValueError: If the angle_tolerance is negative. 590 ValueError: If the timeout is negative or null. 591 """ 592 if distance_tolerance is not None: 593 self._check_type_float(distance_tolerance, "distance_tolerance") 594 if distance_tolerance < 0: 595 raise ValueError(f"distance_tolerance must be a positive value, got {distance_tolerance}") 596 if angle_tolerance is not None: 597 self._check_type_float(angle_tolerance, "angle_tolerance") 598 if angle_tolerance < 0: 599 raise ValueError(f"angle_tolerance must be a positive value, got {angle_tolerance}") 600 if timeout is not None: 601 self._check_type_float(timeout, "timeout") 602 if timeout <= 0: 603 raise ValueError(f"timeout must be a positive value greater than 0, got {timeout}") 604 605 def _check_type_float(self, value: Any, arg_name: str) -> None: 606 """Check the type of the value parameter. 607 608 Args: 609 value: The value to be checked. 610 611 Raises: 612 TypeError: If the value is not a float or int. 613 """ 614 if not (isinstance(value, float) | isinstance(value, int)): 615 raise TypeError(f"{arg_name} must be a float or int, got {type(value)} instead") 616 617 def set_max_xy_goto(self, value: float) -> None: 618 """Set the maximum displacement in the x and y directions for the mobile base. 619 620 Args: 621 value: The maximum displacement value to be set, in meters. 622 """ 623 self._max_xy_goto = value 624 625 def goto_posture( 626 self, 627 common_posture: str = "default", 628 duration: float = 2, 629 wait: bool = False, 630 wait_for_goto_end: bool = True, 631 interpolation_mode: str = "minimum_jerk", 632 ) -> GoToId: 633 """Mobile base is not affected by goto_posture. No command is sent.""" 634 return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
40class MobileBase(Part, IGoToBasedPart): 41 """MobileBase class for controlling Reachy's mobile base. 42 43 This class provides methods to interact with and control the mobile base of a Reachy robot. It allows 44 users to access essential information such as battery voltage and odometry, as well as send commands 45 to move the base to specified positions or velocities. The class supports different drive modes and 46 control modes, and provides methods for resetting the base's odometry. 47 48 Attributes: 49 lidar: Lidar object for handling safety features. 50 """ 51 52 def __init__( 53 self, 54 mb_msg: MobileBase_proto, 55 initial_state: MobileBaseState, 56 grpc_channel: grpc.Channel, 57 goto_stub: GoToServiceStub, 58 ) -> None: 59 """Initialize the MobileBase with its gRPC communication and configuration. 60 61 This sets up the gRPC communication channel and service stubs for controlling the 62 mobile base, initializes the drive and control modes. 63 It also sets up the LIDAR safety monitoring. 64 65 Args: 66 mb_msg: A MobileBase_proto message containing the configuration details for the mobile base. 67 initial_state: The initial state of the mobile base, as a MobileBaseState object. 68 grpc_channel: The gRPC channel used to communicate with the mobile base service. 69 goto_stub: The gRPC service stub for the GoTo service. 70 """ 71 self._logger = logging.getLogger(__name__) 72 super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel)) 73 IGoToBasedPart.__init__(self, self._part_id, goto_stub) 74 75 self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel) 76 77 self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower() 78 self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower() 79 self._battery_level = 30.0 80 81 self._max_xy_vel = 0.61 82 self._max_rot_vel = 114.0 83 self._max_xy_goto = 1.0 84 85 self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self) 86 87 self._update_with(initial_state) 88 89 def __repr__(self) -> str: 90 """Clean representation of a mobile base.""" 91 repr_template = ( 92 "<MobileBase on={on} \n" " lidar_safety_enabled={lidar_safety_enabled} \n" " battery_voltage={battery_voltage}>" 93 ) 94 return repr_template.format( 95 on=self.is_on(), 96 lidar_safety_enabled=self.lidar.safety_enabled, 97 battery_voltage=self.battery_voltage, 98 ) 99 100 @property 101 def battery_voltage(self) -> float: 102 """Return the battery voltage. 103 104 The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low, 105 a warning message is logged. 106 107 Returns: 108 The current battery voltage as a float, rounded to one decimal place. 109 """ 110 battery_level = float(round(self._battery_level, 1)) 111 if battery_level < 24.5: 112 self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.") 113 return float(round(self._battery_level, 1)) 114 115 @property 116 def odometry(self) -> Dict[str, float]: 117 """Return the odometry of the base. 118 119 The odometry includes the x and y positions in meters and theta in degrees, along with the 120 velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second. 121 122 Returns: 123 A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta', 124 each rounded to three decimal places. 125 """ 126 response = self._stub.GetOdometry(self._part_id) 127 odom = { 128 "x": response.x.value, 129 "y": response.y.value, 130 "theta": rad2deg(response.theta.value), 131 "vx": response.vx.value, 132 "vy": response.vy.value, 133 "vtheta": rad2deg(response.vtheta.value), 134 } 135 return odom 136 137 @property 138 def last_cmd_vel(self) -> Dict[str, float]: 139 """Return the last command velocity sent to the base. 140 141 The velocity includes the x and y components in meters per second and the theta component in degrees per second. 142 143 Returns: 144 A dictionary containing the last command velocity with keys 'x', 'y', and 'theta', 145 each rounded to three decimal places. 146 """ 147 response = self._mobility_stub.GetLastDirection(self._part_id) 148 cmd_vel = { 149 "x": round(response.x.value, 3), 150 "y": round(response.y.value, 3), 151 "theta": round(rad2deg(response.theta.value), 3), 152 } 153 return cmd_vel 154 155 def _set_drive_mode(self, mode: str) -> None: 156 """Set the base's drive mode. 157 158 The drive mode must be one of the allowed modes, excluding 'speed' and 'goto'. If the mode is 159 valid, the base's drive mode is set accordingly. 160 161 Args: 162 mode: The desired drive mode as a string. Possible drive modes are: 163 ['cmd_vel', 'brake', 'free_wheel', 'emergency_stop', 'cmd_goto']. 164 165 Raises: 166 ValueError: If the specified drive mode is not one of the allowed modes. 167 """ 168 all_drive_modes = [mode.lower() for mode in ZuuuModePossiblities.keys()][1:] 169 possible_drive_modes = [mode for mode in all_drive_modes if mode not in ("speed", "goto")] 170 if mode in possible_drive_modes: 171 req = ZuuuModeCommand(mode=getattr(ZuuuModePossiblities, mode.upper())) 172 self._stub.SetZuuuMode(req) 173 self._drive_mode = mode 174 else: 175 raise ValueError(f"Drive mode requested should be in {possible_drive_modes}!") 176 177 def _set_control_mode(self, mode: str) -> None: 178 """Set the base's control mode. 179 180 The control mode must be one of the allowed modes. If the mode is valid, the base's control mode is set accordingly. 181 182 Args: 183 mode: The desired control mode as a string. Possible control modes are: ['open_loop', 'pid'] 184 185 Raises: 186 ValueError: If the specified control mode is not one of the allowed modes. 187 """ 188 possible_control_modes = [mode.lower() for mode in ControlModePossiblities.keys()][1:] 189 if mode in possible_control_modes: 190 req = ControlModeCommand(mode=getattr(ControlModePossiblities, mode.upper())) 191 self._stub.SetControlMode(req) 192 self._control_mode = mode 193 else: 194 raise ValueError(f"Control mode requested should be in {possible_control_modes}!") 195 196 def is_on(self) -> bool: 197 """Check if the mobile base is currently stiff (not in free-wheel mode). 198 199 Returns: 200 `True` if the mobile base is not compliant (stiff), `False` otherwise. 201 """ 202 return not self._drive_mode == "free_wheel" 203 204 def is_off(self) -> bool: 205 """Check if the mobile base is currently compliant (in free-wheel mode). 206 207 Returns: 208 True if the mobile base is compliant (in free-wheel mode), `False` otherwise. 209 """ 210 if self._drive_mode == "free_wheel": 211 return True 212 return False 213 214 def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]: 215 """Get the current odometry of the mobile base in its reference frame. 216 217 Args: 218 degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees. 219 Defaults to True. 220 221 Returns: 222 Dict[str, float]: A dictionary containing the current odometry of the mobile base with: 223 - 'x': Position along the x-axis (in meters). 224 - 'y': Position along the y-axis (in meters). 225 - 'theta': Orientation (in degrees by default, radians if `degrees=False`). 226 - 'vx': Linear velocity along the x-axis (in meters per second). 227 - 'vy': Linear velocity along the y-axis (in meters per second). 228 - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`). 229 """ 230 current_state = self.odometry.copy() 231 if not degrees: 232 current_state["theta"] = deg2rad(current_state["theta"]) 233 current_state["vtheta"] = deg2rad(current_state["vtheta"]) 234 235 return current_state 236 237 def goto( 238 self, 239 x: float, 240 y: float, 241 theta: float, 242 wait: bool = False, 243 degrees: bool = True, 244 distance_tolerance: Optional[float] = 0.05, 245 angle_tolerance: Optional[float] = None, 246 timeout: float = 100, 247 ) -> GoToId: 248 """Send the mobile base to a specified target position. 249 250 The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees. 251 The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout 252 can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for 253 reaching the target position. 254 255 Args: 256 x: The target x-coordinate in meters. 257 y: The target y-coordinate in meters. 258 theta: The target orientation in degrees. 259 wait: If True, the function waits until the movement is completed before returning. 260 Defaults to False. 261 degrees: If True, the theta value and angle_tolerance are treated as degrees. 262 Defaults to True. 263 distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters. 264 angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters. 265 timeout: Optional; the maximum time allowed to reach the target, in seconds. 266 267 Returns: 268 GoToId: The unique GoToId identifier for the movement command. 269 270 Raises: 271 TypeError: If the target is not reached and the mobile base is stopped due to an obstacle. 272 """ 273 if self.is_off(): 274 self._logger.warning("Mobile base is off. Goto not sent.") 275 return 276 277 self._check_goto_parameters(target=[x, y, theta]) 278 self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout) 279 280 if degrees: 281 theta = deg2rad(theta) 282 if angle_tolerance is not None: 283 angle_tolerance = deg2rad(angle_tolerance) 284 285 if angle_tolerance is None: 286 angle_tolerance = deg2rad(5.0) 287 288 vector_goal = TargetDirectionCommand( 289 id=self._part_id, 290 direction=DirectionVector( 291 x=FloatValue(value=x), 292 y=FloatValue(value=y), 293 theta=FloatValue(value=theta), 294 ), 295 ) 296 297 odometry_goal = OdometryGoal( 298 odometry_goal=vector_goal, 299 distance_tolerance=FloatValue(value=distance_tolerance), 300 angle_tolerance=FloatValue(value=angle_tolerance), 301 timeout=FloatValue(value=timeout), 302 ) 303 304 request = GoToRequest( 305 odometry_goal=odometry_goal, 306 ) 307 308 response = self._goto_stub.GoToOdometry(request) 309 310 if response.id == -1: 311 self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.") 312 elif wait: 313 self._wait_goto(response, timeout) 314 315 return response 316 317 def translate_by( 318 self, 319 x: float, 320 y: float, 321 wait: bool = False, 322 distance_tolerance: Optional[float] = 0.05, 323 timeout: float = 100, 324 ) -> GoToId: 325 """Send a target position relative to the current position of the mobile base. 326 327 The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space. 328 329 Args: 330 x: The desired translation along the x-axis in meters. 331 y: The desired translation along the y-axis in meters. 332 wait: If True, the function waits until the movement is completed before returning. 333 distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters. 334 timeout: An optional timeout for reaching the target position, in seconds. 335 336 Returns: 337 The GoToId of the movement command, created using the `goto` method. 338 """ 339 try: 340 goto = self.get_goto_queue()[-1] 341 except IndexError: 342 goto = self.get_goto_playing() 343 344 if goto.id != -1: 345 odom_request = self._get_goto_request(goto) 346 else: 347 odom_request = None 348 349 angle_tolerance = None 350 351 if odom_request is not None: 352 base_odom = odom_request.request.target 353 angle_tolerance = deg2rad(odom_request.request.angle_tolerance) 354 else: 355 base_odom = self.odometry 356 357 theta_goal = deg2rad(base_odom["theta"]) 358 x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal)) 359 y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal)) 360 361 return self.goto( 362 x_goal, 363 y_goal, 364 theta_goal, 365 wait=wait, 366 distance_tolerance=distance_tolerance, 367 angle_tolerance=angle_tolerance, 368 degrees=False, 369 timeout=timeout, 370 ) 371 372 def rotate_by( 373 self, 374 theta: float, 375 wait: bool = False, 376 degrees: bool = True, 377 angle_tolerance: Optional[float] = None, 378 timeout: float = 100, 379 ) -> GoToId: 380 """Send a target rotation relative to the current rotation of the mobile base. 381 382 The theta parameter defines the desired rotation in degrees. 383 384 Args: 385 theta: The desired rotation in degrees, relative to the current orientation. 386 wait: If True, the function waits until the rotation is completed before returning. 387 degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians. 388 angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished. 389 timeout: An optional timeout for completing the rotation, in seconds. 390 """ 391 try: 392 goto = self.get_goto_queue()[-1] 393 except IndexError: 394 goto = self.get_goto_playing() 395 396 if goto.id != -1: 397 odom_request = self._get_goto_request(goto) 398 else: 399 odom_request = None 400 401 distance_tolerance = 0.05 402 403 if odom_request is not None: 404 base_odom = odom_request.request.target 405 if angle_tolerance is None: 406 angle_tolerance = odom_request.request.angle_tolerance 407 distance_tolerance = odom_request.request.distance_tolerance 408 else: 409 base_odom = self.odometry 410 411 if degrees: 412 theta = base_odom["theta"] + theta 413 else: 414 theta = deg2rad(base_odom["theta"]) + theta 415 x = base_odom["x"] 416 y = base_odom["y"] 417 418 return self.goto( 419 x, 420 y, 421 theta, 422 wait=wait, 423 degrees=degrees, 424 distance_tolerance=distance_tolerance, 425 angle_tolerance=angle_tolerance, 426 timeout=timeout, 427 ) 428 429 def reset_odometry(self) -> None: 430 """Reset the odometry. 431 432 This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0). 433 If any goto is being played, stop the goto and the queued ones. 434 """ 435 if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0: 436 self._logger.warning( 437 "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos." 438 ) 439 self._stub.ResetOdometry(self._part_id) 440 time.sleep(0.05) 441 442 def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None: 443 """Set the goal speed for the mobile base. 444 445 This method sets the target velocities for the mobile base's movement along the x and y axes, as well as 446 its rotational speed. The actual movement is executed after calling `send_speed_command`. 447 448 Args: 449 vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0. 450 vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0. 451 vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0. 452 453 Raises: 454 TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`. 455 456 Notes: 457 - Use `send_speed_command` after this method to execute the movement. 458 - The velocities will be used to command the mobile base for a short duration (0.2 seconds). 459 """ 460 for vel in [vx, vy, vtheta]: 461 if not isinstance(vel, float) | isinstance(vel, int): 462 raise TypeError("goal_speed must be a float or int") 463 464 self._x_vel_goal = vx 465 self._y_vel_goal = vy 466 self._rot_vel_goal = vtheta 467 468 def send_speed_command(self) -> None: 469 """Send the speed command to the mobile base, based on previously set goal speeds. 470 471 This method sends the velocity commands for the mobile base that were set with `set_goal_speed`. 472 The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code. 473 474 Raises: 475 ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values. 476 Warning: If the mobile base is off, no command is sent, and a warning is logged. 477 478 Notes: 479 - This method is optimal for sending frequent speed instructions to the mobile base. 480 - The goal velocities must be set with `set_goal_speed` before calling this function. 481 """ 482 if self.is_off(): 483 self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.") 484 return 485 for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items(): 486 if abs(value) > self._max_xy_vel: 487 raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}") 488 489 if abs(self._rot_vel_goal) > self._max_rot_vel: 490 raise ValueError( 491 f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}" 492 ) 493 494 if self._drive_mode != "cmd_vel": 495 self._set_drive_mode("cmd_vel") 496 497 req = TargetDirectionCommand( 498 direction=DirectionVector( 499 x=FloatValue(value=self._x_vel_goal), 500 y=FloatValue(value=self._y_vel_goal), 501 theta=FloatValue(value=deg2rad(self._rot_vel_goal)), 502 ) 503 ) 504 self._mobility_stub.SendDirection(req) 505 506 def _update_with(self, new_state: MobileBaseState) -> None: 507 """Update the mobile base's state with newly received data from the gRPC server. 508 509 This method updates the battery level, LIDAR safety information, drive mode, and control mode 510 of the mobile base. 511 512 Args: 513 new_state: The new state of the mobile base, as a MobileBaseState object. 514 """ 515 self._battery_level = new_state.battery_level.level.value 516 self.lidar._update_with(new_state.lidar_safety) 517 self._drive_mode = ZuuuModePossiblities.keys()[new_state.zuuu_mode.mode].lower() 518 self._control_mode = ControlModePossiblities.keys()[new_state.control_mode.mode].lower() 519 520 def _update_audit_status(self, new_status: MobileBaseStatus) -> None: 521 """Update the audit status of the mobile base. 522 523 This is a placeholder method and does not perform any actions. 524 525 Args: 526 new_status: The new status of the mobile base, as a MobileBaseStatus object. 527 """ 528 pass # pragma: no cover 529 530 def _set_speed_limits(self, value: int) -> None: 531 """Set the speed limits for the mobile base. 532 533 This method overrides the base class implementation to set speed limits. 534 535 Args: 536 value: The speed limit value to be set, as an integer. 537 """ 538 return super()._set_speed_limits(value) 539 540 def _check_goto_parameters(self, target: Any, duration: Optional[float] = None, q0: Optional[List[float]] = None) -> None: 541 """Check the validity of the parameters for the `goto` method. 542 543 Args: 544 duration: Not required here. 545 target: The target goal, as a list [x, y, theta] in the odometry coordinate system. 546 q0: Not required here. Defaults to None. 547 548 Raises: 549 TypeError: If the x goal is not a float or int. 550 TypeError: If the y goal is not a float or int. 551 TypeError: If the theta goal is not a float or int. 552 """ 553 self._check_type_float(target[0], "x") 554 self._check_type_float(target[1], "y") 555 self._check_type_float(target[2], "theta") 556 557 try: 558 goto = self.get_goto_queue()[-1] 559 except IndexError: 560 goto = self.get_goto_playing() 561 562 if goto.id != -1: 563 odom_request = self._get_goto_request(goto) 564 else: 565 odom_request = None 566 567 if odom_request is not None: 568 base_odom = odom_request.request.target 569 x_offset = abs(target[0] - base_odom["x"]) 570 y_offset = abs(target[1] - base_odom["y"]) 571 else: 572 x_offset = abs(target[0] - self.odometry["x"]) 573 y_offset = abs(target[1] - self.odometry["y"]) 574 for pos, value in {"x": x_offset, "y": y_offset}.items(): 575 if abs(value) > self._max_xy_goto: 576 raise ValueError(f"The displacement in {pos} should not be more than {self._max_xy_goto}, got {abs(value)}") 577 578 def _check_optional_goto_parameters( 579 self, distance_tolerance: Optional[float], angle_tolerance: Optional[float], timeout: Optional[float] 580 ) -> None: 581 """Check the validity of the optional parameters for the `goto` method. 582 583 Args: 584 distance_tolerance: The distance tolerance value to be checked. 585 angle_tolerance: The angle tolerance value to be checked. 586 timeout: The timeout value to be checked. 587 588 Raises: 589 ValueError: If the distance_tolerance is negative. 590 ValueError: If the angle_tolerance is negative. 591 ValueError: If the timeout is negative or null. 592 """ 593 if distance_tolerance is not None: 594 self._check_type_float(distance_tolerance, "distance_tolerance") 595 if distance_tolerance < 0: 596 raise ValueError(f"distance_tolerance must be a positive value, got {distance_tolerance}") 597 if angle_tolerance is not None: 598 self._check_type_float(angle_tolerance, "angle_tolerance") 599 if angle_tolerance < 0: 600 raise ValueError(f"angle_tolerance must be a positive value, got {angle_tolerance}") 601 if timeout is not None: 602 self._check_type_float(timeout, "timeout") 603 if timeout <= 0: 604 raise ValueError(f"timeout must be a positive value greater than 0, got {timeout}") 605 606 def _check_type_float(self, value: Any, arg_name: str) -> None: 607 """Check the type of the value parameter. 608 609 Args: 610 value: The value to be checked. 611 612 Raises: 613 TypeError: If the value is not a float or int. 614 """ 615 if not (isinstance(value, float) | isinstance(value, int)): 616 raise TypeError(f"{arg_name} must be a float or int, got {type(value)} instead") 617 618 def set_max_xy_goto(self, value: float) -> None: 619 """Set the maximum displacement in the x and y directions for the mobile base. 620 621 Args: 622 value: The maximum displacement value to be set, in meters. 623 """ 624 self._max_xy_goto = value 625 626 def goto_posture( 627 self, 628 common_posture: str = "default", 629 duration: float = 2, 630 wait: bool = False, 631 wait_for_goto_end: bool = True, 632 interpolation_mode: str = "minimum_jerk", 633 ) -> GoToId: 634 """Mobile base is not affected by goto_posture. No command is sent.""" 635 return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
MobileBase class for controlling Reachy's mobile base.
This class provides methods to interact with and control the mobile base of a Reachy robot. It allows users to access essential information such as battery voltage and odometry, as well as send commands to move the base to specified positions or velocities. The class supports different drive modes and control modes, and provides methods for resetting the base's odometry.
Attributes:
- lidar: Lidar object for handling safety features.
52 def __init__( 53 self, 54 mb_msg: MobileBase_proto, 55 initial_state: MobileBaseState, 56 grpc_channel: grpc.Channel, 57 goto_stub: GoToServiceStub, 58 ) -> None: 59 """Initialize the MobileBase with its gRPC communication and configuration. 60 61 This sets up the gRPC communication channel and service stubs for controlling the 62 mobile base, initializes the drive and control modes. 63 It also sets up the LIDAR safety monitoring. 64 65 Args: 66 mb_msg: A MobileBase_proto message containing the configuration details for the mobile base. 67 initial_state: The initial state of the mobile base, as a MobileBaseState object. 68 grpc_channel: The gRPC channel used to communicate with the mobile base service. 69 goto_stub: The gRPC service stub for the GoTo service. 70 """ 71 self._logger = logging.getLogger(__name__) 72 super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel)) 73 IGoToBasedPart.__init__(self, self._part_id, goto_stub) 74 75 self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel) 76 77 self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower() 78 self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower() 79 self._battery_level = 30.0 80 81 self._max_xy_vel = 0.61 82 self._max_rot_vel = 114.0 83 self._max_xy_goto = 1.0 84 85 self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self) 86 87 self._update_with(initial_state)
Initialize the MobileBase with its gRPC communication and configuration.
This sets up the gRPC communication channel and service stubs for controlling the mobile base, initializes the drive and control modes. It also sets up the LIDAR safety monitoring.
Arguments:
- mb_msg: A MobileBase_proto message containing the configuration details for the mobile base.
- initial_state: The initial state of the mobile base, as a MobileBaseState object.
- grpc_channel: The gRPC channel used to communicate with the mobile base service.
- goto_stub: The gRPC service stub for the GoTo service.
100 @property 101 def battery_voltage(self) -> float: 102 """Return the battery voltage. 103 104 The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low, 105 a warning message is logged. 106 107 Returns: 108 The current battery voltage as a float, rounded to one decimal place. 109 """ 110 battery_level = float(round(self._battery_level, 1)) 111 if battery_level < 24.5: 112 self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.") 113 return float(round(self._battery_level, 1))
Return the battery voltage.
The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low, a warning message is logged.
Returns:
The current battery voltage as a float, rounded to one decimal place.
115 @property 116 def odometry(self) -> Dict[str, float]: 117 """Return the odometry of the base. 118 119 The odometry includes the x and y positions in meters and theta in degrees, along with the 120 velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second. 121 122 Returns: 123 A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta', 124 each rounded to three decimal places. 125 """ 126 response = self._stub.GetOdometry(self._part_id) 127 odom = { 128 "x": response.x.value, 129 "y": response.y.value, 130 "theta": rad2deg(response.theta.value), 131 "vx": response.vx.value, 132 "vy": response.vy.value, 133 "vtheta": rad2deg(response.vtheta.value), 134 } 135 return odom
Return the odometry of the base.
The odometry includes the x and y positions in meters and theta in degrees, along with the velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second.
Returns:
A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta', each rounded to three decimal places.
137 @property 138 def last_cmd_vel(self) -> Dict[str, float]: 139 """Return the last command velocity sent to the base. 140 141 The velocity includes the x and y components in meters per second and the theta component in degrees per second. 142 143 Returns: 144 A dictionary containing the last command velocity with keys 'x', 'y', and 'theta', 145 each rounded to three decimal places. 146 """ 147 response = self._mobility_stub.GetLastDirection(self._part_id) 148 cmd_vel = { 149 "x": round(response.x.value, 3), 150 "y": round(response.y.value, 3), 151 "theta": round(rad2deg(response.theta.value), 3), 152 } 153 return cmd_vel
Return the last command velocity sent to the base.
The velocity includes the x and y components in meters per second and the theta component in degrees per second.
Returns:
A dictionary containing the last command velocity with keys 'x', 'y', and 'theta', each rounded to three decimal places.
196 def is_on(self) -> bool: 197 """Check if the mobile base is currently stiff (not in free-wheel mode). 198 199 Returns: 200 `True` if the mobile base is not compliant (stiff), `False` otherwise. 201 """ 202 return not self._drive_mode == "free_wheel"
Check if the mobile base is currently stiff (not in free-wheel mode).
Returns:
True
if the mobile base is not compliant (stiff),False
otherwise.
204 def is_off(self) -> bool: 205 """Check if the mobile base is currently compliant (in free-wheel mode). 206 207 Returns: 208 True if the mobile base is compliant (in free-wheel mode), `False` otherwise. 209 """ 210 if self._drive_mode == "free_wheel": 211 return True 212 return False
Check if the mobile base is currently compliant (in free-wheel mode).
Returns:
True if the mobile base is compliant (in free-wheel mode),
False
otherwise.
214 def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]: 215 """Get the current odometry of the mobile base in its reference frame. 216 217 Args: 218 degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees. 219 Defaults to True. 220 221 Returns: 222 Dict[str, float]: A dictionary containing the current odometry of the mobile base with: 223 - 'x': Position along the x-axis (in meters). 224 - 'y': Position along the y-axis (in meters). 225 - 'theta': Orientation (in degrees by default, radians if `degrees=False`). 226 - 'vx': Linear velocity along the x-axis (in meters per second). 227 - 'vy': Linear velocity along the y-axis (in meters per second). 228 - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`). 229 """ 230 current_state = self.odometry.copy() 231 if not degrees: 232 current_state["theta"] = deg2rad(current_state["theta"]) 233 current_state["vtheta"] = deg2rad(current_state["vtheta"]) 234 235 return current_state
Get the current odometry of the mobile base in its reference frame.
Arguments:
- degrees (bool, optional): Whether to return the orientation (
theta
andvtheta
) in degrees. Defaults to True.
Returns:
Dict[str, float]: A dictionary containing the current odometry of the mobile base with:
- 'x': Position along the x-axis (in meters).
- 'y': Position along the y-axis (in meters).
- 'theta': Orientation (in degrees by default, radians if
degrees=False
).- 'vx': Linear velocity along the x-axis (in meters per second).
- 'vy': Linear velocity along the y-axis (in meters per second).
- 'vtheta': Angular velocity (in degrees per second by default, radians if
degrees=False
).
237 def goto( 238 self, 239 x: float, 240 y: float, 241 theta: float, 242 wait: bool = False, 243 degrees: bool = True, 244 distance_tolerance: Optional[float] = 0.05, 245 angle_tolerance: Optional[float] = None, 246 timeout: float = 100, 247 ) -> GoToId: 248 """Send the mobile base to a specified target position. 249 250 The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees. 251 The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout 252 can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for 253 reaching the target position. 254 255 Args: 256 x: The target x-coordinate in meters. 257 y: The target y-coordinate in meters. 258 theta: The target orientation in degrees. 259 wait: If True, the function waits until the movement is completed before returning. 260 Defaults to False. 261 degrees: If True, the theta value and angle_tolerance are treated as degrees. 262 Defaults to True. 263 distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters. 264 angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters. 265 timeout: Optional; the maximum time allowed to reach the target, in seconds. 266 267 Returns: 268 GoToId: The unique GoToId identifier for the movement command. 269 270 Raises: 271 TypeError: If the target is not reached and the mobile base is stopped due to an obstacle. 272 """ 273 if self.is_off(): 274 self._logger.warning("Mobile base is off. Goto not sent.") 275 return 276 277 self._check_goto_parameters(target=[x, y, theta]) 278 self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout) 279 280 if degrees: 281 theta = deg2rad(theta) 282 if angle_tolerance is not None: 283 angle_tolerance = deg2rad(angle_tolerance) 284 285 if angle_tolerance is None: 286 angle_tolerance = deg2rad(5.0) 287 288 vector_goal = TargetDirectionCommand( 289 id=self._part_id, 290 direction=DirectionVector( 291 x=FloatValue(value=x), 292 y=FloatValue(value=y), 293 theta=FloatValue(value=theta), 294 ), 295 ) 296 297 odometry_goal = OdometryGoal( 298 odometry_goal=vector_goal, 299 distance_tolerance=FloatValue(value=distance_tolerance), 300 angle_tolerance=FloatValue(value=angle_tolerance), 301 timeout=FloatValue(value=timeout), 302 ) 303 304 request = GoToRequest( 305 odometry_goal=odometry_goal, 306 ) 307 308 response = self._goto_stub.GoToOdometry(request) 309 310 if response.id == -1: 311 self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.") 312 elif wait: 313 self._wait_goto(response, timeout) 314 315 return response
Send the mobile base to a specified target position.
The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees.
The zero position is set when the mobile base is started or when the reset_odometry
method is called. A timeout
can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for
reaching the target position.
Arguments:
- x: The target x-coordinate in meters.
- y: The target y-coordinate in meters.
- theta: The target orientation in degrees.
- wait: If True, the function waits until the movement is completed before returning. Defaults to False.
- degrees: If True, the theta value and angle_tolerance are treated as degrees. Defaults to True.
- distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters.
- angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters.
- timeout: Optional; the maximum time allowed to reach the target, in seconds.
Returns:
GoToId: The unique GoToId identifier for the movement command.
Raises:
- TypeError: If the target is not reached and the mobile base is stopped due to an obstacle.
317 def translate_by( 318 self, 319 x: float, 320 y: float, 321 wait: bool = False, 322 distance_tolerance: Optional[float] = 0.05, 323 timeout: float = 100, 324 ) -> GoToId: 325 """Send a target position relative to the current position of the mobile base. 326 327 The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space. 328 329 Args: 330 x: The desired translation along the x-axis in meters. 331 y: The desired translation along the y-axis in meters. 332 wait: If True, the function waits until the movement is completed before returning. 333 distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters. 334 timeout: An optional timeout for reaching the target position, in seconds. 335 336 Returns: 337 The GoToId of the movement command, created using the `goto` method. 338 """ 339 try: 340 goto = self.get_goto_queue()[-1] 341 except IndexError: 342 goto = self.get_goto_playing() 343 344 if goto.id != -1: 345 odom_request = self._get_goto_request(goto) 346 else: 347 odom_request = None 348 349 angle_tolerance = None 350 351 if odom_request is not None: 352 base_odom = odom_request.request.target 353 angle_tolerance = deg2rad(odom_request.request.angle_tolerance) 354 else: 355 base_odom = self.odometry 356 357 theta_goal = deg2rad(base_odom["theta"]) 358 x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal)) 359 y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal)) 360 361 return self.goto( 362 x_goal, 363 y_goal, 364 theta_goal, 365 wait=wait, 366 distance_tolerance=distance_tolerance, 367 angle_tolerance=angle_tolerance, 368 degrees=False, 369 timeout=timeout, 370 )
Send a target position relative to the current position of the mobile base.
The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space.
Arguments:
- x: The desired translation along the x-axis in meters.
- y: The desired translation along the y-axis in meters.
- wait: If True, the function waits until the movement is completed before returning.
- distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters.
- timeout: An optional timeout for reaching the target position, in seconds.
Returns:
The GoToId of the movement command, created using the
goto
method.
372 def rotate_by( 373 self, 374 theta: float, 375 wait: bool = False, 376 degrees: bool = True, 377 angle_tolerance: Optional[float] = None, 378 timeout: float = 100, 379 ) -> GoToId: 380 """Send a target rotation relative to the current rotation of the mobile base. 381 382 The theta parameter defines the desired rotation in degrees. 383 384 Args: 385 theta: The desired rotation in degrees, relative to the current orientation. 386 wait: If True, the function waits until the rotation is completed before returning. 387 degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians. 388 angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished. 389 timeout: An optional timeout for completing the rotation, in seconds. 390 """ 391 try: 392 goto = self.get_goto_queue()[-1] 393 except IndexError: 394 goto = self.get_goto_playing() 395 396 if goto.id != -1: 397 odom_request = self._get_goto_request(goto) 398 else: 399 odom_request = None 400 401 distance_tolerance = 0.05 402 403 if odom_request is not None: 404 base_odom = odom_request.request.target 405 if angle_tolerance is None: 406 angle_tolerance = odom_request.request.angle_tolerance 407 distance_tolerance = odom_request.request.distance_tolerance 408 else: 409 base_odom = self.odometry 410 411 if degrees: 412 theta = base_odom["theta"] + theta 413 else: 414 theta = deg2rad(base_odom["theta"]) + theta 415 x = base_odom["x"] 416 y = base_odom["y"] 417 418 return self.goto( 419 x, 420 y, 421 theta, 422 wait=wait, 423 degrees=degrees, 424 distance_tolerance=distance_tolerance, 425 angle_tolerance=angle_tolerance, 426 timeout=timeout, 427 )
Send a target rotation relative to the current rotation of the mobile base.
The theta parameter defines the desired rotation in degrees.
Arguments:
- theta: The desired rotation in degrees, relative to the current orientation.
- wait: If True, the function waits until the rotation is completed before returning.
- degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians.
- angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished.
- timeout: An optional timeout for completing the rotation, in seconds.
429 def reset_odometry(self) -> None: 430 """Reset the odometry. 431 432 This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0). 433 If any goto is being played, stop the goto and the queued ones. 434 """ 435 if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0: 436 self._logger.warning( 437 "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos." 438 ) 439 self._stub.ResetOdometry(self._part_id) 440 time.sleep(0.05)
Reset the odometry.
This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0). If any goto is being played, stop the goto and the queued ones.
442 def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None: 443 """Set the goal speed for the mobile base. 444 445 This method sets the target velocities for the mobile base's movement along the x and y axes, as well as 446 its rotational speed. The actual movement is executed after calling `send_speed_command`. 447 448 Args: 449 vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0. 450 vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0. 451 vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0. 452 453 Raises: 454 TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`. 455 456 Notes: 457 - Use `send_speed_command` after this method to execute the movement. 458 - The velocities will be used to command the mobile base for a short duration (0.2 seconds). 459 """ 460 for vel in [vx, vy, vtheta]: 461 if not isinstance(vel, float) | isinstance(vel, int): 462 raise TypeError("goal_speed must be a float or int") 463 464 self._x_vel_goal = vx 465 self._y_vel_goal = vy 466 self._rot_vel_goal = vtheta
Set the goal speed for the mobile base.
This method sets the target velocities for the mobile base's movement along the x and y axes, as well as
its rotational speed. The actual movement is executed after calling send_speed_command
.
Arguments:
- vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0.
- vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0.
- vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0.
Raises:
- TypeError: If any of the velocity values (
vx
,vy
,vtheta
) are not of typefloat
orint
.
Notes:
- Use
send_speed_command
after this method to execute the movement.- The velocities will be used to command the mobile base for a short duration (0.2 seconds).
468 def send_speed_command(self) -> None: 469 """Send the speed command to the mobile base, based on previously set goal speeds. 470 471 This method sends the velocity commands for the mobile base that were set with `set_goal_speed`. 472 The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code. 473 474 Raises: 475 ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values. 476 Warning: If the mobile base is off, no command is sent, and a warning is logged. 477 478 Notes: 479 - This method is optimal for sending frequent speed instructions to the mobile base. 480 - The goal velocities must be set with `set_goal_speed` before calling this function. 481 """ 482 if self.is_off(): 483 self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.") 484 return 485 for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items(): 486 if abs(value) > self._max_xy_vel: 487 raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}") 488 489 if abs(self._rot_vel_goal) > self._max_rot_vel: 490 raise ValueError( 491 f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}" 492 ) 493 494 if self._drive_mode != "cmd_vel": 495 self._set_drive_mode("cmd_vel") 496 497 req = TargetDirectionCommand( 498 direction=DirectionVector( 499 x=FloatValue(value=self._x_vel_goal), 500 y=FloatValue(value=self._y_vel_goal), 501 theta=FloatValue(value=deg2rad(self._rot_vel_goal)), 502 ) 503 ) 504 self._mobility_stub.SendDirection(req)
Send the speed command to the mobile base, based on previously set goal speeds.
This method sends the velocity commands for the mobile base that were set with set_goal_speed
.
The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code.
Raises:
- ValueError: If the absolute value of
x_vel
,y_vel
, orrot_vel
exceeds the configured maximum values. - Warning: If the mobile base is off, no command is sent, and a warning is logged.
Notes:
- This method is optimal for sending frequent speed instructions to the mobile base.
- The goal velocities must be set with
set_goal_speed
before calling this function.
618 def set_max_xy_goto(self, value: float) -> None: 619 """Set the maximum displacement in the x and y directions for the mobile base. 620 621 Args: 622 value: The maximum displacement value to be set, in meters. 623 """ 624 self._max_xy_goto = value
Set the maximum displacement in the x and y directions for the mobile base.
Arguments:
- value: The maximum displacement value to be set, in meters.
626 def goto_posture( 627 self, 628 common_posture: str = "default", 629 duration: float = 2, 630 wait: bool = False, 631 wait_for_goto_end: bool = True, 632 interpolation_mode: str = "minimum_jerk", 633 ) -> GoToId: 634 """Mobile base is not affected by goto_posture. No command is sent.""" 635 return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
Mobile base is not affected by goto_posture. No command is sent.