reachy2_sdk.media.camera_manager
Reachy Camera Manager module.
Initialize the head and torso cameras if they are available.
1"""Reachy Camera Manager module. 2 3Initialize the head and torso cameras if they are available. 4""" 5 6import logging 7from typing import Optional 8 9import grpc 10from google.protobuf.empty_pb2 import Empty 11from reachy2_sdk_api.video_pb2_grpc import VideoServiceStub 12 13from .camera import Camera, CameraType, DepthCamera 14 15 16class CameraManager: 17 """CameraManager class manages the available cameras on the robot. 18 19 Provides access to the robot's cameras, including teleoperation and depth cameras. 20 It handles the initialization of cameras and offers methods to retrieve camera objects for use. 21 """ 22 23 def __init__(self, host: str, port: int) -> None: 24 """Set up the camera manager module. 25 26 This initializes the gRPC channel for communicating with the camera service, 27 sets up logging, and prepares the available cameras. 28 29 Args: 30 host: The host address for the gRPC service. 31 port: The port number for the gRPC service. 32 """ 33 self._logger = logging.getLogger(__name__) 34 self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}") 35 self._host = host 36 37 self._video_stub = VideoServiceStub(self._grpc_video_channel) 38 39 self._teleop: Optional[Camera] = None 40 self._depth: Optional[DepthCamera] = None 41 self._setup_cameras() 42 43 def __repr__(self) -> str: 44 """Clean representation of a reachy cameras.""" 45 s = "\n\t".join([str(cam) for cam in [self._depth, self._teleop] if cam is not None]) 46 return f"""<CameraManager intialized_cameras=\n\t{s}\n>""" 47 48 def _setup_cameras(self) -> None: 49 """Initialize cameras based on availability. 50 51 This method retrieves the available cameras and sets 52 up the teleop and depth cameras if they are found. 53 """ 54 cams = self._video_stub.GetAvailableCameras(Empty()) 55 self._teleop = None 56 self._depth = None 57 if len(cams.camera_feat) == 0: 58 self._logger.warning("There is no available camera.") 59 else: 60 self._logger.debug(cams.camera_feat) 61 for c in cams.camera_feat: 62 if c.name == CameraType.TELEOP.value: 63 self._logger.debug("Teleop Camera initialized.") 64 self._teleop = Camera(c, self._video_stub) 65 elif c.name == CameraType.DEPTH.value: 66 self._logger.debug("Depth Camera initialized.") 67 self._depth = DepthCamera(c, self._video_stub) 68 else: 69 self._logger.error(f"Camera {c.name} not defined") 70 71 def initialize_cameras(self) -> None: 72 """Manually re-initialize cameras. 73 74 This method can be used to reinitialize the camera setup if changes occur 75 or new cameras are connected. 76 """ 77 self._setup_cameras() 78 79 @property 80 def teleop(self) -> Optional[Camera]: 81 """Retrieve the teleop camera. 82 83 Returns: 84 The teleop Camera object if it is initialized; otherwise, logs an error 85 and returns None. 86 """ 87 if self._teleop is None: 88 self._logger.error("Teleop camera is not initialized.") 89 return None 90 91 return self._teleop 92 93 @property 94 def depth(self) -> Optional[DepthCamera]: 95 """Retrieve the depth camera. 96 97 Returns: 98 The DepthCamera object if it is initialized; otherwise, logs an error 99 and returns None. 100 """ 101 if self._depth is None: 102 self._logger.error("Depth camera is not initialized.") 103 return None 104 105 return self._depth
17class CameraManager: 18 """CameraManager class manages the available cameras on the robot. 19 20 Provides access to the robot's cameras, including teleoperation and depth cameras. 21 It handles the initialization of cameras and offers methods to retrieve camera objects for use. 22 """ 23 24 def __init__(self, host: str, port: int) -> None: 25 """Set up the camera manager module. 26 27 This initializes the gRPC channel for communicating with the camera service, 28 sets up logging, and prepares the available cameras. 29 30 Args: 31 host: The host address for the gRPC service. 32 port: The port number for the gRPC service. 33 """ 34 self._logger = logging.getLogger(__name__) 35 self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}") 36 self._host = host 37 38 self._video_stub = VideoServiceStub(self._grpc_video_channel) 39 40 self._teleop: Optional[Camera] = None 41 self._depth: Optional[DepthCamera] = None 42 self._setup_cameras() 43 44 def __repr__(self) -> str: 45 """Clean representation of a reachy cameras.""" 46 s = "\n\t".join([str(cam) for cam in [self._depth, self._teleop] if cam is not None]) 47 return f"""<CameraManager intialized_cameras=\n\t{s}\n>""" 48 49 def _setup_cameras(self) -> None: 50 """Initialize cameras based on availability. 51 52 This method retrieves the available cameras and sets 53 up the teleop and depth cameras if they are found. 54 """ 55 cams = self._video_stub.GetAvailableCameras(Empty()) 56 self._teleop = None 57 self._depth = None 58 if len(cams.camera_feat) == 0: 59 self._logger.warning("There is no available camera.") 60 else: 61 self._logger.debug(cams.camera_feat) 62 for c in cams.camera_feat: 63 if c.name == CameraType.TELEOP.value: 64 self._logger.debug("Teleop Camera initialized.") 65 self._teleop = Camera(c, self._video_stub) 66 elif c.name == CameraType.DEPTH.value: 67 self._logger.debug("Depth Camera initialized.") 68 self._depth = DepthCamera(c, self._video_stub) 69 else: 70 self._logger.error(f"Camera {c.name} not defined") 71 72 def initialize_cameras(self) -> None: 73 """Manually re-initialize cameras. 74 75 This method can be used to reinitialize the camera setup if changes occur 76 or new cameras are connected. 77 """ 78 self._setup_cameras() 79 80 @property 81 def teleop(self) -> Optional[Camera]: 82 """Retrieve the teleop camera. 83 84 Returns: 85 The teleop Camera object if it is initialized; otherwise, logs an error 86 and returns None. 87 """ 88 if self._teleop is None: 89 self._logger.error("Teleop camera is not initialized.") 90 return None 91 92 return self._teleop 93 94 @property 95 def depth(self) -> Optional[DepthCamera]: 96 """Retrieve the depth camera. 97 98 Returns: 99 The DepthCamera object if it is initialized; otherwise, logs an error 100 and returns None. 101 """ 102 if self._depth is None: 103 self._logger.error("Depth camera is not initialized.") 104 return None 105 106 return self._depth
CameraManager class manages the available cameras on the robot.
Provides access to the robot's cameras, including teleoperation and depth cameras. It handles the initialization of cameras and offers methods to retrieve camera objects for use.
24 def __init__(self, host: str, port: int) -> None: 25 """Set up the camera manager module. 26 27 This initializes the gRPC channel for communicating with the camera service, 28 sets up logging, and prepares the available cameras. 29 30 Args: 31 host: The host address for the gRPC service. 32 port: The port number for the gRPC service. 33 """ 34 self._logger = logging.getLogger(__name__) 35 self._grpc_video_channel = grpc.insecure_channel(f"{host}:{port}") 36 self._host = host 37 38 self._video_stub = VideoServiceStub(self._grpc_video_channel) 39 40 self._teleop: Optional[Camera] = None 41 self._depth: Optional[DepthCamera] = None 42 self._setup_cameras()
Set up the camera manager module.
This initializes the gRPC channel for communicating with the camera service, sets up logging, and prepares the available cameras.
Arguments:
- host: The host address for the gRPC service.
- port: The port number for the gRPC service.
72 def initialize_cameras(self) -> None: 73 """Manually re-initialize cameras. 74 75 This method can be used to reinitialize the camera setup if changes occur 76 or new cameras are connected. 77 """ 78 self._setup_cameras()
Manually re-initialize cameras.
This method can be used to reinitialize the camera setup if changes occur or new cameras are connected.
80 @property 81 def teleop(self) -> Optional[Camera]: 82 """Retrieve the teleop camera. 83 84 Returns: 85 The teleop Camera object if it is initialized; otherwise, logs an error 86 and returns None. 87 """ 88 if self._teleop is None: 89 self._logger.error("Teleop camera is not initialized.") 90 return None 91 92 return self._teleop
Retrieve the teleop camera.
Returns:
The teleop Camera object if it is initialized; otherwise, logs an error and returns None.
94 @property 95 def depth(self) -> Optional[DepthCamera]: 96 """Retrieve the depth camera. 97 98 Returns: 99 The DepthCamera object if it is initialized; otherwise, logs an error 100 and returns None. 101 """ 102 if self._depth is None: 103 self._logger.error("Depth camera is not initialized.") 104 return None 105 106 return self._depth
Retrieve the depth camera.
Returns:
The DepthCamera object if it is initialized; otherwise, logs an error and returns None.