reachy2_sdk.media.camera_manager

Reachy Camera Manager module.

Initialize the head and torso cameras if they are available.

  1"""Reachy Camera Manager module.
  2
  3Initialize the head and torso cameras if they are available.
  4"""
  5
  6import logging
  7from typing import Optional
  8
  9import grpc
 10from google.protobuf.empty_pb2 import Empty
 11from reachy2_sdk_api.video_pb2_grpc import VideoServiceStub
 12
 13from .camera import Camera, CameraType, DepthCamera
 14
 15
 16class CameraManager:
 17    """CameraManager class manages the available cameras on the robot.
 18
 19    Provides access to the robot's cameras, including teleoperation and depth cameras.
 20    It handles the initialization of cameras and offers methods to retrieve camera objects for use.
 21    """
 22
 23    def __init__(self, host: str, port: int) -> None:
 24        """Set up the camera manager module.
 25
 26        This initializes the gRPC channel for communicating with the camera service,
 27        sets up logging, and prepares the available cameras.
 28
 29        Args:
 30            host: The host address for the gRPC service.
 31            port: The port number for the gRPC service.
 32        """
 33        self._logger = logging.getLogger(__name__)
 34        self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}")
 35        self._host = host
 36
 37        self._video_stub = VideoServiceStub(self._grpc_video_channel)
 38
 39        self._teleop: Optional[Camera] = None
 40        self._depth: Optional[DepthCamera] = None
 41        self._setup_cameras()
 42
 43    def __repr__(self) -> str:
 44        """Clean representation of a reachy cameras."""
 45        s = "\n\t".join([str(cam) for cam in [self._depth, self._teleop] if cam is not None])
 46        return f"""<CameraManager intialized_cameras=\n\t{s}\n>"""
 47
 48    def _setup_cameras(self) -> None:
 49        """Initialize cameras based on availability.
 50
 51        This method retrieves the available cameras and sets
 52        up the teleop and depth cameras if they are found.
 53        """
 54        cams = self._video_stub.GetAvailableCameras(Empty())
 55        self._teleop = None
 56        self._depth = None
 57        if len(cams.camera_feat) == 0:
 58            self._logger.warning("There is no available camera.")
 59        else:
 60            self._logger.debug(cams.camera_feat)
 61            for c in cams.camera_feat:
 62                if c.name == CameraType.TELEOP.value:
 63                    self._logger.debug("Teleop Camera initialized.")
 64                    self._teleop = Camera(c, self._video_stub)
 65                elif c.name == CameraType.DEPTH.value:
 66                    self._logger.debug("Depth Camera initialized.")
 67                    self._depth = DepthCamera(c, self._video_stub)
 68                else:
 69                    self._logger.error(f"Camera {c.name} not defined")
 70
 71    def initialize_cameras(self) -> None:
 72        """Manually re-initialize cameras.
 73
 74        This method can be used to reinitialize the camera setup if changes occur
 75        or new cameras are connected.
 76        """
 77        self._setup_cameras()
 78
 79    @property
 80    def teleop(self) -> Optional[Camera]:
 81        """Retrieve the teleop camera.
 82
 83        Returns:
 84            The teleop Camera object if it is initialized; otherwise, logs an error
 85            and returns None.
 86        """
 87        if self._teleop is None:
 88            self._logger.error("Teleop camera is not initialized.")
 89            return None
 90
 91        return self._teleop
 92
 93    @property
 94    def depth(self) -> Optional[DepthCamera]:
 95        """Retrieve the depth camera.
 96
 97        Returns:
 98            The DepthCamera object if it is initialized; otherwise, logs an error
 99            and returns None.
100        """
101        if self._depth is None:
102            self._logger.error("Depth camera is not initialized.")
103            return None
104
105        return self._depth
class CameraManager:
 17class CameraManager:
 18    """CameraManager class manages the available cameras on the robot.
 19
 20    Provides access to the robot's cameras, including teleoperation and depth cameras.
 21    It handles the initialization of cameras and offers methods to retrieve camera objects for use.
 22    """
 23
 24    def __init__(self, host: str, port: int) -> None:
 25        """Set up the camera manager module.
 26
 27        This initializes the gRPC channel for communicating with the camera service,
 28        sets up logging, and prepares the available cameras.
 29
 30        Args:
 31            host: The host address for the gRPC service.
 32            port: The port number for the gRPC service.
 33        """
 34        self._logger = logging.getLogger(__name__)
 35        self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}")
 36        self._host = host
 37
 38        self._video_stub = VideoServiceStub(self._grpc_video_channel)
 39
 40        self._teleop: Optional[Camera] = None
 41        self._depth: Optional[DepthCamera] = None
 42        self._setup_cameras()
 43
 44    def __repr__(self) -> str:
 45        """Clean representation of a reachy cameras."""
 46        s = "\n\t".join([str(cam) for cam in [self._depth, self._teleop] if cam is not None])
 47        return f"""<CameraManager intialized_cameras=\n\t{s}\n>"""
 48
 49    def _setup_cameras(self) -> None:
 50        """Initialize cameras based on availability.
 51
 52        This method retrieves the available cameras and sets
 53        up the teleop and depth cameras if they are found.
 54        """
 55        cams = self._video_stub.GetAvailableCameras(Empty())
 56        self._teleop = None
 57        self._depth = None
 58        if len(cams.camera_feat) == 0:
 59            self._logger.warning("There is no available camera.")
 60        else:
 61            self._logger.debug(cams.camera_feat)
 62            for c in cams.camera_feat:
 63                if c.name == CameraType.TELEOP.value:
 64                    self._logger.debug("Teleop Camera initialized.")
 65                    self._teleop = Camera(c, self._video_stub)
 66                elif c.name == CameraType.DEPTH.value:
 67                    self._logger.debug("Depth Camera initialized.")
 68                    self._depth = DepthCamera(c, self._video_stub)
 69                else:
 70                    self._logger.error(f"Camera {c.name} not defined")
 71
 72    def initialize_cameras(self) -> None:
 73        """Manually re-initialize cameras.
 74
 75        This method can be used to reinitialize the camera setup if changes occur
 76        or new cameras are connected.
 77        """
 78        self._setup_cameras()
 79
 80    @property
 81    def teleop(self) -> Optional[Camera]:
 82        """Retrieve the teleop camera.
 83
 84        Returns:
 85            The teleop Camera object if it is initialized; otherwise, logs an error
 86            and returns None.
 87        """
 88        if self._teleop is None:
 89            self._logger.error("Teleop camera is not initialized.")
 90            return None
 91
 92        return self._teleop
 93
 94    @property
 95    def depth(self) -> Optional[DepthCamera]:
 96        """Retrieve the depth camera.
 97
 98        Returns:
 99            The DepthCamera object if it is initialized; otherwise, logs an error
100            and returns None.
101        """
102        if self._depth is None:
103            self._logger.error("Depth camera is not initialized.")
104            return None
105
106        return self._depth

CameraManager class manages the available cameras on the robot.

Provides access to the robot's cameras, including teleoperation and depth cameras. It handles the initialization of cameras and offers methods to retrieve camera objects for use.

CameraManager(host: str, port: int)
24    def __init__(self, host: str, port: int) -> None:
25        """Set up the camera manager module.
26
27        This initializes the gRPC channel for communicating with the camera service,
28        sets up logging, and prepares the available cameras.
29
30        Args:
31            host: The host address for the gRPC service.
32            port: The port number for the gRPC service.
33        """
34        self._logger = logging.getLogger(__name__)
35        self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}")
36        self._host = host
37
38        self._video_stub = VideoServiceStub(self._grpc_video_channel)
39
40        self._teleop: Optional[Camera] = None
41        self._depth: Optional[DepthCamera] = None
42        self._setup_cameras()

Set up the camera manager module.

This initializes the gRPC channel for communicating with the camera service, sets up logging, and prepares the available cameras.

Arguments:
  • host: The host address for the gRPC service.
  • port: The port number for the gRPC service.
def initialize_cameras(self) -> None:
72    def initialize_cameras(self) -> None:
73        """Manually re-initialize cameras.
74
75        This method can be used to reinitialize the camera setup if changes occur
76        or new cameras are connected.
77        """
78        self._setup_cameras()

Manually re-initialize cameras.

This method can be used to reinitialize the camera setup if changes occur or new cameras are connected.

teleop: Optional[reachy2_sdk.media.camera.Camera]
80    @property
81    def teleop(self) -> Optional[Camera]:
82        """Retrieve the teleop camera.
83
84        Returns:
85            The teleop Camera object if it is initialized; otherwise, logs an error
86            and returns None.
87        """
88        if self._teleop is None:
89            self._logger.error("Teleop camera is not initialized.")
90            return None
91
92        return self._teleop

Retrieve the teleop camera.

Returns:

The teleop Camera object if it is initialized; otherwise, logs an error and returns None.

depth: Optional[reachy2_sdk.media.camera.DepthCamera]
 94    @property
 95    def depth(self) -> Optional[DepthCamera]:
 96        """Retrieve the depth camera.
 97
 98        Returns:
 99            The DepthCamera object if it is initialized; otherwise, logs an error
100            and returns None.
101        """
102        if self._depth is None:
103            self._logger.error("Depth camera is not initialized.")
104            return None
105
106        return self._depth

Retrieve the depth camera.

Returns:

The DepthCamera object if it is initialized; otherwise, logs an error and returns None.