reachy2_sdk.parts.joints_based_part

Reachy JointsBasedPart module.

Handles all specific methods to parts composed of controllable joints.

  1"""Reachy JointsBasedPart module.
  2
  3Handles all specific methods to parts composed of controllable joints.
  4"""
  5
  6from abc import abstractmethod
  7from typing import List
  8
  9import grpc
 10from reachy2_sdk_api.arm_pb2 import Arm as Arm_proto
 11from reachy2_sdk_api.arm_pb2 import (
 12    ArmComponentsCommands,
 13    SpeedLimitRequest,
 14    TorqueLimitRequest,
 15)
 16from reachy2_sdk_api.arm_pb2_grpc import ArmServiceStub
 17from reachy2_sdk_api.head_pb2 import Head as Head_proto
 18from reachy2_sdk_api.head_pb2 import HeadComponentsCommands
 19from reachy2_sdk_api.head_pb2_grpc import HeadServiceStub
 20
 21from ..orbita.orbita_joint import OrbitaJoint
 22from ..utils.custom_dict import CustomDict
 23from .part import Part
 24
 25
 26class JointsBasedPart(Part):
 27    """Base class for parts of the robot composed of controllable joints.
 28
 29    The `JointsBasedPart` class serves as a base for parts of the robot that consist of multiple joints,
 30    such as arms and heads. This class provides common functionality for controlling joints, setting speed
 31    and torque limits, and managing joint positions.
 32    """
 33
 34    def __init__(
 35        self,
 36        proto_msg: Arm_proto | Head_proto,
 37        grpc_channel: grpc.Channel,
 38        stub: ArmServiceStub | HeadServiceStub,
 39    ) -> None:
 40        """Initialize the JointsBasedPart with its common attributes.
 41
 42        Sets up the gRPC communication channel and service stub for controlling the joint-based
 43        part of the robot, such as an arm or head.
 44
 45        Args:
 46            proto_msg: A protocol message representing the part's configuration. It can be an
 47                Arm_proto or Head_proto object.
 48            grpc_channel: The gRPC channel used to communicate with the corresponding service.
 49            stub: The service stub for the gRPC communication, which can be an ArmServiceStub or
 50                HeadServiceStub, depending on the part type.
 51        """
 52        super().__init__(proto_msg, grpc_channel, stub)
 53
 54    @property
 55    def joints(self) -> CustomDict[str, OrbitaJoint]:
 56        """Get all the arm's joints.
 57
 58        Returns:
 59            A dictionary of all the arm's joints, with joint names as keys and joint objects as values.
 60        """
 61        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
 62        for actuator_name, actuator in self._actuators.items():
 63            for joint in actuator._joints.values():
 64                if hasattr(joint, "_axis_type"):
 65                    _joints[actuator_name + "." + joint._axis_type] = joint
 66                else:
 67                    _joints[actuator_name] = joint
 68        return _joints
 69
 70    @abstractmethod
 71    def get_current_positions(self) -> List[float]:
 72        """Get the current positions of all joints.
 73
 74        Returns:
 75            A list of float values representing the present positions in degrees of the arm's joints.
 76        """
 77        pass  # pragma: no cover
 78
 79    def send_goal_positions(self, check_positions: bool = False) -> None:
 80        """Send goal positions to the part's joints.
 81
 82        If goal positions have been specified for any joint of the part, sends them to the robot.
 83
 84        Args :
 85            check_positions: A boolean indicating whether to check the positions after sending the command.
 86                Defaults to True.
 87        """
 88        if self.is_off():
 89            self._logger.warning(f"{self._part_id.name} is off. Goal positions not sent.")
 90            return
 91        self._stub.SendComponentsCommands(self._get_goal_positions_message())
 92        self._clean_outgoing_goal_positions()
 93        if check_positions:
 94            self._post_send_goal_positions()
 95
 96    @abstractmethod
 97    def _get_goal_positions_message(self) -> HeadComponentsCommands | ArmComponentsCommands:
 98        """Get the message to send the goal positions to the actuator."""
 99        pass  # pragma: no cover
100
101    @abstractmethod
102    def _clean_outgoing_goal_positions(self) -> None:
103        """Get the message to send the goal positions to the actuator."""
104        pass  # pragma: no cover
105
106    @abstractmethod
107    def _post_send_goal_positions(self) -> None:
108        """Monitor the joint positions to check if they reach the specified goals."""
109        pass  # pragma: no cover
110
111    def set_torque_limits(self, torque_limit: int) -> None:
112        """Set the torque limit as a percentage of the maximum torque for all motors of the part.
113
114        Args:
115            torque_limit: The desired torque limit as a percentage (0-100) of the maximum torque. Can be
116                specified as a float or int.
117        """
118        if not isinstance(torque_limit, float | int):
119            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
120        if not (0 <= torque_limit <= 100):
121            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
122        req = TorqueLimitRequest(
123            id=self._part_id,
124            limit=torque_limit,
125        )
126        self._stub.SetTorqueLimit(req)
127
128    def set_speed_limits(self, speed_limit: int) -> None:
129        """Set the speed limit as a percentage of the maximum speed for all motors of the part.
130
131        Args:
132            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
133                specified as a float or int.
134        """
135        if not isinstance(speed_limit, float | int):
136            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
137        if not (0 <= speed_limit <= 100):
138            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
139        req = SpeedLimitRequest(
140            id=self._part_id,
141            limit=speed_limit,
142        )
143        self._stub.SetSpeedLimit(req)
144
145    def _set_speed_limits(self, speed_limit: int) -> None:
146        """Set the speed limit as a percentage of the maximum speed for all motors of the part.
147
148        Args:
149            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
150                specified as a float or int.
151        """
152        return self.set_speed_limits(speed_limit)
class JointsBasedPart(reachy2_sdk.parts.part.Part):
 27class JointsBasedPart(Part):
 28    """Base class for parts of the robot composed of controllable joints.
 29
 30    The `JointsBasedPart` class serves as a base for parts of the robot that consist of multiple joints,
 31    such as arms and heads. This class provides common functionality for controlling joints, setting speed
 32    and torque limits, and managing joint positions.
 33    """
 34
 35    def __init__(
 36        self,
 37        proto_msg: Arm_proto | Head_proto,
 38        grpc_channel: grpc.Channel,
 39        stub: ArmServiceStub | HeadServiceStub,
 40    ) -> None:
 41        """Initialize the JointsBasedPart with its common attributes.
 42
 43        Sets up the gRPC communication channel and service stub for controlling the joint-based
 44        part of the robot, such as an arm or head.
 45
 46        Args:
 47            proto_msg: A protocol message representing the part's configuration. It can be an
 48                Arm_proto or Head_proto object.
 49            grpc_channel: The gRPC channel used to communicate with the corresponding service.
 50            stub: The service stub for the gRPC communication, which can be an ArmServiceStub or
 51                HeadServiceStub, depending on the part type.
 52        """
 53        super().__init__(proto_msg, grpc_channel, stub)
 54
 55    @property
 56    def joints(self) -> CustomDict[str, OrbitaJoint]:
 57        """Get all the arm's joints.
 58
 59        Returns:
 60            A dictionary of all the arm's joints, with joint names as keys and joint objects as values.
 61        """
 62        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
 63        for actuator_name, actuator in self._actuators.items():
 64            for joint in actuator._joints.values():
 65                if hasattr(joint, "_axis_type"):
 66                    _joints[actuator_name + "." + joint._axis_type] = joint
 67                else:
 68                    _joints[actuator_name] = joint
 69        return _joints
 70
 71    @abstractmethod
 72    def get_current_positions(self) -> List[float]:
 73        """Get the current positions of all joints.
 74
 75        Returns:
 76            A list of float values representing the present positions in degrees of the arm's joints.
 77        """
 78        pass  # pragma: no cover
 79
 80    def send_goal_positions(self, check_positions: bool = False) -> None:
 81        """Send goal positions to the part's joints.
 82
 83        If goal positions have been specified for any joint of the part, sends them to the robot.
 84
 85        Args :
 86            check_positions: A boolean indicating whether to check the positions after sending the command.
 87                Defaults to True.
 88        """
 89        if self.is_off():
 90            self._logger.warning(f"{self._part_id.name} is off. Goal positions not sent.")
 91            return
 92        self._stub.SendComponentsCommands(self._get_goal_positions_message())
 93        self._clean_outgoing_goal_positions()
 94        if check_positions:
 95            self._post_send_goal_positions()
 96
 97    @abstractmethod
 98    def _get_goal_positions_message(self) -> HeadComponentsCommands | ArmComponentsCommands:
 99        """Get the message to send the goal positions to the actuator."""
100        pass  # pragma: no cover
101
102    @abstractmethod
103    def _clean_outgoing_goal_positions(self) -> None:
104        """Get the message to send the goal positions to the actuator."""
105        pass  # pragma: no cover
106
107    @abstractmethod
108    def _post_send_goal_positions(self) -> None:
109        """Monitor the joint positions to check if they reach the specified goals."""
110        pass  # pragma: no cover
111
112    def set_torque_limits(self, torque_limit: int) -> None:
113        """Set the torque limit as a percentage of the maximum torque for all motors of the part.
114
115        Args:
116            torque_limit: The desired torque limit as a percentage (0-100) of the maximum torque. Can be
117                specified as a float or int.
118        """
119        if not isinstance(torque_limit, float | int):
120            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
121        if not (0 <= torque_limit <= 100):
122            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
123        req = TorqueLimitRequest(
124            id=self._part_id,
125            limit=torque_limit,
126        )
127        self._stub.SetTorqueLimit(req)
128
129    def set_speed_limits(self, speed_limit: int) -> None:
130        """Set the speed limit as a percentage of the maximum speed for all motors of the part.
131
132        Args:
133            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
134                specified as a float or int.
135        """
136        if not isinstance(speed_limit, float | int):
137            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
138        if not (0 <= speed_limit <= 100):
139            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
140        req = SpeedLimitRequest(
141            id=self._part_id,
142            limit=speed_limit,
143        )
144        self._stub.SetSpeedLimit(req)
145
146    def _set_speed_limits(self, speed_limit: int) -> None:
147        """Set the speed limit as a percentage of the maximum speed for all motors of the part.
148
149        Args:
150            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
151                specified as a float or int.
152        """
153        return self.set_speed_limits(speed_limit)

Base class for parts of the robot composed of controllable joints.

The JointsBasedPart class serves as a base for parts of the robot that consist of multiple joints, such as arms and heads. This class provides common functionality for controlling joints, setting speed and torque limits, and managing joint positions.

JointsBasedPart( proto_msg: arm_pb2.Arm | head_pb2.Head, grpc_channel: grpc.Channel, stub: reachy2_sdk_api.arm_pb2_grpc.ArmServiceStub | reachy2_sdk_api.head_pb2_grpc.HeadServiceStub)
35    def __init__(
36        self,
37        proto_msg: Arm_proto | Head_proto,
38        grpc_channel: grpc.Channel,
39        stub: ArmServiceStub | HeadServiceStub,
40    ) -> None:
41        """Initialize the JointsBasedPart with its common attributes.
42
43        Sets up the gRPC communication channel and service stub for controlling the joint-based
44        part of the robot, such as an arm or head.
45
46        Args:
47            proto_msg: A protocol message representing the part's configuration. It can be an
48                Arm_proto or Head_proto object.
49            grpc_channel: The gRPC channel used to communicate with the corresponding service.
50            stub: The service stub for the gRPC communication, which can be an ArmServiceStub or
51                HeadServiceStub, depending on the part type.
52        """
53        super().__init__(proto_msg, grpc_channel, stub)

Initialize the JointsBasedPart with its common attributes.

Sets up the gRPC communication channel and service stub for controlling the joint-based part of the robot, such as an arm or head.

Arguments:
  • proto_msg: A protocol message representing the part's configuration. It can be an Arm_proto or Head_proto object.
  • grpc_channel: The gRPC channel used to communicate with the corresponding service.
  • stub: The service stub for the gRPC communication, which can be an ArmServiceStub or HeadServiceStub, depending on the part type.
55    @property
56    def joints(self) -> CustomDict[str, OrbitaJoint]:
57        """Get all the arm's joints.
58
59        Returns:
60            A dictionary of all the arm's joints, with joint names as keys and joint objects as values.
61        """
62        _joints: CustomDict[str, OrbitaJoint] = CustomDict({})
63        for actuator_name, actuator in self._actuators.items():
64            for joint in actuator._joints.values():
65                if hasattr(joint, "_axis_type"):
66                    _joints[actuator_name + "." + joint._axis_type] = joint
67                else:
68                    _joints[actuator_name] = joint
69        return _joints

Get all the arm's joints.

Returns:

A dictionary of all the arm's joints, with joint names as keys and joint objects as values.

@abstractmethod
def get_current_positions(self) -> List[float]:
71    @abstractmethod
72    def get_current_positions(self) -> List[float]:
73        """Get the current positions of all joints.
74
75        Returns:
76            A list of float values representing the present positions in degrees of the arm's joints.
77        """
78        pass  # pragma: no cover

Get the current positions of all joints.

Returns:

A list of float values representing the present positions in degrees of the arm's joints.

def send_goal_positions(self, check_positions: bool = False) -> None:
80    def send_goal_positions(self, check_positions: bool = False) -> None:
81        """Send goal positions to the part's joints.
82
83        If goal positions have been specified for any joint of the part, sends them to the robot.
84
85        Args :
86            check_positions: A boolean indicating whether to check the positions after sending the command.
87                Defaults to True.
88        """
89        if self.is_off():
90            self._logger.warning(f"{self._part_id.name} is off. Goal positions not sent.")
91            return
92        self._stub.SendComponentsCommands(self._get_goal_positions_message())
93        self._clean_outgoing_goal_positions()
94        if check_positions:
95            self._post_send_goal_positions()

Send goal positions to the part's joints.

If goal positions have been specified for any joint of the part, sends them to the robot.

Args :

check_positions: A boolean indicating whether to check the positions after sending the command. Defaults to True.

def set_torque_limits(self, torque_limit: int) -> None:
112    def set_torque_limits(self, torque_limit: int) -> None:
113        """Set the torque limit as a percentage of the maximum torque for all motors of the part.
114
115        Args:
116            torque_limit: The desired torque limit as a percentage (0-100) of the maximum torque. Can be
117                specified as a float or int.
118        """
119        if not isinstance(torque_limit, float | int):
120            raise TypeError(f"Expected one of: float, int for torque_limit, got {type(torque_limit).__name__}")
121        if not (0 <= torque_limit <= 100):
122            raise ValueError(f"torque_limit must be in [0, 100], got {torque_limit}.")
123        req = TorqueLimitRequest(
124            id=self._part_id,
125            limit=torque_limit,
126        )
127        self._stub.SetTorqueLimit(req)

Set the torque limit as a percentage of the maximum torque for all motors of the part.

Arguments:
  • torque_limit: The desired torque limit as a percentage (0-100) of the maximum torque. Can be specified as a float or int.
def set_speed_limits(self, speed_limit: int) -> None:
129    def set_speed_limits(self, speed_limit: int) -> None:
130        """Set the speed limit as a percentage of the maximum speed for all motors of the part.
131
132        Args:
133            speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be
134                specified as a float or int.
135        """
136        if not isinstance(speed_limit, float | int):
137            raise TypeError(f"Expected one of: float, int for speed_limit, got {type(speed_limit).__name__}")
138        if not (0 <= speed_limit <= 100):
139            raise ValueError(f"speed_limit must be in [0, 100], got {speed_limit}.")
140        req = SpeedLimitRequest(
141            id=self._part_id,
142            limit=speed_limit,
143        )
144        self._stub.SetSpeedLimit(req)

Set the speed limit as a percentage of the maximum speed for all motors of the part.

Arguments:
  • speed_limit: The desired speed limit as a percentage (0-100) of the maximum speed. Can be specified as a float or int.