reachy2_sdk.parts.mobile_base

Reachy MobileBase module.

Handles all specific methods to a MobileBase.

  1"""Reachy MobileBase module.
  2
  3Handles all specific methods to a MobileBase.
  4"""
  5
  6import logging
  7import time
  8from typing import Any, Dict, List, Optional
  9
 10import grpc
 11import numpy as np
 12from google.protobuf.wrappers_pb2 import FloatValue
 13from numpy import deg2rad, rad2deg, round
 14from reachy2_sdk_api.goto_pb2 import GoToId, GoToRequest, OdometryGoal
 15from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 16from reachy2_sdk_api.mobile_base_mobility_pb2 import (
 17    DirectionVector,
 18    TargetDirectionCommand,
 19)
 20from reachy2_sdk_api.mobile_base_mobility_pb2_grpc import MobileBaseMobilityServiceStub
 21from reachy2_sdk_api.mobile_base_utility_pb2 import (
 22    ControlModeCommand,
 23    ControlModePossiblities,
 24)
 25from reachy2_sdk_api.mobile_base_utility_pb2 import MobileBase as MobileBase_proto
 26from reachy2_sdk_api.mobile_base_utility_pb2 import (
 27    MobileBaseState,
 28    MobileBaseStatus,
 29    ZuuuModeCommand,
 30    ZuuuModePossiblities,
 31)
 32from reachy2_sdk_api.mobile_base_utility_pb2_grpc import MobileBaseUtilityServiceStub
 33
 34from ..sensors.lidar import Lidar
 35from .goto_based_part import IGoToBasedPart
 36from .part import Part
 37
 38
 39class MobileBase(Part, IGoToBasedPart):
 40    """MobileBase class for controlling Reachy's mobile base.
 41
 42    This class provides methods to interact with and control the mobile base of a Reachy robot. It allows
 43    users to access essential information such as battery voltage and odometry, as well as send commands
 44    to move the base to specified positions or velocities. The class supports different drive modes and
 45    control modes, and provides methods for resetting the base's odometry.
 46
 47    Attributes:
 48        lidar: Lidar object for handling safety features.
 49    """
 50
 51    def __init__(
 52        self,
 53        mb_msg: MobileBase_proto,
 54        initial_state: MobileBaseState,
 55        grpc_channel: grpc.Channel,
 56        goto_stub: GoToServiceStub,
 57    ) -> None:
 58        """Initialize the MobileBase with its gRPC communication and configuration.
 59
 60        This sets up the gRPC communication channel and service stubs for controlling the
 61        mobile base, initializes the drive and control modes.
 62        It also sets up the LIDAR safety monitoring.
 63
 64        Args:
 65            mb_msg: A MobileBase_proto message containing the configuration details for the mobile base.
 66            initial_state: The initial state of the mobile base, as a MobileBaseState object.
 67            grpc_channel: The gRPC channel used to communicate with the mobile base service.
 68            goto_stub: The gRPC service stub for the GoTo service.
 69        """
 70        self._logger = logging.getLogger(__name__)
 71        super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel))
 72        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
 73
 74        self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel)
 75
 76        self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower()
 77        self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower()
 78        self._battery_level = 30.0
 79
 80        self._max_xy_vel = 0.61
 81        self._max_rot_vel = 114.0
 82        self._max_xy_goto = 1.0
 83
 84        self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self)
 85
 86        self._update_with(initial_state)
 87
 88    def __repr__(self) -> str:
 89        """Clean representation of a mobile base."""
 90        repr_template = (
 91            "<MobileBase on={on} \n" " lidar_safety_enabled={lidar_safety_enabled} \n" " battery_voltage={battery_voltage}>"
 92        )
 93        return repr_template.format(
 94            on=self.is_on(),
 95            lidar_safety_enabled=self.lidar.safety_enabled,
 96            battery_voltage=self.battery_voltage,
 97        )
 98
 99    @property
100    def battery_voltage(self) -> float:
101        """Return the battery voltage.
102
103        The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low,
104        a warning message is logged.
105
106        Returns:
107            The current battery voltage as a float, rounded to one decimal place.
108        """
109        battery_level = float(round(self._battery_level, 1))
110        if battery_level < 24.5:
111            self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.")
112        return float(round(self._battery_level, 1))
113
114    @property
115    def odometry(self) -> Dict[str, float]:
116        """Return the odometry of the base.
117
118        The odometry includes the x and y positions in meters and theta in degrees, along with the
119        velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second.
120
121        Returns:
122            A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta',
123            each rounded to three decimal places.
124        """
125        response = self._stub.GetOdometry(self._part_id)
126        odom = {
127            "x": response.x.value,
128            "y": response.y.value,
129            "theta": rad2deg(response.theta.value),
130            "vx": response.vx.value,
131            "vy": response.vy.value,
132            "vtheta": rad2deg(response.vtheta.value),
133        }
134        return odom
135
136    @property
137    def last_cmd_vel(self) -> Dict[str, float]:
138        """Return the last command velocity sent to the base.
139
140        The velocity includes the x and y components in meters per second and the theta component in degrees per second.
141
142        Returns:
143            A dictionary containing the last command velocity with keys 'x', 'y', and 'theta',
144            each rounded to three decimal places.
145        """
146        response = self._mobility_stub.GetLastDirection(self._part_id)
147        cmd_vel = {
148            "x": round(response.x.value, 3),
149            "y": round(response.y.value, 3),
150            "theta": round(rad2deg(response.theta.value), 3),
151        }
152        return cmd_vel
153
154    def _set_drive_mode(self, mode: str) -> None:
155        """Set the base's drive mode.
156
157        The drive mode must be one of the allowed modes, excluding 'speed' and 'goto'. If the mode is
158        valid, the base's drive mode is set accordingly.
159
160        Args:
161            mode: The desired drive mode as a string. Possible drive modes are:
162                ['cmd_vel', 'brake', 'free_wheel', 'emergency_stop', 'cmd_goto'].
163
164        Raises:
165            ValueError: If the specified drive mode is not one of the allowed modes.
166        """
167        all_drive_modes = [mode.lower() for mode in ZuuuModePossiblities.keys()][1:]
168        possible_drive_modes = [mode for mode in all_drive_modes if mode not in ("speed", "goto")]
169        if mode in possible_drive_modes:
170            req = ZuuuModeCommand(mode=getattr(ZuuuModePossiblities, mode.upper()))
171            self._stub.SetZuuuMode(req)
172            self._drive_mode = mode
173        else:
174            raise ValueError(f"Drive mode requested should be in {possible_drive_modes}!")
175
176    def _set_control_mode(self, mode: str) -> None:
177        """Set the base's control mode.
178
179        The control mode must be one of the allowed modes. If the mode is valid, the base's control mode is set accordingly.
180
181        Args:
182            mode: The desired control mode as a string. Possible control modes are: ['open_loop', 'pid']
183
184        Raises:
185            ValueError: If the specified control mode is not one of the allowed modes.
186        """
187        possible_control_modes = [mode.lower() for mode in ControlModePossiblities.keys()][1:]
188        if mode in possible_control_modes:
189            req = ControlModeCommand(mode=getattr(ControlModePossiblities, mode.upper()))
190            self._stub.SetControlMode(req)
191            self._control_mode = mode
192        else:
193            raise ValueError(f"Control mode requested should be in {possible_control_modes}!")
194
195    def is_on(self) -> bool:
196        """Check if the mobile base is currently stiff (not in free-wheel mode).
197
198        Returns:
199            `True` if the mobile base is not compliant (stiff), `False` otherwise.
200        """
201        return not self._drive_mode == "free_wheel"
202
203    def is_off(self) -> bool:
204        """Check if the mobile base is currently compliant (in free-wheel mode).
205
206        Returns:
207            True if the mobile base is compliant (in free-wheel mode), `False` otherwise.
208        """
209        if self._drive_mode == "free_wheel":
210            return True
211        return False
212
213    def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]:
214        """Get the current odometry of the mobile base in its reference frame.
215
216        Args:
217            degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees.
218                                    Defaults to True.
219
220        Returns:
221            Dict[str, float]: A dictionary containing the current odometry of the mobile base with:
222            - 'x': Position along the x-axis (in meters).
223            - 'y': Position along the y-axis (in meters).
224            - 'theta': Orientation (in degrees by default, radians if `degrees=False`).
225            - 'vx': Linear velocity along the x-axis (in meters per second).
226            - 'vy': Linear velocity along the y-axis (in meters per second).
227            - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`).
228        """
229        current_state = self.odometry.copy()
230        if not degrees:
231            current_state["theta"] = deg2rad(current_state["theta"])
232            current_state["vtheta"] = deg2rad(current_state["vtheta"])
233
234        return current_state
235
236    def goto(
237        self,
238        x: float,
239        y: float,
240        theta: float,
241        wait: bool = False,
242        degrees: bool = True,
243        distance_tolerance: Optional[float] = 0.05,
244        angle_tolerance: Optional[float] = None,
245        timeout: float = 100,
246    ) -> GoToId:
247        """Send the mobile base to a specified target position.
248
249        The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees.
250        The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout
251        can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for
252        reaching the target position.
253
254        Args:
255            x: The target x-coordinate in meters.
256            y: The target y-coordinate in meters.
257            theta: The target orientation in degrees.
258            wait: If True, the function waits until the movement is completed before returning.
259                    Defaults to False.
260            degrees: If True, the theta value and angle_tolerance are treated as degrees.
261                    Defaults to True.
262            distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters.
263            angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters.
264            timeout: Optional; the maximum time allowed to reach the target, in seconds.
265
266        Returns:
267            GoToId: The unique GoToId identifier for the movement command.
268
269        Raises:
270            TypeError: If the target is not reached and the mobile base is stopped due to an obstacle.
271        """
272        if self.is_off():
273            self._logger.warning("Mobile base is off. Goto not sent.")
274            return
275
276        self._check_goto_parameters(target=[x, y, theta])
277        self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout)
278
279        if degrees:
280            theta = deg2rad(theta)
281            if angle_tolerance is not None:
282                angle_tolerance = deg2rad(angle_tolerance)
283
284        if angle_tolerance is None:
285            angle_tolerance = deg2rad(5.0)
286
287        vector_goal = TargetDirectionCommand(
288            id=self._part_id,
289            direction=DirectionVector(
290                x=FloatValue(value=x),
291                y=FloatValue(value=y),
292                theta=FloatValue(value=theta),
293            ),
294        )
295
296        odometry_goal = OdometryGoal(
297            odometry_goal=vector_goal,
298            distance_tolerance=FloatValue(value=distance_tolerance),
299            angle_tolerance=FloatValue(value=angle_tolerance),
300            timeout=FloatValue(value=timeout),
301        )
302
303        request = GoToRequest(
304            odometry_goal=odometry_goal,
305        )
306
307        response = self._goto_stub.GoToOdometry(request)
308
309        if response.id == -1:
310            self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.")
311        elif wait:
312            self._wait_goto(response, timeout)
313
314        return response
315
316    def translate_by(
317        self,
318        x: float,
319        y: float,
320        wait: bool = False,
321        distance_tolerance: Optional[float] = 0.05,
322        timeout: float = 100,
323    ) -> GoToId:
324        """Send a target position relative to the current position of the mobile base.
325
326        The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space.
327
328        Args:
329            x: The desired translation along the x-axis in meters.
330            y: The desired translation along the y-axis in meters.
331            wait:  If True, the function waits until the movement is completed before returning.
332            distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters.
333            timeout: An optional timeout for reaching the target position, in seconds.
334
335        Returns:
336            The GoToId of the movement command, created using the `goto` method.
337        """
338        try:
339            goto = self.get_goto_queue()[-1]
340        except IndexError:
341            goto = self.get_goto_playing()
342
343        if goto.id != -1:
344            odom_request = self._get_goto_request(goto)
345        else:
346            odom_request = None
347
348        angle_tolerance = None
349
350        if odom_request is not None:
351            base_odom = odom_request.request.target
352            angle_tolerance = deg2rad(odom_request.request.angle_tolerance)
353        else:
354            base_odom = self.odometry
355
356        theta_goal = deg2rad(base_odom["theta"])
357        x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal))
358        y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal))
359
360        return self.goto(
361            x_goal,
362            y_goal,
363            theta_goal,
364            wait=wait,
365            distance_tolerance=distance_tolerance,
366            angle_tolerance=angle_tolerance,
367            degrees=False,
368            timeout=timeout,
369        )
370
371    def rotate_by(
372        self,
373        theta: float,
374        wait: bool = False,
375        degrees: bool = True,
376        angle_tolerance: Optional[float] = None,
377        timeout: float = 100,
378    ) -> GoToId:
379        """Send a target rotation relative to the current rotation of the mobile base.
380
381        The theta parameter defines the desired rotation in degrees.
382
383        Args:
384            theta: The desired rotation in degrees, relative to the current orientation.
385            wait: If True, the function waits until the rotation is completed before returning.
386            degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians.
387            angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished.
388            timeout: An optional timeout for completing the rotation, in seconds.
389        """
390        try:
391            goto = self.get_goto_queue()[-1]
392        except IndexError:
393            goto = self.get_goto_playing()
394
395        if goto.id != -1:
396            odom_request = self._get_goto_request(goto)
397        else:
398            odom_request = None
399
400        distance_tolerance = 0.05
401
402        if odom_request is not None:
403            base_odom = odom_request.request.target
404            if angle_tolerance is None:
405                angle_tolerance = odom_request.request.angle_tolerance
406            distance_tolerance = odom_request.request.distance_tolerance
407        else:
408            base_odom = self.odometry
409
410        if degrees:
411            theta = base_odom["theta"] + theta
412        else:
413            theta = deg2rad(base_odom["theta"]) + theta
414        x = base_odom["x"]
415        y = base_odom["y"]
416
417        return self.goto(
418            x,
419            y,
420            theta,
421            wait=wait,
422            degrees=degrees,
423            distance_tolerance=distance_tolerance,
424            angle_tolerance=angle_tolerance,
425            timeout=timeout,
426        )
427
428    def reset_odometry(self) -> None:
429        """Reset the odometry.
430
431        This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0).
432        If any goto is being played, stop the goto and the queued ones.
433        """
434        if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0:
435            self._logger.warning(
436                "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos."
437            )
438        self._stub.ResetOdometry(self._part_id)
439        time.sleep(0.05)
440
441    def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None:
442        """Set the goal speed for the mobile base.
443
444        This method sets the target velocities for the mobile base's movement along the x and y axes, as well as
445        its rotational speed. The actual movement is executed after calling `send_speed_command`.
446
447        Args:
448            vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0.
449            vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0.
450            vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0.
451
452        Raises:
453            TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`.
454
455        Notes:
456            - Use `send_speed_command` after this method to execute the movement.
457            - The velocities will be used to command the mobile base for a short duration (0.2 seconds).
458        """
459        for vel in [vx, vy, vtheta]:
460            if not isinstance(vel, float) | isinstance(vel, int):
461                raise TypeError("goal_speed must be a float or int")
462
463        self._x_vel_goal = vx
464        self._y_vel_goal = vy
465        self._rot_vel_goal = vtheta
466
467    def send_speed_command(self) -> None:
468        """Send the speed command to the mobile base, based on previously set goal speeds.
469
470        This method sends the velocity commands for the mobile base that were set with `set_goal_speed`.
471        The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code.
472
473        Raises:
474            ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values.
475            Warning: If the mobile base is off, no command is sent, and a warning is logged.
476
477        Notes:
478            - This method is optimal for sending frequent speed instructions to the mobile base.
479            - The goal velocities must be set with `set_goal_speed` before calling this function.
480        """
481        if self.is_off():
482            self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.")
483            return
484        for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items():
485            if abs(value) > self._max_xy_vel:
486                raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}")
487
488        if abs(self._rot_vel_goal) > self._max_rot_vel:
489            raise ValueError(
490                f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}"
491            )
492
493        if self._drive_mode != "cmd_vel":
494            self._set_drive_mode("cmd_vel")
495
496        req = TargetDirectionCommand(
497            direction=DirectionVector(
498                x=FloatValue(value=self._x_vel_goal),
499                y=FloatValue(value=self._y_vel_goal),
500                theta=FloatValue(value=deg2rad(self._rot_vel_goal)),
501            )
502        )
503        self._mobility_stub.SendDirection(req)
504
505    def _update_with(self, new_state: MobileBaseState) -> None:
506        """Update the mobile base's state with newly received data from the gRPC server.
507
508        This method updates the battery level, LIDAR safety information, drive mode, and control mode
509        of the mobile base.
510
511        Args:
512            new_state: The new state of the mobile base, as a MobileBaseState object.
513        """
514        self._battery_level = new_state.battery_level.level.value
515        self.lidar._update_with(new_state.lidar_safety)
516        self._drive_mode = ZuuuModePossiblities.keys()[new_state.zuuu_mode.mode].lower()
517        self._control_mode = ControlModePossiblities.keys()[new_state.control_mode.mode].lower()
518
519    def _update_audit_status(self, new_status: MobileBaseStatus) -> None:
520        """Update the audit status of the mobile base.
521
522        This is a placeholder method and does not perform any actions.
523
524        Args:
525            new_status: The new status of the mobile base, as a MobileBaseStatus object.
526        """
527        pass  # pragma: no cover
528
529    def _set_speed_limits(self, value: int) -> None:
530        """Set the speed limits for the mobile base.
531
532        This method overrides the base class implementation to set speed limits.
533
534        Args:
535            value: The speed limit value to be set, as an integer.
536        """
537        return super()._set_speed_limits(value)
538
539    def _check_goto_parameters(self, target: Any, duration: Optional[float] = None, q0: Optional[List[float]] = None) -> None:
540        """Check the validity of the parameters for the `goto` method.
541
542        Args:
543            duration: Not required here.
544            target: The target goal, as a list [x, y, theta] in the odometry coordinate system.
545            q0: Not required here. Defaults to None.
546
547        Raises:
548            TypeError: If the x goal is not a float or int.
549            TypeError: If the y goal is not a float or int.
550            TypeError: If the theta goal is not a float or int.
551        """
552        self._check_type_float(target[0], "x")
553        self._check_type_float(target[1], "y")
554        self._check_type_float(target[2], "theta")
555
556        try:
557            goto = self.get_goto_queue()[-1]
558        except IndexError:
559            goto = self.get_goto_playing()
560
561        if goto.id != -1:
562            odom_request = self._get_goto_request(goto)
563        else:
564            odom_request = None
565
566        if odom_request is not None:
567            base_odom = odom_request.request.target
568            x_offset = abs(target[0] - base_odom["x"])
569            y_offset = abs(target[1] - base_odom["y"])
570        else:
571            x_offset = abs(target[0] - self.odometry["x"])
572            y_offset = abs(target[1] - self.odometry["y"])
573        for pos, value in {"x": x_offset, "y": y_offset}.items():
574            if abs(value) > self._max_xy_goto:
575                raise ValueError(f"The displacement in {pos} should not be more than {self._max_xy_goto}, got {abs(value)}")
576
577    def _check_optional_goto_parameters(
578        self, distance_tolerance: Optional[float], angle_tolerance: Optional[float], timeout: Optional[float]
579    ) -> None:
580        """Check the validity of the optional parameters for the `goto` method.
581
582        Args:
583            distance_tolerance: The distance tolerance value to be checked.
584            angle_tolerance: The angle tolerance value to be checked.
585            timeout: The timeout value to be checked.
586
587        Raises:
588            ValueError: If the distance_tolerance is negative.
589            ValueError: If the angle_tolerance is negative.
590            ValueError: If the timeout is negative or null.
591        """
592        if distance_tolerance is not None:
593            self._check_type_float(distance_tolerance, "distance_tolerance")
594            if distance_tolerance < 0:
595                raise ValueError(f"distance_tolerance must be a positive value, got {distance_tolerance}")
596        if angle_tolerance is not None:
597            self._check_type_float(angle_tolerance, "angle_tolerance")
598            if angle_tolerance < 0:
599                raise ValueError(f"angle_tolerance must be a positive value, got {angle_tolerance}")
600        if timeout is not None:
601            self._check_type_float(timeout, "timeout")
602            if timeout <= 0:
603                raise ValueError(f"timeout must be a positive value greater than 0, got {timeout}")
604
605    def _check_type_float(self, value: Any, arg_name: str) -> None:
606        """Check the type of the value parameter.
607
608        Args:
609            value: The value to be checked.
610
611        Raises:
612            TypeError: If the value is not a float or int.
613        """
614        if not (isinstance(value, float) | isinstance(value, int)):
615            raise TypeError(f"{arg_name} must be a float or int, got {type(value)} instead")
616
617    def set_max_xy_goto(self, value: float) -> None:
618        """Set the maximum displacement in the x and y directions for the mobile base.
619
620        Args:
621            value: The maximum displacement value to be set, in meters.
622        """
623        self._max_xy_goto = value
624
625    def goto_posture(
626        self,
627        common_posture: str = "default",
628        duration: float = 2,
629        wait: bool = False,
630        wait_for_goto_end: bool = True,
631        interpolation_mode: str = "minimum_jerk",
632    ) -> GoToId:
633        """Mobile base is not affected by goto_posture. No command is sent."""
634        return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)
 40class MobileBase(Part, IGoToBasedPart):
 41    """MobileBase class for controlling Reachy's mobile base.
 42
 43    This class provides methods to interact with and control the mobile base of a Reachy robot. It allows
 44    users to access essential information such as battery voltage and odometry, as well as send commands
 45    to move the base to specified positions or velocities. The class supports different drive modes and
 46    control modes, and provides methods for resetting the base's odometry.
 47
 48    Attributes:
 49        lidar: Lidar object for handling safety features.
 50    """
 51
 52    def __init__(
 53        self,
 54        mb_msg: MobileBase_proto,
 55        initial_state: MobileBaseState,
 56        grpc_channel: grpc.Channel,
 57        goto_stub: GoToServiceStub,
 58    ) -> None:
 59        """Initialize the MobileBase with its gRPC communication and configuration.
 60
 61        This sets up the gRPC communication channel and service stubs for controlling the
 62        mobile base, initializes the drive and control modes.
 63        It also sets up the LIDAR safety monitoring.
 64
 65        Args:
 66            mb_msg: A MobileBase_proto message containing the configuration details for the mobile base.
 67            initial_state: The initial state of the mobile base, as a MobileBaseState object.
 68            grpc_channel: The gRPC channel used to communicate with the mobile base service.
 69            goto_stub: The gRPC service stub for the GoTo service.
 70        """
 71        self._logger = logging.getLogger(__name__)
 72        super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel))
 73        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
 74
 75        self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel)
 76
 77        self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower()
 78        self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower()
 79        self._battery_level = 30.0
 80
 81        self._max_xy_vel = 0.61
 82        self._max_rot_vel = 114.0
 83        self._max_xy_goto = 1.0
 84
 85        self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self)
 86
 87        self._update_with(initial_state)
 88
 89    def __repr__(self) -> str:
 90        """Clean representation of a mobile base."""
 91        repr_template = (
 92            "<MobileBase on={on} \n" " lidar_safety_enabled={lidar_safety_enabled} \n" " battery_voltage={battery_voltage}>"
 93        )
 94        return repr_template.format(
 95            on=self.is_on(),
 96            lidar_safety_enabled=self.lidar.safety_enabled,
 97            battery_voltage=self.battery_voltage,
 98        )
 99
100    @property
101    def battery_voltage(self) -> float:
102        """Return the battery voltage.
103
104        The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low,
105        a warning message is logged.
106
107        Returns:
108            The current battery voltage as a float, rounded to one decimal place.
109        """
110        battery_level = float(round(self._battery_level, 1))
111        if battery_level < 24.5:
112            self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.")
113        return float(round(self._battery_level, 1))
114
115    @property
116    def odometry(self) -> Dict[str, float]:
117        """Return the odometry of the base.
118
119        The odometry includes the x and y positions in meters and theta in degrees, along with the
120        velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second.
121
122        Returns:
123            A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta',
124            each rounded to three decimal places.
125        """
126        response = self._stub.GetOdometry(self._part_id)
127        odom = {
128            "x": response.x.value,
129            "y": response.y.value,
130            "theta": rad2deg(response.theta.value),
131            "vx": response.vx.value,
132            "vy": response.vy.value,
133            "vtheta": rad2deg(response.vtheta.value),
134        }
135        return odom
136
137    @property
138    def last_cmd_vel(self) -> Dict[str, float]:
139        """Return the last command velocity sent to the base.
140
141        The velocity includes the x and y components in meters per second and the theta component in degrees per second.
142
143        Returns:
144            A dictionary containing the last command velocity with keys 'x', 'y', and 'theta',
145            each rounded to three decimal places.
146        """
147        response = self._mobility_stub.GetLastDirection(self._part_id)
148        cmd_vel = {
149            "x": round(response.x.value, 3),
150            "y": round(response.y.value, 3),
151            "theta": round(rad2deg(response.theta.value), 3),
152        }
153        return cmd_vel
154
155    def _set_drive_mode(self, mode: str) -> None:
156        """Set the base's drive mode.
157
158        The drive mode must be one of the allowed modes, excluding 'speed' and 'goto'. If the mode is
159        valid, the base's drive mode is set accordingly.
160
161        Args:
162            mode: The desired drive mode as a string. Possible drive modes are:
163                ['cmd_vel', 'brake', 'free_wheel', 'emergency_stop', 'cmd_goto'].
164
165        Raises:
166            ValueError: If the specified drive mode is not one of the allowed modes.
167        """
168        all_drive_modes = [mode.lower() for mode in ZuuuModePossiblities.keys()][1:]
169        possible_drive_modes = [mode for mode in all_drive_modes if mode not in ("speed", "goto")]
170        if mode in possible_drive_modes:
171            req = ZuuuModeCommand(mode=getattr(ZuuuModePossiblities, mode.upper()))
172            self._stub.SetZuuuMode(req)
173            self._drive_mode = mode
174        else:
175            raise ValueError(f"Drive mode requested should be in {possible_drive_modes}!")
176
177    def _set_control_mode(self, mode: str) -> None:
178        """Set the base's control mode.
179
180        The control mode must be one of the allowed modes. If the mode is valid, the base's control mode is set accordingly.
181
182        Args:
183            mode: The desired control mode as a string. Possible control modes are: ['open_loop', 'pid']
184
185        Raises:
186            ValueError: If the specified control mode is not one of the allowed modes.
187        """
188        possible_control_modes = [mode.lower() for mode in ControlModePossiblities.keys()][1:]
189        if mode in possible_control_modes:
190            req = ControlModeCommand(mode=getattr(ControlModePossiblities, mode.upper()))
191            self._stub.SetControlMode(req)
192            self._control_mode = mode
193        else:
194            raise ValueError(f"Control mode requested should be in {possible_control_modes}!")
195
196    def is_on(self) -> bool:
197        """Check if the mobile base is currently stiff (not in free-wheel mode).
198
199        Returns:
200            `True` if the mobile base is not compliant (stiff), `False` otherwise.
201        """
202        return not self._drive_mode == "free_wheel"
203
204    def is_off(self) -> bool:
205        """Check if the mobile base is currently compliant (in free-wheel mode).
206
207        Returns:
208            True if the mobile base is compliant (in free-wheel mode), `False` otherwise.
209        """
210        if self._drive_mode == "free_wheel":
211            return True
212        return False
213
214    def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]:
215        """Get the current odometry of the mobile base in its reference frame.
216
217        Args:
218            degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees.
219                                    Defaults to True.
220
221        Returns:
222            Dict[str, float]: A dictionary containing the current odometry of the mobile base with:
223            - 'x': Position along the x-axis (in meters).
224            - 'y': Position along the y-axis (in meters).
225            - 'theta': Orientation (in degrees by default, radians if `degrees=False`).
226            - 'vx': Linear velocity along the x-axis (in meters per second).
227            - 'vy': Linear velocity along the y-axis (in meters per second).
228            - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`).
229        """
230        current_state = self.odometry.copy()
231        if not degrees:
232            current_state["theta"] = deg2rad(current_state["theta"])
233            current_state["vtheta"] = deg2rad(current_state["vtheta"])
234
235        return current_state
236
237    def goto(
238        self,
239        x: float,
240        y: float,
241        theta: float,
242        wait: bool = False,
243        degrees: bool = True,
244        distance_tolerance: Optional[float] = 0.05,
245        angle_tolerance: Optional[float] = None,
246        timeout: float = 100,
247    ) -> GoToId:
248        """Send the mobile base to a specified target position.
249
250        The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees.
251        The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout
252        can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for
253        reaching the target position.
254
255        Args:
256            x: The target x-coordinate in meters.
257            y: The target y-coordinate in meters.
258            theta: The target orientation in degrees.
259            wait: If True, the function waits until the movement is completed before returning.
260                    Defaults to False.
261            degrees: If True, the theta value and angle_tolerance are treated as degrees.
262                    Defaults to True.
263            distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters.
264            angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters.
265            timeout: Optional; the maximum time allowed to reach the target, in seconds.
266
267        Returns:
268            GoToId: The unique GoToId identifier for the movement command.
269
270        Raises:
271            TypeError: If the target is not reached and the mobile base is stopped due to an obstacle.
272        """
273        if self.is_off():
274            self._logger.warning("Mobile base is off. Goto not sent.")
275            return
276
277        self._check_goto_parameters(target=[x, y, theta])
278        self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout)
279
280        if degrees:
281            theta = deg2rad(theta)
282            if angle_tolerance is not None:
283                angle_tolerance = deg2rad(angle_tolerance)
284
285        if angle_tolerance is None:
286            angle_tolerance = deg2rad(5.0)
287
288        vector_goal = TargetDirectionCommand(
289            id=self._part_id,
290            direction=DirectionVector(
291                x=FloatValue(value=x),
292                y=FloatValue(value=y),
293                theta=FloatValue(value=theta),
294            ),
295        )
296
297        odometry_goal = OdometryGoal(
298            odometry_goal=vector_goal,
299            distance_tolerance=FloatValue(value=distance_tolerance),
300            angle_tolerance=FloatValue(value=angle_tolerance),
301            timeout=FloatValue(value=timeout),
302        )
303
304        request = GoToRequest(
305            odometry_goal=odometry_goal,
306        )
307
308        response = self._goto_stub.GoToOdometry(request)
309
310        if response.id == -1:
311            self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.")
312        elif wait:
313            self._wait_goto(response, timeout)
314
315        return response
316
317    def translate_by(
318        self,
319        x: float,
320        y: float,
321        wait: bool = False,
322        distance_tolerance: Optional[float] = 0.05,
323        timeout: float = 100,
324    ) -> GoToId:
325        """Send a target position relative to the current position of the mobile base.
326
327        The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space.
328
329        Args:
330            x: The desired translation along the x-axis in meters.
331            y: The desired translation along the y-axis in meters.
332            wait:  If True, the function waits until the movement is completed before returning.
333            distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters.
334            timeout: An optional timeout for reaching the target position, in seconds.
335
336        Returns:
337            The GoToId of the movement command, created using the `goto` method.
338        """
339        try:
340            goto = self.get_goto_queue()[-1]
341        except IndexError:
342            goto = self.get_goto_playing()
343
344        if goto.id != -1:
345            odom_request = self._get_goto_request(goto)
346        else:
347            odom_request = None
348
349        angle_tolerance = None
350
351        if odom_request is not None:
352            base_odom = odom_request.request.target
353            angle_tolerance = deg2rad(odom_request.request.angle_tolerance)
354        else:
355            base_odom = self.odometry
356
357        theta_goal = deg2rad(base_odom["theta"])
358        x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal))
359        y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal))
360
361        return self.goto(
362            x_goal,
363            y_goal,
364            theta_goal,
365            wait=wait,
366            distance_tolerance=distance_tolerance,
367            angle_tolerance=angle_tolerance,
368            degrees=False,
369            timeout=timeout,
370        )
371
372    def rotate_by(
373        self,
374        theta: float,
375        wait: bool = False,
376        degrees: bool = True,
377        angle_tolerance: Optional[float] = None,
378        timeout: float = 100,
379    ) -> GoToId:
380        """Send a target rotation relative to the current rotation of the mobile base.
381
382        The theta parameter defines the desired rotation in degrees.
383
384        Args:
385            theta: The desired rotation in degrees, relative to the current orientation.
386            wait: If True, the function waits until the rotation is completed before returning.
387            degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians.
388            angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished.
389            timeout: An optional timeout for completing the rotation, in seconds.
390        """
391        try:
392            goto = self.get_goto_queue()[-1]
393        except IndexError:
394            goto = self.get_goto_playing()
395
396        if goto.id != -1:
397            odom_request = self._get_goto_request(goto)
398        else:
399            odom_request = None
400
401        distance_tolerance = 0.05
402
403        if odom_request is not None:
404            base_odom = odom_request.request.target
405            if angle_tolerance is None:
406                angle_tolerance = odom_request.request.angle_tolerance
407            distance_tolerance = odom_request.request.distance_tolerance
408        else:
409            base_odom = self.odometry
410
411        if degrees:
412            theta = base_odom["theta"] + theta
413        else:
414            theta = deg2rad(base_odom["theta"]) + theta
415        x = base_odom["x"]
416        y = base_odom["y"]
417
418        return self.goto(
419            x,
420            y,
421            theta,
422            wait=wait,
423            degrees=degrees,
424            distance_tolerance=distance_tolerance,
425            angle_tolerance=angle_tolerance,
426            timeout=timeout,
427        )
428
429    def reset_odometry(self) -> None:
430        """Reset the odometry.
431
432        This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0).
433        If any goto is being played, stop the goto and the queued ones.
434        """
435        if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0:
436            self._logger.warning(
437                "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos."
438            )
439        self._stub.ResetOdometry(self._part_id)
440        time.sleep(0.05)
441
442    def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None:
443        """Set the goal speed for the mobile base.
444
445        This method sets the target velocities for the mobile base's movement along the x and y axes, as well as
446        its rotational speed. The actual movement is executed after calling `send_speed_command`.
447
448        Args:
449            vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0.
450            vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0.
451            vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0.
452
453        Raises:
454            TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`.
455
456        Notes:
457            - Use `send_speed_command` after this method to execute the movement.
458            - The velocities will be used to command the mobile base for a short duration (0.2 seconds).
459        """
460        for vel in [vx, vy, vtheta]:
461            if not isinstance(vel, float) | isinstance(vel, int):
462                raise TypeError("goal_speed must be a float or int")
463
464        self._x_vel_goal = vx
465        self._y_vel_goal = vy
466        self._rot_vel_goal = vtheta
467
468    def send_speed_command(self) -> None:
469        """Send the speed command to the mobile base, based on previously set goal speeds.
470
471        This method sends the velocity commands for the mobile base that were set with `set_goal_speed`.
472        The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code.
473
474        Raises:
475            ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values.
476            Warning: If the mobile base is off, no command is sent, and a warning is logged.
477
478        Notes:
479            - This method is optimal for sending frequent speed instructions to the mobile base.
480            - The goal velocities must be set with `set_goal_speed` before calling this function.
481        """
482        if self.is_off():
483            self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.")
484            return
485        for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items():
486            if abs(value) > self._max_xy_vel:
487                raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}")
488
489        if abs(self._rot_vel_goal) > self._max_rot_vel:
490            raise ValueError(
491                f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}"
492            )
493
494        if self._drive_mode != "cmd_vel":
495            self._set_drive_mode("cmd_vel")
496
497        req = TargetDirectionCommand(
498            direction=DirectionVector(
499                x=FloatValue(value=self._x_vel_goal),
500                y=FloatValue(value=self._y_vel_goal),
501                theta=FloatValue(value=deg2rad(self._rot_vel_goal)),
502            )
503        )
504        self._mobility_stub.SendDirection(req)
505
506    def _update_with(self, new_state: MobileBaseState) -> None:
507        """Update the mobile base's state with newly received data from the gRPC server.
508
509        This method updates the battery level, LIDAR safety information, drive mode, and control mode
510        of the mobile base.
511
512        Args:
513            new_state: The new state of the mobile base, as a MobileBaseState object.
514        """
515        self._battery_level = new_state.battery_level.level.value
516        self.lidar._update_with(new_state.lidar_safety)
517        self._drive_mode = ZuuuModePossiblities.keys()[new_state.zuuu_mode.mode].lower()
518        self._control_mode = ControlModePossiblities.keys()[new_state.control_mode.mode].lower()
519
520    def _update_audit_status(self, new_status: MobileBaseStatus) -> None:
521        """Update the audit status of the mobile base.
522
523        This is a placeholder method and does not perform any actions.
524
525        Args:
526            new_status: The new status of the mobile base, as a MobileBaseStatus object.
527        """
528        pass  # pragma: no cover
529
530    def _set_speed_limits(self, value: int) -> None:
531        """Set the speed limits for the mobile base.
532
533        This method overrides the base class implementation to set speed limits.
534
535        Args:
536            value: The speed limit value to be set, as an integer.
537        """
538        return super()._set_speed_limits(value)
539
540    def _check_goto_parameters(self, target: Any, duration: Optional[float] = None, q0: Optional[List[float]] = None) -> None:
541        """Check the validity of the parameters for the `goto` method.
542
543        Args:
544            duration: Not required here.
545            target: The target goal, as a list [x, y, theta] in the odometry coordinate system.
546            q0: Not required here. Defaults to None.
547
548        Raises:
549            TypeError: If the x goal is not a float or int.
550            TypeError: If the y goal is not a float or int.
551            TypeError: If the theta goal is not a float or int.
552        """
553        self._check_type_float(target[0], "x")
554        self._check_type_float(target[1], "y")
555        self._check_type_float(target[2], "theta")
556
557        try:
558            goto = self.get_goto_queue()[-1]
559        except IndexError:
560            goto = self.get_goto_playing()
561
562        if goto.id != -1:
563            odom_request = self._get_goto_request(goto)
564        else:
565            odom_request = None
566
567        if odom_request is not None:
568            base_odom = odom_request.request.target
569            x_offset = abs(target[0] - base_odom["x"])
570            y_offset = abs(target[1] - base_odom["y"])
571        else:
572            x_offset = abs(target[0] - self.odometry["x"])
573            y_offset = abs(target[1] - self.odometry["y"])
574        for pos, value in {"x": x_offset, "y": y_offset}.items():
575            if abs(value) > self._max_xy_goto:
576                raise ValueError(f"The displacement in {pos} should not be more than {self._max_xy_goto}, got {abs(value)}")
577
578    def _check_optional_goto_parameters(
579        self, distance_tolerance: Optional[float], angle_tolerance: Optional[float], timeout: Optional[float]
580    ) -> None:
581        """Check the validity of the optional parameters for the `goto` method.
582
583        Args:
584            distance_tolerance: The distance tolerance value to be checked.
585            angle_tolerance: The angle tolerance value to be checked.
586            timeout: The timeout value to be checked.
587
588        Raises:
589            ValueError: If the distance_tolerance is negative.
590            ValueError: If the angle_tolerance is negative.
591            ValueError: If the timeout is negative or null.
592        """
593        if distance_tolerance is not None:
594            self._check_type_float(distance_tolerance, "distance_tolerance")
595            if distance_tolerance < 0:
596                raise ValueError(f"distance_tolerance must be a positive value, got {distance_tolerance}")
597        if angle_tolerance is not None:
598            self._check_type_float(angle_tolerance, "angle_tolerance")
599            if angle_tolerance < 0:
600                raise ValueError(f"angle_tolerance must be a positive value, got {angle_tolerance}")
601        if timeout is not None:
602            self._check_type_float(timeout, "timeout")
603            if timeout <= 0:
604                raise ValueError(f"timeout must be a positive value greater than 0, got {timeout}")
605
606    def _check_type_float(self, value: Any, arg_name: str) -> None:
607        """Check the type of the value parameter.
608
609        Args:
610            value: The value to be checked.
611
612        Raises:
613            TypeError: If the value is not a float or int.
614        """
615        if not (isinstance(value, float) | isinstance(value, int)):
616            raise TypeError(f"{arg_name} must be a float or int, got {type(value)} instead")
617
618    def set_max_xy_goto(self, value: float) -> None:
619        """Set the maximum displacement in the x and y directions for the mobile base.
620
621        Args:
622            value: The maximum displacement value to be set, in meters.
623        """
624        self._max_xy_goto = value
625
626    def goto_posture(
627        self,
628        common_posture: str = "default",
629        duration: float = 2,
630        wait: bool = False,
631        wait_for_goto_end: bool = True,
632        interpolation_mode: str = "minimum_jerk",
633    ) -> GoToId:
634        """Mobile base is not affected by goto_posture. No command is sent."""
635        return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)

MobileBase class for controlling Reachy's mobile base.

This class provides methods to interact with and control the mobile base of a Reachy robot. It allows users to access essential information such as battery voltage and odometry, as well as send commands to move the base to specified positions or velocities. The class supports different drive modes and control modes, and provides methods for resetting the base's odometry.

Attributes:
  • lidar: Lidar object for handling safety features.
MobileBase( mb_msg: mobile_base_utility_pb2.MobileBase, initial_state: mobile_base_utility_pb2.MobileBaseState, grpc_channel: grpc.Channel, goto_stub: reachy2_sdk_api.goto_pb2_grpc.GoToServiceStub)
52    def __init__(
53        self,
54        mb_msg: MobileBase_proto,
55        initial_state: MobileBaseState,
56        grpc_channel: grpc.Channel,
57        goto_stub: GoToServiceStub,
58    ) -> None:
59        """Initialize the MobileBase with its gRPC communication and configuration.
60
61        This sets up the gRPC communication channel and service stubs for controlling the
62        mobile base, initializes the drive and control modes.
63        It also sets up the LIDAR safety monitoring.
64
65        Args:
66            mb_msg: A MobileBase_proto message containing the configuration details for the mobile base.
67            initial_state: The initial state of the mobile base, as a MobileBaseState object.
68            grpc_channel: The gRPC channel used to communicate with the mobile base service.
69            goto_stub: The gRPC service stub for the GoTo service.
70        """
71        self._logger = logging.getLogger(__name__)
72        super().__init__(mb_msg, grpc_channel, MobileBaseUtilityServiceStub(grpc_channel))
73        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
74
75        self._mobility_stub = MobileBaseMobilityServiceStub(grpc_channel)
76
77        self._drive_mode: str = ZuuuModePossiblities.keys()[initial_state.zuuu_mode.mode].lower()
78        self._control_mode: str = ControlModePossiblities.keys()[initial_state.control_mode.mode].lower()
79        self._battery_level = 30.0
80
81        self._max_xy_vel = 0.61
82        self._max_rot_vel = 114.0
83        self._max_xy_goto = 1.0
84
85        self.lidar = Lidar(initial_state.lidar_safety, grpc_channel, self)
86
87        self._update_with(initial_state)

Initialize the MobileBase with its gRPC communication and configuration.

This sets up the gRPC communication channel and service stubs for controlling the mobile base, initializes the drive and control modes. It also sets up the LIDAR safety monitoring.

Arguments:
  • mb_msg: A MobileBase_proto message containing the configuration details for the mobile base.
  • initial_state: The initial state of the mobile base, as a MobileBaseState object.
  • grpc_channel: The gRPC channel used to communicate with the mobile base service.
  • goto_stub: The gRPC service stub for the GoTo service.
lidar
battery_voltage: float
100    @property
101    def battery_voltage(self) -> float:
102        """Return the battery voltage.
103
104        The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low,
105        a warning message is logged.
106
107        Returns:
108            The current battery voltage as a float, rounded to one decimal place.
109        """
110        battery_level = float(round(self._battery_level, 1))
111        if battery_level < 24.5:
112            self._logger.warning(f"Low battery level: {battery_level}V. Consider recharging.")
113        return float(round(self._battery_level, 1))

Return the battery voltage.

The battery should be recharged if the voltage reaches 24.5V or below. If the battery level is low, a warning message is logged.

Returns:

The current battery voltage as a float, rounded to one decimal place.

odometry: Dict[str, float]
115    @property
116    def odometry(self) -> Dict[str, float]:
117        """Return the odometry of the base.
118
119        The odometry includes the x and y positions in meters and theta in degrees, along with the
120        velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second.
121
122        Returns:
123            A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta',
124            each rounded to three decimal places.
125        """
126        response = self._stub.GetOdometry(self._part_id)
127        odom = {
128            "x": response.x.value,
129            "y": response.y.value,
130            "theta": rad2deg(response.theta.value),
131            "vx": response.vx.value,
132            "vy": response.vy.value,
133            "vtheta": rad2deg(response.vtheta.value),
134        }
135        return odom

Return the odometry of the base.

The odometry includes the x and y positions in meters and theta in degrees, along with the velocities in the x, y directions in meters per degrees and the angular velocity in degrees per second.

Returns:

A dictionary containing the current odometry with keys 'x', 'y', 'theta', 'vx', 'vy', and 'vtheta', each rounded to three decimal places.

last_cmd_vel: Dict[str, float]
137    @property
138    def last_cmd_vel(self) -> Dict[str, float]:
139        """Return the last command velocity sent to the base.
140
141        The velocity includes the x and y components in meters per second and the theta component in degrees per second.
142
143        Returns:
144            A dictionary containing the last command velocity with keys 'x', 'y', and 'theta',
145            each rounded to three decimal places.
146        """
147        response = self._mobility_stub.GetLastDirection(self._part_id)
148        cmd_vel = {
149            "x": round(response.x.value, 3),
150            "y": round(response.y.value, 3),
151            "theta": round(rad2deg(response.theta.value), 3),
152        }
153        return cmd_vel

Return the last command velocity sent to the base.

The velocity includes the x and y components in meters per second and the theta component in degrees per second.

Returns:

A dictionary containing the last command velocity with keys 'x', 'y', and 'theta', each rounded to three decimal places.

def is_on(self) -> bool:
196    def is_on(self) -> bool:
197        """Check if the mobile base is currently stiff (not in free-wheel mode).
198
199        Returns:
200            `True` if the mobile base is not compliant (stiff), `False` otherwise.
201        """
202        return not self._drive_mode == "free_wheel"

Check if the mobile base is currently stiff (not in free-wheel mode).

Returns:

True if the mobile base is not compliant (stiff), False otherwise.

def is_off(self) -> bool:
204    def is_off(self) -> bool:
205        """Check if the mobile base is currently compliant (in free-wheel mode).
206
207        Returns:
208            True if the mobile base is compliant (in free-wheel mode), `False` otherwise.
209        """
210        if self._drive_mode == "free_wheel":
211            return True
212        return False

Check if the mobile base is currently compliant (in free-wheel mode).

Returns:

True if the mobile base is compliant (in free-wheel mode), False otherwise.

def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]:
214    def get_current_odometry(self, degrees: bool = True) -> Dict[str, float]:
215        """Get the current odometry of the mobile base in its reference frame.
216
217        Args:
218            degrees (bool, optional): Whether to return the orientation (`theta` and `vtheta`) in degrees.
219                                    Defaults to True.
220
221        Returns:
222            Dict[str, float]: A dictionary containing the current odometry of the mobile base with:
223            - 'x': Position along the x-axis (in meters).
224            - 'y': Position along the y-axis (in meters).
225            - 'theta': Orientation (in degrees by default, radians if `degrees=False`).
226            - 'vx': Linear velocity along the x-axis (in meters per second).
227            - 'vy': Linear velocity along the y-axis (in meters per second).
228            - 'vtheta': Angular velocity (in degrees per second by default, radians if `degrees=False`).
229        """
230        current_state = self.odometry.copy()
231        if not degrees:
232            current_state["theta"] = deg2rad(current_state["theta"])
233            current_state["vtheta"] = deg2rad(current_state["vtheta"])
234
235        return current_state

Get the current odometry of the mobile base in its reference frame.

Arguments:
  • degrees (bool, optional): Whether to return the orientation (theta and vtheta) in degrees. Defaults to True.
Returns:

Dict[str, float]: A dictionary containing the current odometry of the mobile base with:

  • 'x': Position along the x-axis (in meters).
  • 'y': Position along the y-axis (in meters).
  • 'theta': Orientation (in degrees by default, radians if degrees=False).
  • 'vx': Linear velocity along the x-axis (in meters per second).
  • 'vy': Linear velocity along the y-axis (in meters per second).
  • 'vtheta': Angular velocity (in degrees per second by default, radians if degrees=False).
def goto( self, x: float, y: float, theta: float, wait: bool = False, degrees: bool = True, distance_tolerance: Optional[float] = 0.05, angle_tolerance: Optional[float] = None, timeout: float = 100) -> goto_pb2.GoToId:
237    def goto(
238        self,
239        x: float,
240        y: float,
241        theta: float,
242        wait: bool = False,
243        degrees: bool = True,
244        distance_tolerance: Optional[float] = 0.05,
245        angle_tolerance: Optional[float] = None,
246        timeout: float = 100,
247    ) -> GoToId:
248        """Send the mobile base to a specified target position.
249
250        The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees.
251        The zero position is set when the mobile base is started or when the `reset_odometry` method is called. A timeout
252        can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for
253        reaching the target position.
254
255        Args:
256            x: The target x-coordinate in meters.
257            y: The target y-coordinate in meters.
258            theta: The target orientation in degrees.
259            wait: If True, the function waits until the movement is completed before returning.
260                    Defaults to False.
261            degrees: If True, the theta value and angle_tolerance are treated as degrees.
262                    Defaults to True.
263            distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters.
264            angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters.
265            timeout: Optional; the maximum time allowed to reach the target, in seconds.
266
267        Returns:
268            GoToId: The unique GoToId identifier for the movement command.
269
270        Raises:
271            TypeError: If the target is not reached and the mobile base is stopped due to an obstacle.
272        """
273        if self.is_off():
274            self._logger.warning("Mobile base is off. Goto not sent.")
275            return
276
277        self._check_goto_parameters(target=[x, y, theta])
278        self._check_optional_goto_parameters(distance_tolerance, angle_tolerance, timeout)
279
280        if degrees:
281            theta = deg2rad(theta)
282            if angle_tolerance is not None:
283                angle_tolerance = deg2rad(angle_tolerance)
284
285        if angle_tolerance is None:
286            angle_tolerance = deg2rad(5.0)
287
288        vector_goal = TargetDirectionCommand(
289            id=self._part_id,
290            direction=DirectionVector(
291                x=FloatValue(value=x),
292                y=FloatValue(value=y),
293                theta=FloatValue(value=theta),
294            ),
295        )
296
297        odometry_goal = OdometryGoal(
298            odometry_goal=vector_goal,
299            distance_tolerance=FloatValue(value=distance_tolerance),
300            angle_tolerance=FloatValue(value=angle_tolerance),
301            timeout=FloatValue(value=timeout),
302        )
303
304        request = GoToRequest(
305            odometry_goal=odometry_goal,
306        )
307
308        response = self._goto_stub.GoToOdometry(request)
309
310        if response.id == -1:
311            self._logger.error(f"Unable to go to requested position x={x}, y={y}, theta={theta}. No command sent.")
312        elif wait:
313            self._wait_goto(response, timeout)
314
315        return response

Send the mobile base to a specified target position.

The (x, y) coordinates define the position in Cartesian space, and theta specifies the orientation in degrees. The zero position is set when the mobile base is started or when the reset_odometry method is called. A timeout can be provided to avoid the mobile base getting stuck. The tolerance values define the acceptable margins for reaching the target position.

Arguments:
  • x: The target x-coordinate in meters.
  • y: The target y-coordinate in meters.
  • theta: The target orientation in degrees.
  • wait: If True, the function waits until the movement is completed before returning. Defaults to False.
  • degrees: If True, the theta value and angle_tolerance are treated as degrees. Defaults to True.
  • distance_tolerance: Optional; the tolerance to the target position to consider the goto finished, in meters.
  • angle_tolerance: Optional; the angle tolerance to the target to consider the goto finished, in meters.
  • timeout: Optional; the maximum time allowed to reach the target, in seconds.
Returns:

GoToId: The unique GoToId identifier for the movement command.

Raises:
  • TypeError: If the target is not reached and the mobile base is stopped due to an obstacle.
def translate_by( self, x: float, y: float, wait: bool = False, distance_tolerance: Optional[float] = 0.05, timeout: float = 100) -> goto_pb2.GoToId:
317    def translate_by(
318        self,
319        x: float,
320        y: float,
321        wait: bool = False,
322        distance_tolerance: Optional[float] = 0.05,
323        timeout: float = 100,
324    ) -> GoToId:
325        """Send a target position relative to the current position of the mobile base.
326
327        The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space.
328
329        Args:
330            x: The desired translation along the x-axis in meters.
331            y: The desired translation along the y-axis in meters.
332            wait:  If True, the function waits until the movement is completed before returning.
333            distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters.
334            timeout: An optional timeout for reaching the target position, in seconds.
335
336        Returns:
337            The GoToId of the movement command, created using the `goto` method.
338        """
339        try:
340            goto = self.get_goto_queue()[-1]
341        except IndexError:
342            goto = self.get_goto_playing()
343
344        if goto.id != -1:
345            odom_request = self._get_goto_request(goto)
346        else:
347            odom_request = None
348
349        angle_tolerance = None
350
351        if odom_request is not None:
352            base_odom = odom_request.request.target
353            angle_tolerance = deg2rad(odom_request.request.angle_tolerance)
354        else:
355            base_odom = self.odometry
356
357        theta_goal = deg2rad(base_odom["theta"])
358        x_goal = base_odom["x"] + (x * np.cos(theta_goal) - y * np.sin(theta_goal))
359        y_goal = base_odom["y"] + (x * np.sin(theta_goal) + y * np.cos(theta_goal))
360
361        return self.goto(
362            x_goal,
363            y_goal,
364            theta_goal,
365            wait=wait,
366            distance_tolerance=distance_tolerance,
367            angle_tolerance=angle_tolerance,
368            degrees=False,
369            timeout=timeout,
370        )

Send a target position relative to the current position of the mobile base.

The (x, y) coordinates specify the desired translation in the mobile base's Cartesian space.

Arguments:
  • x: The desired translation along the x-axis in meters.
  • y: The desired translation along the y-axis in meters.
  • wait: If True, the function waits until the movement is completed before returning.
  • distance_tolerance: Optional; The distance tolerance to the target to consider the goto finished, in meters.
  • timeout: An optional timeout for reaching the target position, in seconds.
Returns:

The GoToId of the movement command, created using the goto method.

def rotate_by( self, theta: float, wait: bool = False, degrees: bool = True, angle_tolerance: Optional[float] = None, timeout: float = 100) -> goto_pb2.GoToId:
372    def rotate_by(
373        self,
374        theta: float,
375        wait: bool = False,
376        degrees: bool = True,
377        angle_tolerance: Optional[float] = None,
378        timeout: float = 100,
379    ) -> GoToId:
380        """Send a target rotation relative to the current rotation of the mobile base.
381
382        The theta parameter defines the desired rotation in degrees.
383
384        Args:
385            theta: The desired rotation in degrees, relative to the current orientation.
386            wait: If True, the function waits until the rotation is completed before returning.
387            degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians.
388            angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished.
389            timeout: An optional timeout for completing the rotation, in seconds.
390        """
391        try:
392            goto = self.get_goto_queue()[-1]
393        except IndexError:
394            goto = self.get_goto_playing()
395
396        if goto.id != -1:
397            odom_request = self._get_goto_request(goto)
398        else:
399            odom_request = None
400
401        distance_tolerance = 0.05
402
403        if odom_request is not None:
404            base_odom = odom_request.request.target
405            if angle_tolerance is None:
406                angle_tolerance = odom_request.request.angle_tolerance
407            distance_tolerance = odom_request.request.distance_tolerance
408        else:
409            base_odom = self.odometry
410
411        if degrees:
412            theta = base_odom["theta"] + theta
413        else:
414            theta = deg2rad(base_odom["theta"]) + theta
415        x = base_odom["x"]
416        y = base_odom["y"]
417
418        return self.goto(
419            x,
420            y,
421            theta,
422            wait=wait,
423            degrees=degrees,
424            distance_tolerance=distance_tolerance,
425            angle_tolerance=angle_tolerance,
426            timeout=timeout,
427        )

Send a target rotation relative to the current rotation of the mobile base.

The theta parameter defines the desired rotation in degrees.

Arguments:
  • theta: The desired rotation in degrees, relative to the current orientation.
  • wait: If True, the function waits until the rotation is completed before returning.
  • degrees: If True, the theta value and angle_tolerance are treated as degrees, otherwise as radians.
  • angle_tolerance: Optional; The angle tolerance to the target to consider the goto finished.
  • timeout: An optional timeout for completing the rotation, in seconds.
def reset_odometry(self) -> None:
429    def reset_odometry(self) -> None:
430        """Reset the odometry.
431
432        This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0).
433        If any goto is being played, stop the goto and the queued ones.
434        """
435        if self.get_goto_playing().id != -1 or len(self.get_goto_queue()) != 0:
436            self._logger.warning(
437                "Odometry reset requested while goto in progress: aborting the current goto and all queued gotos."
438            )
439        self._stub.ResetOdometry(self._part_id)
440        time.sleep(0.05)

Reset the odometry.

This method resets the mobile base's odometry, so that the current position is now (x, y, theta) = (0, 0, 0). If any goto is being played, stop the goto and the queued ones.

def set_goal_speed( self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None:
442    def set_goal_speed(self, vx: float | int = 0, vy: float | int = 0, vtheta: float | int = 0) -> None:
443        """Set the goal speed for the mobile base.
444
445        This method sets the target velocities for the mobile base's movement along the x and y axes, as well as
446        its rotational speed. The actual movement is executed after calling `send_speed_command`.
447
448        Args:
449            vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0.
450            vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0.
451            vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0.
452
453        Raises:
454            TypeError: If any of the velocity values (`vx`, `vy`, `vtheta`) are not of type `float` or `int`.
455
456        Notes:
457            - Use `send_speed_command` after this method to execute the movement.
458            - The velocities will be used to command the mobile base for a short duration (0.2 seconds).
459        """
460        for vel in [vx, vy, vtheta]:
461            if not isinstance(vel, float) | isinstance(vel, int):
462                raise TypeError("goal_speed must be a float or int")
463
464        self._x_vel_goal = vx
465        self._y_vel_goal = vy
466        self._rot_vel_goal = vtheta

Set the goal speed for the mobile base.

This method sets the target velocities for the mobile base's movement along the x and y axes, as well as its rotational speed. The actual movement is executed after calling send_speed_command.

Arguments:
  • vx (float | int, optional): Linear velocity along the x-axis in meters per second. Defaults to 0.
  • vy (float | int, optional): Linear velocity along the y-axis in meters per second. Defaults to 0.
  • vtheta (float | int, optional): Rotational velocity (around the z-axis) in degrees per second. Defaults to 0.
Raises:
  • TypeError: If any of the velocity values (vx, vy, vtheta) are not of type float or int.
Notes:
  • Use send_speed_command after this method to execute the movement.
  • The velocities will be used to command the mobile base for a short duration (0.2 seconds).
def send_speed_command(self) -> None:
468    def send_speed_command(self) -> None:
469        """Send the speed command to the mobile base, based on previously set goal speeds.
470
471        This method sends the velocity commands for the mobile base that were set with `set_goal_speed`.
472        The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code.
473
474        Raises:
475            ValueError: If the absolute value of `x_vel`, `y_vel`, or `rot_vel` exceeds the configured maximum values.
476            Warning: If the mobile base is off, no command is sent, and a warning is logged.
477
478        Notes:
479            - This method is optimal for sending frequent speed instructions to the mobile base.
480            - The goal velocities must be set with `set_goal_speed` before calling this function.
481        """
482        if self.is_off():
483            self._logger.warning(f"{self._part_id.name} is off. speed_command not sent.")
484            return
485        for vel, value in {"x_vel": self._x_vel_goal, "y_vel": self._y_vel_goal}.items():
486            if abs(value) > self._max_xy_vel:
487                raise ValueError(f"The absolute value of {vel} should not be more than {self._max_xy_vel}, got {abs(value)}")
488
489        if abs(self._rot_vel_goal) > self._max_rot_vel:
490            raise ValueError(
491                f"The absolute value of rot_vel should not be more than {self._max_rot_vel}, got {abs(self._rot_vel_goal)}"
492            )
493
494        if self._drive_mode != "cmd_vel":
495            self._set_drive_mode("cmd_vel")
496
497        req = TargetDirectionCommand(
498            direction=DirectionVector(
499                x=FloatValue(value=self._x_vel_goal),
500                y=FloatValue(value=self._y_vel_goal),
501                theta=FloatValue(value=deg2rad(self._rot_vel_goal)),
502            )
503        )
504        self._mobility_stub.SendDirection(req)

Send the speed command to the mobile base, based on previously set goal speeds.

This method sends the velocity commands for the mobile base that were set with set_goal_speed. The command will be executed for a duration of 200ms, which is predefined at the ROS level of the mobile base code.

Raises:
  • ValueError: If the absolute value of x_vel, y_vel, or rot_vel exceeds the configured maximum values.
  • Warning: If the mobile base is off, no command is sent, and a warning is logged.
Notes:
  • This method is optimal for sending frequent speed instructions to the mobile base.
  • The goal velocities must be set with set_goal_speed before calling this function.
def set_max_xy_goto(self, value: float) -> None:
618    def set_max_xy_goto(self, value: float) -> None:
619        """Set the maximum displacement in the x and y directions for the mobile base.
620
621        Args:
622            value: The maximum displacement value to be set, in meters.
623        """
624        self._max_xy_goto = value

Set the maximum displacement in the x and y directions for the mobile base.

Arguments:
  • value: The maximum displacement value to be set, in meters.
def goto_posture( self, common_posture: str = 'default', duration: float = 2, wait: bool = False, wait_for_goto_end: bool = True, interpolation_mode: str = 'minimum_jerk') -> goto_pb2.GoToId:
626    def goto_posture(
627        self,
628        common_posture: str = "default",
629        duration: float = 2,
630        wait: bool = False,
631        wait_for_goto_end: bool = True,
632        interpolation_mode: str = "minimum_jerk",
633    ) -> GoToId:
634        """Mobile base is not affected by goto_posture. No command is sent."""
635        return super().goto_posture(common_posture, duration, wait, wait_for_goto_end, interpolation_mode)

Mobile base is not affected by goto_posture. No command is sent.