reachy2_sdk.media.audio
Reachy Audio module.
Enable access to the microphones and speaker.
"""Reachy Audio module.

Enable access to the microphones and speaker.
"""

import logging
import os
from io import BytesIO
from typing import Generator, List

import grpc
from google.protobuf.empty_pb2 import Empty
from reachy2_sdk_api.audio_pb2 import AudioFile, AudioFileRequest
from reachy2_sdk_api.audio_pb2_grpc import AudioServiceStub


class Audio:
    """Audio class manages the microphones and speaker on the robot.

    It allows playing audio files and recording audio. Please note that the audio files are stored in a
    temporary folder on the robot and are deleted when the robot is turned off.
    """

    def __init__(self, host: str, port: int) -> None:
        """Set up the audio module.

        This initializes the gRPC channel for communicating with the audio service.

        Args:
            host: The host address for the gRPC service.
            port: The port number for the gRPC service.
        """
        self._logger = logging.getLogger(__name__)
        self._grpc_audio_channel = grpc.insecure_channel(f"{host}:{port}")
        self._host = host

        self._audio_stub = AudioServiceStub(self._grpc_audio_channel)

    def _validate_extension(self, path: str, valid_extensions: List[str]) -> bool:
        """Check whether a path ends with one of the accepted extensions.

        The comparison is case-insensitive.

        Args:
            path: The path (or name) of the audio file.
            valid_extensions: The accepted extensions, including the leading dot (e.g. ".wav").

        Returns:
            True if the path ends with one of the extensions, False otherwise.
        """
        accepted = tuple(extension.lower() for extension in valid_extensions)
        return path.lower().endswith(accepted)

    def upload_audio_file(self, path: str) -> bool:
        """Upload an audio file to the robot.

        This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot
        and is deleted when the robot is turned off.

        Args:
            path: The path to the audio file to upload.

        Returns:
            True if the service reported a successful upload. False if the extension is not supported,
            the path is not an existing file, or the service reported a failure (the reason is logged).
        """
        if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
            self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
            return False

        # isfile (rather than exists) also rejects directories, which could not be opened for reading below.
        if not os.path.isfile(path):
            self._logger.error(f"File does not exist: {path}")
            return False

        def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
            # The first message only carries the file name; the content follows in chunks.
            yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))

            # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
            CHUNK_SIZE = 64 * 1024  # 64 KiB

            with open(file_path, "rb") as file:
                # read() returns b"" at end of file, which ends the iteration.
                for chunk in iter(lambda: file.read(CHUNK_SIZE), b""):
                    yield AudioFileRequest(chunk_data=chunk)

        response = self._audio_stub.UploadAudioFile(generate_requests(path))
        if response.success.value:
            return True

        self._logger.error(f"Failed to upload file: {response.error}")
        return False

    def download_audio_file(self, name: str, path: str) -> bool:
        """Download an audio file from the robot.

        Args:
            name: The name of the audio file to download.
            path: The folder to save the downloaded audio file.

        Returns:
            True if the file was received and written into the folder, False if the service did not
            send a usable file name (an error is logged).
        """
        response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))

        file_name = None
        buffer = BytesIO()

        for response in response_iterator:
            field = response.WhichOneof("data")
            if field == "info":
                file_name = response.info.path
            elif field == "chunk_data":
                buffer.write(response.chunk_data)

        # Keep only the final path component so that a name sent by the service cannot
        # make the file land outside of the destination folder.
        safe_name = os.path.basename(file_name) if file_name else ""
        if not safe_name:
            self._logger.error(f"Failed to download file: {name}")
            return False

        file_path = os.path.join(path, safe_name)
        with open(file_path, "wb") as file:
            file.write(buffer.getvalue())
        return True

    def get_audio_files(self) -> List[str]:
        """Get audio files from the robot.

        This method retrieves the list of audio files stored on the robot.

        Returns:
            The path of each file reported by the audio service.
        """
        files = self._audio_stub.GetAudioFiles(request=Empty())

        return [file.path for file in files.files]

    def remove_audio_file(self, name: str) -> bool:
        """Remove an audio file from the robot.

        Args:
            name: The name of the audio file to remove.

        Returns:
            True if the service reported success, False otherwise (the error is logged).
        """
        response = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))
        if response.success.value:
            return True

        self._logger.error(f"Failed to remove file: {response.error}")
        return False

    def play_audio_file(self, name: str) -> None:
        """Play an audio file on the robot.

        Args:
            name: The name of the audio file to play.
        """
        self._audio_stub.PlayAudioFile(request=AudioFile(path=name))

    def stop_playing(self) -> None:
        """Stop playing audio on the robot.

        This method stops the audio that is currently playing on the robot.
        """
        self._audio_stub.StopPlaying(Empty())

    def record_audio(self, name: str, duration_secs: float) -> bool:
        """Record audio on the robot.

        Args:
            name: name of the audio file. The extension defines the encoding. Only ogg is supported.
            duration_secs: duration of the recording in seconds.

        Returns:
            True once the recording request has been sent, False if the extension is not supported.
        """
        if not self._validate_extension(name, [".ogg"]):
            self._logger.error("Invalid file type. Supported file type is .ogg")
            return False

        self._audio_stub.RecordAudioFile(request=AudioFile(path=name, duration=duration_secs))
        return True

    def stop_recording(self) -> None:
        """Stop recording audio on the robot.

        This method stops the audio recording on the robot.
        """
        self._audio_stub.StopRecording(Empty())
class Audio:
    """Audio class manages the microphones and speaker on the robot.

    It allows playing audio files and recording audio. Please note that the audio files are stored in a
    temporary folder on the robot and are deleted when the robot is turned off.
    """

    def __init__(self, host: str, port: int) -> None:
        """Set up the audio module.

        This initializes the gRPC channel for communicating with the audio service.

        Args:
            host: The host address for the gRPC service.
            port: The port number for the gRPC service.
        """
        self._logger = logging.getLogger(__name__)
        self._grpc_audio_channel = grpc.insecure_channel(f"{host}:{port}")
        self._host = host

        self._audio_stub = AudioServiceStub(self._grpc_audio_channel)

    def _validate_extension(self, path: str, valid_extensions: List[str]) -> bool:
        """Check whether a path ends with one of the accepted extensions.

        The comparison is case-insensitive.

        Args:
            path: The path (or name) of the audio file.
            valid_extensions: The accepted extensions, including the leading dot (e.g. ".wav").

        Returns:
            True if the path ends with one of the extensions, False otherwise.
        """
        accepted = tuple(extension.lower() for extension in valid_extensions)
        return path.lower().endswith(accepted)

    def upload_audio_file(self, path: str) -> bool:
        """Upload an audio file to the robot.

        This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot
        and is deleted when the robot is turned off.

        Args:
            path: The path to the audio file to upload.

        Returns:
            True if the service reported a successful upload. False if the extension is not supported,
            the path is not an existing file, or the service reported a failure (the reason is logged).
        """
        if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
            self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
            return False

        # isfile (rather than exists) also rejects directories, which could not be opened for reading below.
        if not os.path.isfile(path):
            self._logger.error(f"File does not exist: {path}")
            return False

        def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
            # The first message only carries the file name; the content follows in chunks.
            yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))

            # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
            CHUNK_SIZE = 64 * 1024  # 64 KiB

            with open(file_path, "rb") as file:
                # read() returns b"" at end of file, which ends the iteration.
                for chunk in iter(lambda: file.read(CHUNK_SIZE), b""):
                    yield AudioFileRequest(chunk_data=chunk)

        response = self._audio_stub.UploadAudioFile(generate_requests(path))
        if response.success.value:
            return True

        self._logger.error(f"Failed to upload file: {response.error}")
        return False

    def download_audio_file(self, name: str, path: str) -> bool:
        """Download an audio file from the robot.

        Args:
            name: The name of the audio file to download.
            path: The folder to save the downloaded audio file.

        Returns:
            True if the file was received and written into the folder, False if the service did not
            send a usable file name (an error is logged).
        """
        response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))

        file_name = None
        buffer = BytesIO()

        for response in response_iterator:
            field = response.WhichOneof("data")
            if field == "info":
                file_name = response.info.path
            elif field == "chunk_data":
                buffer.write(response.chunk_data)

        # Keep only the final path component so that a name sent by the service cannot
        # make the file land outside of the destination folder.
        safe_name = os.path.basename(file_name) if file_name else ""
        if not safe_name:
            self._logger.error(f"Failed to download file: {name}")
            return False

        file_path = os.path.join(path, safe_name)
        with open(file_path, "wb") as file:
            file.write(buffer.getvalue())
        return True

    def get_audio_files(self) -> List[str]:
        """Get audio files from the robot.

        This method retrieves the list of audio files stored on the robot.

        Returns:
            The path of each file reported by the audio service.
        """
        files = self._audio_stub.GetAudioFiles(request=Empty())

        return [file.path for file in files.files]

    def remove_audio_file(self, name: str) -> bool:
        """Remove an audio file from the robot.

        Args:
            name: The name of the audio file to remove.

        Returns:
            True if the service reported success, False otherwise (the error is logged).
        """
        response = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))
        if response.success.value:
            return True

        self._logger.error(f"Failed to remove file: {response.error}")
        return False

    def play_audio_file(self, name: str) -> None:
        """Play an audio file on the robot.

        Args:
            name: The name of the audio file to play.
        """
        self._audio_stub.PlayAudioFile(request=AudioFile(path=name))

    def stop_playing(self) -> None:
        """Stop playing audio on the robot.

        This method stops the audio that is currently playing on the robot.
        """
        self._audio_stub.StopPlaying(Empty())

    def record_audio(self, name: str, duration_secs: float) -> bool:
        """Record audio on the robot.

        Args:
            name: name of the audio file. The extension defines the encoding. Only ogg is supported.
            duration_secs: duration of the recording in seconds.

        Returns:
            True once the recording request has been sent, False if the extension is not supported.
        """
        if not self._validate_extension(name, [".ogg"]):
            self._logger.error("Invalid file type. Supported file type is .ogg")
            return False

        self._audio_stub.RecordAudioFile(request=AudioFile(path=name, duration=duration_secs))
        return True

    def stop_recording(self) -> None:
        """Stop recording audio on the robot.

        This method stops the audio recording on the robot.
        """
        self._audio_stub.StopRecording(Empty())
Audio class manages the microphones and speaker on the robot.
It allows playing audio files and recording audio. Please note that the audio files are stored in a temporary folder on the robot and are deleted when the robot is turned off.
def __init__(self, host: str, port: int) -> None:
    """Create the gRPC channel and stub used to reach the audio service.

    Args:
        host: The host address for the gRPC service.
        port: The port number for the gRPC service.
    """
    self._logger = logging.getLogger(__name__)

    address = f"{host}:{port}"
    channel = grpc.insecure_channel(address)
    self._grpc_audio_channel = channel
    self._host = host

    # The stub is bound to the channel created above.
    self._audio_stub = AudioServiceStub(channel)
Set up the audio module.
This initializes the gRPC channel for communicating with the audio service.
Arguments:
- host: The host address for the gRPC service.
- port: The port number for the gRPC service.
def upload_audio_file(self, path: str) -> bool:
    """Upload an audio file to the robot.

    This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot
    and is deleted when the robot is turned off.

    Args:
        path: The path to the audio file to upload.

    Returns:
        True if the service reported a successful upload. False if the extension is not supported,
        the path is not an existing file, or the service reported a failure (the reason is logged).
    """
    if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
        self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
        return False

    # isfile (rather than exists) also rejects directories, which could not be opened for reading below.
    if not os.path.isfile(path):
        self._logger.error(f"File does not exist: {path}")
        return False

    def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
        # The first message only carries the file name; the content follows in chunks.
        yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))

        # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
        CHUNK_SIZE = 64 * 1024  # 64 KiB

        with open(file_path, "rb") as file:
            # read() returns b"" at end of file, which ends the iteration.
            for chunk in iter(lambda: file.read(CHUNK_SIZE), b""):
                yield AudioFileRequest(chunk_data=chunk)

    response = self._audio_stub.UploadAudioFile(generate_requests(path))
    if response.success.value:
        return True

    self._logger.error(f"Failed to upload file: {response.error}")
    return False
Upload an audio file to the robot.
This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot and is deleted when the robot is turned off.
Arguments:
- path: The path to the audio file to upload.
def download_audio_file(self, name: str, path: str) -> bool:
    """Download an audio file from the robot.

    Args:
        name: The name of the audio file to download.
        path: The folder to save the downloaded audio file.

    Returns:
        True if the file was received and written into the folder, False if the service did not
        send a usable file name (an error is logged).
    """
    response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))

    file_name = None
    buffer = BytesIO()

    for response in response_iterator:
        field = response.WhichOneof("data")
        if field == "info":
            file_name = response.info.path
        elif field == "chunk_data":
            buffer.write(response.chunk_data)

    # Keep only the final path component so that a name sent by the service cannot
    # make the file land outside of the destination folder.
    safe_name = os.path.basename(file_name) if file_name else ""
    if not safe_name:
        self._logger.error(f"Failed to download file: {name}")
        return False

    file_path = os.path.join(path, safe_name)
    with open(file_path, "wb") as file:
        file.write(buffer.getvalue())
    return True
Download an audio file from the robot.
Arguments:
- name: The name of the audio file to download.
- path: The folder to save the downloaded audio file.
def get_audio_files(self) -> List[str]:
    """List the audio files stored on the robot.

    Returns:
        The path of each file reported by the audio service.
    """
    listing = self._audio_stub.GetAudioFiles(request=Empty())

    names = []
    for entry in listing.files:
        names.append(entry.path)
    return names
Get audio files from the robot.
This method retrieves the list of audio files stored on the robot.
def remove_audio_file(self, name: str) -> bool:
    """Delete an audio file stored on the robot.

    Args:
        name: The name of the audio file to remove.

    Returns:
        True if the service reported success, False otherwise (the error is logged).
    """
    reply = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))

    removed = bool(reply.success.value)
    if not removed:
        self._logger.error(f"Failed to remove file: {reply.error}")
    return removed
Remove an audio file from the robot.
This method removes an audio file from the robot.
Arguments:
- name: The name of the audio file to remove.
def play_audio_file(self, name: str) -> None:
    """Ask the audio service to play an audio file on the robot.

    Args:
        name: The name of the audio file to play.
    """
    play = self._audio_stub.PlayAudioFile
    play(request=AudioFile(path=name))
Play an audio file on the robot.
This method plays an audio file on the robot.
Arguments:
- name: The name of the audio file to play.
def stop_playing(self) -> None:
    """Ask the audio service to stop the audio currently playing on the robot."""
    stop = self._audio_stub.StopPlaying
    stop(Empty())
Stop playing audio on the robot.
This method stops the audio that is currently playing on the robot.
def record_audio(self, name: str, duration_secs: float) -> bool:
    """Ask the audio service to record audio on the robot.

    Args:
        name: name of the audio file. The extension defines the encoding. Only ogg is supported.
        duration_secs: duration of the recording in seconds.

    Returns:
        True once the recording request has been sent, False if the extension is not supported.
    """
    extension_ok = self._validate_extension(name, [".ogg"])
    if extension_ok:
        recording = AudioFile(path=name, duration=duration_secs)
        self._audio_stub.RecordAudioFile(request=recording)
        return True

    self._logger.error("Invalid file type. Supported file type is .ogg")
    return False
Record audio on the robot.
This method records audio on the robot.
Arguments:
- name: name of the audio file. The extension defines the encoding. Only ogg is supported.
- duration_secs: duration of the recording in seconds.
def stop_recording(self) -> None:
    """Ask the audio service to stop the audio recording on the robot."""
    stop = self._audio_stub.StopRecording
    stop(Empty())
Stop recording audio on the robot.
This method stops the audio recording on the robot.