reachy2_sdk.parts.hand

Reachy Hand module.

Handles all specific methods to a Hand.

  1"""Reachy Hand module.
  2
  3Handles all specific methods to a Hand.
  4"""
  5
  6import time
  7from abc import abstractmethod
  8from threading import Thread
  9from typing import Dict, Optional
 10
 11import grpc
 12import numpy as np
 13from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
 14from reachy2_sdk_api.hand_pb2 import Hand as Hand_proto
 15from reachy2_sdk_api.hand_pb2 import HandPositionRequest, HandStatus
 16from reachy2_sdk_api.hand_pb2_grpc import HandServiceStub
 17
 18from ..grippers.gripper_joint import GripperJoint
 19from .goto_based_part import IGoToBasedPart
 20from .part import Part
 21
 22
 23class Hand(Part, IGoToBasedPart):
 24    """Class for controlling the Reachy's hand.
 25
 26    The `Hand` class provides methods to control the gripper of Reachy, including opening and closing
 27    the hand, setting the goal position, and checking the hand's state. It also manages the hand's
 28    compliance status (whether it is stiff or free).
 29    It is an abstract class that should be subclassed to implement specific behaviors for different grippers.
 30
 31    Attributes:
 32        opening: The opening of the hand as a percentage (0-100), rounded to two decimal places.
 33        present_position: The current position of the hand in degrees.
 34        goal_position: The target goal position of the hand in degrees.
 35    """
 36
 37    def __init__(
 38        self,
 39        hand_msg: Hand_proto,
 40        grpc_channel: grpc.Channel,
 41        goto_stub: GoToServiceStub,
 42    ) -> None:
 43        """Initialize the Hand component.
 44
 45        Sets up the necessary attributes and configuration for the hand, including the gRPC
 46        stub and initial state.
 47
 48        Args:
 49            hand_msg: The Hand_proto object containing the configuration details for the hand.
 50            grpc_channel: The gRPC channel used to communicate with the hand's gRPC service.
 51            goto_stub: The gRPC stub for controlling goto movements.
 52        """
 53        super().__init__(hand_msg, grpc_channel, HandServiceStub(grpc_channel))
 54        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
 55        self._stub = HandServiceStub(grpc_channel)
 56
 57        self._last_goto_checked: Optional[int] = None
 58        self._joints: Dict[str, GripperJoint] = {}
 59
 60        self._thread_check_position: Optional[Thread] = None
 61        self._cancel_check = False
 62
 63    def _set_speed_limits(self, value: int) -> None:
 64        """Set the speed limits for the hand.
 65
 66        Args:
 67            value: The speed limit value to be set, as a percentage (0-100) of the maximum allowed speed,
 68                represented as an integer.
 69        """
 70        return super()._set_speed_limits(value)
 71
 72    def is_on(self) -> bool:
 73        """Check if the hand is stiff.
 74
 75        Returns:
 76            `True` if the hand is on (not compliant), `False` otherwise.
 77        """
 78        for j in self._joints.values():
 79            if not j.is_on():
 80                return False
 81        return True
 82
 83    def is_off(self) -> bool:
 84        """Check if the hand is compliant.
 85
 86        Returns:
 87            `True` if the hand is off (compliant), `False` otherwise.
 88        """
 89        for j in self._joints.values():
 90            if j.is_on():
 91                return False
 92        return True
 93
 94    def is_moving(self) -> bool:
 95        """Check if the hand is currently moving.
 96
 97        Returns:
 98            `True` if any joint of the hand is moving, `False` otherwise.
 99        """
100        goto_playing = self.get_goto_playing()
101        if goto_playing.id != -1 and goto_playing.id != self._last_goto_checked:
102            self._last_goto_checked = goto_playing.id
103            for j in self._joints.values():
104                j._is_moving = True
105                j._check_joint_movement()
106        for j in self._joints.values():
107            if j.is_moving():
108                return True
109        return False
110
111    def open(self) -> None:
112        """Open the hand.
113
114        Raises:
115            RuntimeError: If the gripper is off and the open request cannot be sent.
116        """
117        if not self.is_on():
118            raise RuntimeError("Gripper is off. Open request not sent.")
119        self._stub.OpenHand(self._part_id)
120        for j in self._joints.values():
121            j._is_moving = True
122
123    def close(self) -> None:
124        """Close the hand.
125
126        Raises:
127            RuntimeError: If the gripper is off and the close request cannot be sent.
128        """
129        if not self.is_on():
130            raise RuntimeError("Gripper is off. Close request not sent.")
131        self._stub.CloseHand(self._part_id)
132        for j in self._joints.values():
133            j._is_moving = True
134
135    def send_goal_positions(self, check_positions: bool = False) -> None:
136        """Send the goal positions to the hand's joints.
137
138        If any goal position has been specified for any of the gripper's joints, sends them to the robot.
139        If the hand is off, the command is not sent.
140
141        Args :
142            check_positions: A boolean indicating whether to check the positions after sending the command.
143                Defaults to True.
144        """
145        command = self._get_goal_positions_message()
146        if command is not None:
147            self._stub.SetHandPosition(command)
148            self._clean_outgoing_goal_positions()
149            if check_positions:
150                self._post_send_goal_positions()
151
152    @abstractmethod
153    def _get_goal_positions_message(self) -> Optional[HandPositionRequest]:
154        """Get the HandPositionRequest message to send the goal positions to the actuator."""
155
156    def _clean_outgoing_goal_positions(self) -> None:
157        """Clean the outgoing goal positions."""
158        for j in self._joints.values():
159            j._outgoing_goal_positions = None
160
161    def _post_send_goal_positions(self) -> None:
162        """Start a background thread to check the goal positions after sending them.
163
164        This method stops any ongoing position check thread and starts a new thread
165        to monitor the current positions of the joints relative to their last goal positions.
166        """
167        self._cancel_check = True
168        if self._thread_check_position is not None and self._thread_check_position.is_alive():
169            self._thread_check_position.join()
170        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
171        self._thread_check_position.start()
172
173    def _check_goal_positions(self) -> None:
174        """Monitor the joint positions to check if they reach the specified goals.
175
176        This method checks the current positions of the joints and compares them to
177        the goal positions. If a position is significantly different from the goal after 1 second,
178        a warning is logged indicating that the position may be unreachable.
179        """
180        self._cancel_check = False
181        t1 = time.time()
182        while time.time() - t1 < 1:
183            time.sleep(0.0001)
184            if self._cancel_check:
185                # in case of multiple send_goal_positions we'll check the next call
186                return
187
188        for joint_name, joint in self._joints.items():
189            # precision is low we are looking for unreachable positions
190            if not np.isclose(joint.present_position, joint.goal_position, atol=1):
191                self._logger.warning(
192                    f"Required goal position ({round(joint.goal_position, 2)}) "
193                    f"for {self._part_id.name}.{joint_name} is unreachable."
194                    f"\nCurrent position is ({round(joint.present_position, 2)})."
195                )
196
197    def _update_audit_status(self, new_status: HandStatus) -> None:
198        """Update the audit status with the new status received from the gRPC server.
199
200        Args:
201            new_status: A HandStatus object representing the new status of the hand.
202        """
203        pass  # pragma: no cover
 24class Hand(Part, IGoToBasedPart):
 25    """Class for controlling the Reachy's hand.
 26
 27    The `Hand` class provides methods to control the gripper of Reachy, including opening and closing
 28    the hand, setting the goal position, and checking the hand's state. It also manages the hand's
 29    compliance status (whether it is stiff or free).
 30    It is an abstract class that should be subclassed to implement specific behaviors for different grippers.
 31
 32    Attributes:
 33        opening: The opening of the hand as a percentage (0-100), rounded to two decimal places.
 34        present_position: The current position of the hand in degrees.
 35        goal_position: The target goal position of the hand in degrees.
 36    """
 37
 38    def __init__(
 39        self,
 40        hand_msg: Hand_proto,
 41        grpc_channel: grpc.Channel,
 42        goto_stub: GoToServiceStub,
 43    ) -> None:
 44        """Initialize the Hand component.
 45
 46        Sets up the necessary attributes and configuration for the hand, including the gRPC
 47        stub and initial state.
 48
 49        Args:
 50            hand_msg: The Hand_proto object containing the configuration details for the hand.
 51            grpc_channel: The gRPC channel used to communicate with the hand's gRPC service.
 52            goto_stub: The gRPC stub for controlling goto movements.
 53        """
 54        super().__init__(hand_msg, grpc_channel, HandServiceStub(grpc_channel))
 55        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
 56        self._stub = HandServiceStub(grpc_channel)
 57
 58        self._last_goto_checked: Optional[int] = None
 59        self._joints: Dict[str, GripperJoint] = {}
 60
 61        self._thread_check_position: Optional[Thread] = None
 62        self._cancel_check = False
 63
 64    def _set_speed_limits(self, value: int) -> None:
 65        """Set the speed limits for the hand.
 66
 67        Args:
 68            value: The speed limit value to be set, as a percentage (0-100) of the maximum allowed speed,
 69                represented as an integer.
 70        """
 71        return super()._set_speed_limits(value)
 72
 73    def is_on(self) -> bool:
 74        """Check if the hand is stiff.
 75
 76        Returns:
 77            `True` if the hand is on (not compliant), `False` otherwise.
 78        """
 79        for j in self._joints.values():
 80            if not j.is_on():
 81                return False
 82        return True
 83
 84    def is_off(self) -> bool:
 85        """Check if the hand is compliant.
 86
 87        Returns:
 88            `True` if the hand is off (compliant), `False` otherwise.
 89        """
 90        for j in self._joints.values():
 91            if j.is_on():
 92                return False
 93        return True
 94
 95    def is_moving(self) -> bool:
 96        """Check if the hand is currently moving.
 97
 98        Returns:
 99            `True` if any joint of the hand is moving, `False` otherwise.
100        """
101        goto_playing = self.get_goto_playing()
102        if goto_playing.id != -1 and goto_playing.id != self._last_goto_checked:
103            self._last_goto_checked = goto_playing.id
104            for j in self._joints.values():
105                j._is_moving = True
106                j._check_joint_movement()
107        for j in self._joints.values():
108            if j.is_moving():
109                return True
110        return False
111
112    def open(self) -> None:
113        """Open the hand.
114
115        Raises:
116            RuntimeError: If the gripper is off and the open request cannot be sent.
117        """
118        if not self.is_on():
119            raise RuntimeError("Gripper is off. Open request not sent.")
120        self._stub.OpenHand(self._part_id)
121        for j in self._joints.values():
122            j._is_moving = True
123
124    def close(self) -> None:
125        """Close the hand.
126
127        Raises:
128            RuntimeError: If the gripper is off and the close request cannot be sent.
129        """
130        if not self.is_on():
131            raise RuntimeError("Gripper is off. Close request not sent.")
132        self._stub.CloseHand(self._part_id)
133        for j in self._joints.values():
134            j._is_moving = True
135
136    def send_goal_positions(self, check_positions: bool = False) -> None:
137        """Send the goal positions to the hand's joints.
138
139        If any goal position has been specified for any of the gripper's joints, sends them to the robot.
140        If the hand is off, the command is not sent.
141
142        Args :
143            check_positions: A boolean indicating whether to check the positions after sending the command.
144                Defaults to True.
145        """
146        command = self._get_goal_positions_message()
147        if command is not None:
148            self._stub.SetHandPosition(command)
149            self._clean_outgoing_goal_positions()
150            if check_positions:
151                self._post_send_goal_positions()
152
153    @abstractmethod
154    def _get_goal_positions_message(self) -> Optional[HandPositionRequest]:
155        """Get the HandPositionRequest message to send the goal positions to the actuator."""
156
157    def _clean_outgoing_goal_positions(self) -> None:
158        """Clean the outgoing goal positions."""
159        for j in self._joints.values():
160            j._outgoing_goal_positions = None
161
162    def _post_send_goal_positions(self) -> None:
163        """Start a background thread to check the goal positions after sending them.
164
165        This method stops any ongoing position check thread and starts a new thread
166        to monitor the current positions of the joints relative to their last goal positions.
167        """
168        self._cancel_check = True
169        if self._thread_check_position is not None and self._thread_check_position.is_alive():
170            self._thread_check_position.join()
171        self._thread_check_position = Thread(target=self._check_goal_positions, daemon=True)
172        self._thread_check_position.start()
173
174    def _check_goal_positions(self) -> None:
175        """Monitor the joint positions to check if they reach the specified goals.
176
177        This method checks the current positions of the joints and compares them to
178        the goal positions. If a position is significantly different from the goal after 1 second,
179        a warning is logged indicating that the position may be unreachable.
180        """
181        self._cancel_check = False
182        t1 = time.time()
183        while time.time() - t1 < 1:
184            time.sleep(0.0001)
185            if self._cancel_check:
186                # in case of multiple send_goal_positions we'll check the next call
187                return
188
189        for joint_name, joint in self._joints.items():
190            # precision is low we are looking for unreachable positions
191            if not np.isclose(joint.present_position, joint.goal_position, atol=1):
192                self._logger.warning(
193                    f"Required goal position ({round(joint.goal_position, 2)}) "
194                    f"for {self._part_id.name}.{joint_name} is unreachable."
195                    f"\nCurrent position is ({round(joint.present_position, 2)})."
196                )
197
198    def _update_audit_status(self, new_status: HandStatus) -> None:
199        """Update the audit status with the new status received from the gRPC server.
200
201        Args:
202            new_status: A HandStatus object representing the new status of the hand.
203        """
204        pass  # pragma: no cover

Class for controlling the Reachy's hand.

The Hand class provides methods to control the gripper of Reachy, including opening and closing the hand, setting the goal position, and checking the hand's state. It also manages the hand's compliance status (whether it is stiff or free). It is an abstract class that should be subclassed to implement specific behaviors for different grippers.

Attributes:
  • opening: The opening of the hand as a percentage (0-100), rounded to two decimal places.
  • present_position: The current position of the hand in degrees.
  • goal_position: The target goal position of the hand in degrees.
Hand( hand_msg: hand_pb2.Hand, grpc_channel: grpc.Channel, goto_stub: reachy2_sdk_api.goto_pb2_grpc.GoToServiceStub)
38    def __init__(
39        self,
40        hand_msg: Hand_proto,
41        grpc_channel: grpc.Channel,
42        goto_stub: GoToServiceStub,
43    ) -> None:
44        """Initialize the Hand component.
45
46        Sets up the necessary attributes and configuration for the hand, including the gRPC
47        stub and initial state.
48
49        Args:
50            hand_msg: The Hand_proto object containing the configuration details for the hand.
51            grpc_channel: The gRPC channel used to communicate with the hand's gRPC service.
52            goto_stub: The gRPC stub for controlling goto movements.
53        """
54        super().__init__(hand_msg, grpc_channel, HandServiceStub(grpc_channel))
55        IGoToBasedPart.__init__(self, self._part_id, goto_stub)
56        self._stub = HandServiceStub(grpc_channel)
57
58        self._last_goto_checked: Optional[int] = None
59        self._joints: Dict[str, GripperJoint] = {}
60
61        self._thread_check_position: Optional[Thread] = None
62        self._cancel_check = False

Initialize the Hand component.

Sets up the necessary attributes and configuration for the hand, including the gRPC stub and initial state.

Arguments:
  • hand_msg: The Hand_proto object containing the configuration details for the hand.
  • grpc_channel: The gRPC channel used to communicate with the hand's gRPC service.
  • goto_stub: The gRPC stub for controlling goto movements.
def is_on(self) -> bool:
73    def is_on(self) -> bool:
74        """Check if the hand is stiff.
75
76        Returns:
77            `True` if the hand is on (not compliant), `False` otherwise.
78        """
79        for j in self._joints.values():
80            if not j.is_on():
81                return False
82        return True

Check if the hand is stiff.

Returns:

True if the hand is on (not compliant), False otherwise.

def is_off(self) -> bool:
84    def is_off(self) -> bool:
85        """Check if the hand is compliant.
86
87        Returns:
88            `True` if the hand is off (compliant), `False` otherwise.
89        """
90        for j in self._joints.values():
91            if j.is_on():
92                return False
93        return True

Check if the hand is compliant.

Returns:

True if the hand is off (compliant), False otherwise.

def is_moving(self) -> bool:
 95    def is_moving(self) -> bool:
 96        """Check if the hand is currently moving.
 97
 98        Returns:
 99            `True` if any joint of the hand is moving, `False` otherwise.
100        """
101        goto_playing = self.get_goto_playing()
102        if goto_playing.id != -1 and goto_playing.id != self._last_goto_checked:
103            self._last_goto_checked = goto_playing.id
104            for j in self._joints.values():
105                j._is_moving = True
106                j._check_joint_movement()
107        for j in self._joints.values():
108            if j.is_moving():
109                return True
110        return False

Check if the hand is currently moving.

Returns:

True if any joint of the hand is moving, False otherwise.

def open(self) -> None:
112    def open(self) -> None:
113        """Open the hand.
114
115        Raises:
116            RuntimeError: If the gripper is off and the open request cannot be sent.
117        """
118        if not self.is_on():
119            raise RuntimeError("Gripper is off. Open request not sent.")
120        self._stub.OpenHand(self._part_id)
121        for j in self._joints.values():
122            j._is_moving = True

Open the hand.

Raises:
  • RuntimeError: If the gripper is off and the open request cannot be sent.
def close(self) -> None:
124    def close(self) -> None:
125        """Close the hand.
126
127        Raises:
128            RuntimeError: If the gripper is off and the close request cannot be sent.
129        """
130        if not self.is_on():
131            raise RuntimeError("Gripper is off. Close request not sent.")
132        self._stub.CloseHand(self._part_id)
133        for j in self._joints.values():
134            j._is_moving = True

Close the hand.

Raises:
  • RuntimeError: If the gripper is off and the close request cannot be sent.
def send_goal_positions(self, check_positions: bool = False) -> None:
136    def send_goal_positions(self, check_positions: bool = False) -> None:
137        """Send the goal positions to the hand's joints.
138
139        If any goal position has been specified for any of the gripper's joints, sends them to the robot.
140        If the hand is off, the command is not sent.
141
142        Args :
143            check_positions: A boolean indicating whether to check the positions after sending the command.
144                Defaults to True.
145        """
146        command = self._get_goal_positions_message()
147        if command is not None:
148            self._stub.SetHandPosition(command)
149            self._clean_outgoing_goal_positions()
150            if check_positions:
151                self._post_send_goal_positions()

Send the goal positions to the hand's joints.

If any goal position has been specified for any of the gripper's joints, sends them to the robot. If the hand is off, the command is not sent.

Args :

check_positions: A boolean indicating whether to check the positions after sending the command. Defaults to True.