reachy2_sdk.media.audio

Reachy Audio module.

Enable access to the microphones and speaker.

  1"""Reachy Audio module.
  2
  3Enable access to the microphones and speaker.
  4"""
  5
  6import logging
  7import os
  8from io import BytesIO
  9from typing import Generator, List
 10
 11import grpc
 12from google.protobuf.empty_pb2 import Empty
 13from reachy2_sdk_api.audio_pb2 import AudioFile, AudioFileRequest
 14from reachy2_sdk_api.audio_pb2_grpc import AudioServiceStub
 15
 16
 17class Audio:
 18    """Audio class manages the microhpones and speaker on the robot.
 19
 20    It allows to play audio files, and record audio. Please note that the audio files are stored in a
 21    temporary folder on the robot and are deleted when the robot is turned off.
 22    """
 23
 24    def __init__(self, host: str, port: int) -> None:
 25        """Set up the audio module.
 26
 27        This initializes the gRPC channel for communicating with the audio service.
 28
 29        Args:
 30            host: The host address for the gRPC service.
 31            port: The port number for the gRPC service.
 32        """
 33        self._logger = logging.getLogger(__name__)
 34        self._grpc_audio_channel = grpc.insecure_channel(f"{host}:{port}")
 35        self._host = host
 36
 37        self._audio_stub = AudioServiceStub(self._grpc_audio_channel)
 38
 39    def _validate_extension(self, path: str, valid_extensions: List[str]) -> bool:
 40        """Validate the file type and return the file name if valid.
 41
 42        Args:
 43            path: The path to the audio file.
 44
 45        Returns:
 46            The file name if the file type is valid, otherwise None.
 47        """
 48        return path.lower().endswith(tuple(valid_extensions))
 49
 50    def upload_audio_file(self, path: str) -> bool:
 51        """Upload an audio file to the robot.
 52
 53        This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot
 54        and is deleted when the robot is turned off.
 55
 56        Args:
 57            path: The path to the audio file to upload.
 58        """
 59        if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
 60            self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
 61            return False
 62
 63        if not os.path.exists(path):
 64            self._logger.error(f"File does not exist: {path}")
 65            return False
 66
 67        def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
 68            yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))
 69
 70            # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
 71            CHUNK_SIZE = 64 * 1024  # 64 KB
 72
 73            with open(file_path, "rb") as file:
 74                while True:
 75                    chunk = file.read(CHUNK_SIZE)
 76                    if not chunk:
 77                        break
 78                    yield AudioFileRequest(chunk_data=chunk)
 79
 80        response = self._audio_stub.UploadAudioFile(generate_requests(path))
 81        if response.success.value:
 82            return True
 83        else:
 84            self._logger.error(f"Failed to upload file: {response.error}")
 85            return False
 86
 87    def download_audio_file(self, name: str, path: str) -> bool:
 88        """Download an audio file from the robot.
 89
 90        Args:
 91            name: The name of the audio file to download.
 92            path: The folder to save the downloaded audio file.
 93        """
 94        response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))
 95
 96        file_name = None
 97        buffer = BytesIO()
 98
 99        for response in response_iterator:
100            if response.WhichOneof("data") == "info":
101                file_name = response.info.path
102            elif response.WhichOneof("data") == "chunk_data":
103                buffer.write(response.chunk_data)
104
105        if file_name:
106            file_path = os.path.join(path, file_name)
107            with open(file_path, "wb") as file:
108                file.write(buffer.getvalue())
109            return os.path.exists(file_path)
110        else:
111            return False
112
113    def get_audio_files(self) -> List[str]:
114        """Get audio files from the robot.
115
116        This method retrieves the list of audio files stored on the robot.
117        """
118        files = self._audio_stub.GetAudioFiles(request=Empty())
119
120        return [file.path for file in files.files]
121
122    def remove_audio_file(self, name: str) -> bool:
123        """Remove an audio file from the robot.
124
125        This method removes an audio file from the robot.
126
127        Args:
128            name: The name of the audio file to remove.
129        """
130        response = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))
131        if response.success.value:
132            return True
133        else:
134            self._logger.error(f"Failed to remove file: {response.error}")
135            return False
136
137    def play_audio_file(self, name: str) -> None:
138        """Play an audio file on the robot.
139
140        This method plays an audio file on the robot.
141
142        Args:
143            name: The name of the audio file to play.
144        """
145        self._audio_stub.PlayAudioFile(request=AudioFile(path=name))
146
147    def stop_playing(self) -> None:
148        """Stop playing audio on the robot.
149
150        This method stops the audio that is currently playing on the robot.
151        """
152        self._audio_stub.StopPlaying(Empty())
153
154    def record_audio(self, name: str, duration_secs: float) -> bool:
155        """Record audio on the robot.
156
157        This method records audio on the robot.
158
159        Args:
160            name: name of the audio file. The extension defines the encoding. Ony ogg is supported.
161            duration_secs: duration of the recording in seconds.
162        """
163        if not self._validate_extension(name, [".ogg"]):
164            self._logger.error("Invalid file type. Supported file type is .ogg")
165            return False
166
167        self._audio_stub.RecordAudioFile(request=AudioFile(path=name, duration=duration_secs))
168        return True
169
170    def stop_recording(self) -> None:
171        """Stop recording audio on the robot.
172
173        This method stops the audio recording on the robot.
174        """
175        self._audio_stub.StopRecording(Empty())
class Audio:
    """Audio class manages the microphones and speaker on the robot.

    It allows playing audio files and recording audio. Please note that the audio files are stored in a
    temporary folder on the robot and are deleted when the robot is turned off.
    """

    def __init__(self, host: str, port: int) -> None:
        """Set up the audio module.

        This opens an insecure gRPC channel to ``host:port`` and builds the audio service stub on it.

        Args:
            host: The host address for the gRPC service.
            port: The port number for the gRPC service.
        """
        self._logger = logging.getLogger(__name__)
        self._grpc_audio_channel = grpc.insecure_channel(f"{host}:{port}")
        self._host = host

        self._audio_stub = AudioServiceStub(self._grpc_audio_channel)

    def _validate_extension(self, path: str, valid_extensions: List[str]) -> bool:
        """Check whether a path ends with one of the accepted extensions.

        The comparison ignores case on both the path and the extensions.

        Args:
            path: The path (or name) of the audio file.
            valid_extensions: The accepted extensions, dot included (e.g. ``[".wav", ".ogg"]``).

        Returns:
            True if the path ends with one of the extensions, False otherwise.
        """
        accepted = tuple(extension.lower() for extension in valid_extensions)
        return path.lower().endswith(accepted)

    def upload_audio_file(self, path: str) -> bool:
        """Upload an audio file to the robot.

        This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot
        and is deleted when the robot is turned off.

        Args:
            path: The path to the audio file to upload. Must end with .wav, .ogg or .mp3.

        Returns:
            True if the service reports success. False if the extension is not supported, if the path
            is not an existing regular file, or if the service reports a failure (the error is logged).
        """
        if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
            self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
            return False

        # isfile (rather than exists) also rejects directories, which would otherwise
        # only fail later, when the request generator tries to open them.
        if not os.path.isfile(path):
            self._logger.error(f"File does not exist: {path}")
            return False

        def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
            # The first message carries the file name only; the content follows in chunks.
            yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))

            # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
            CHUNK_SIZE = 64 * 1024  # 64 KB

            with open(file_path, "rb") as file:
                while True:
                    chunk = file.read(CHUNK_SIZE)
                    if not chunk:
                        break
                    yield AudioFileRequest(chunk_data=chunk)

        response = self._audio_stub.UploadAudioFile(generate_requests(path))
        if response.success.value:
            return True

        self._logger.error(f"Failed to upload file: {response.error}")
        return False

    def download_audio_file(self, name: str, path: str) -> bool:
        """Download an audio file from the robot.

        The streamed chunks are gathered in memory and written to ``path`` once the stream ends.

        Args:
            name: The name of the audio file to download.
            path: The folder to save the downloaded audio file.

        Returns:
            True if the file exists in ``path`` after writing, False if the stream carried no usable file name.
        """
        response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))

        file_name = None
        buffer = BytesIO()

        for response in response_iterator:
            field = response.WhichOneof("data")
            if field == "info":
                # Keep only the last path component so that a reply such as "../x.ogg"
                # or an absolute path cannot write outside of the requested folder.
                file_name = os.path.basename(response.info.path)
            elif field == "chunk_data":
                buffer.write(response.chunk_data)

        if not file_name:
            return False

        file_path = os.path.join(path, file_name)
        with open(file_path, "wb") as file:
            file.write(buffer.getvalue())
        return os.path.exists(file_path)

    def get_audio_files(self) -> List[str]:
        """Get audio files from the robot.

        This method retrieves the list of audio files stored on the robot.

        Returns:
            The ``path`` of every file entry in the service reply.
        """
        files = self._audio_stub.GetAudioFiles(request=Empty())

        return [file.path for file in files.files]

    def remove_audio_file(self, name: str) -> bool:
        """Remove an audio file from the robot.

        Args:
            name: The name of the audio file to remove.

        Returns:
            True if the service reports success, False otherwise (the error is logged).
        """
        response = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))
        if response.success.value:
            return True

        self._logger.error(f"Failed to remove file: {response.error}")
        return False

    def play_audio_file(self, name: str) -> None:
        """Play an audio file on the robot.

        Args:
            name: The name of the audio file to play.
        """
        self._audio_stub.PlayAudioFile(request=AudioFile(path=name))

    def stop_playing(self) -> None:
        """Stop playing audio on the robot.

        This method stops the audio that is currently playing on the robot.
        """
        self._audio_stub.StopPlaying(Empty())

    def record_audio(self, name: str, duration_secs: float) -> bool:
        """Record audio on the robot.

        Args:
            name: name of the audio file. The extension defines the encoding. Only ogg is supported.
            duration_secs: duration of the recording in seconds.

        Returns:
            True once the recording request has been sent, False if the extension is not .ogg.
        """
        if not self._validate_extension(name, [".ogg"]):
            self._logger.error("Invalid file type. Supported file type is .ogg")
            return False

        self._audio_stub.RecordAudioFile(request=AudioFile(path=name, duration=duration_secs))
        return True

    def stop_recording(self) -> None:
        """Stop recording audio on the robot.

        This method stops the audio recording on the robot.
        """
        self._audio_stub.StopRecording(Empty())

Audio class manages the microphones and speaker on the robot.

It allows playing audio files and recording audio. Please note that the audio files are stored in a temporary folder on the robot and are deleted when the robot is turned off.

Audio(host: str, port: int)
def __init__(self, host: str, port: int) -> None:
    """Connect the audio module to the robot's audio service.

    An insecure gRPC channel is opened on ``host:port`` and the audio
    service stub is created on top of it.

    Args:
        host: The host address for the gRPC service.
        port: The port number for the gRPC service.
    """
    target = f"{host}:{port}"
    channel = grpc.insecure_channel(target)

    self._logger = logging.getLogger(__name__)
    self._grpc_audio_channel = channel
    self._host = host
    self._audio_stub = AudioServiceStub(channel)

Set up the audio module.

This initializes the gRPC channel for communicating with the audio service.

Arguments:
  • host: The host address for the gRPC service.
  • port: The port number for the gRPC service.
def upload_audio_file(self, path: str) -> bool:
    """Upload an audio file to the robot.

    The audio file is stored in a temporary folder on the robot and is deleted
    when the robot is turned off.

    Args:
        path: The path to the audio file to upload. Must end with .wav, .ogg or .mp3.

    Returns:
        True if the service reports success. False if the extension is not supported,
        if the path is not an existing regular file, or if the service reports a
        failure (the error is logged).
    """
    if not self._validate_extension(path, [".wav", ".ogg", ".mp3"]):
        self._logger.error("Invalid file type. Supported file types are .wav, .ogg, .mp3")
        return False

    # isfile (rather than exists) also rejects directories, which would otherwise
    # only fail later, when the request generator tries to open them.
    if not os.path.isfile(path):
        self._logger.error(f"File does not exist: {path}")
        return False

    def generate_requests(file_path: str) -> Generator[AudioFileRequest, None, None]:
        # The first message carries the file name only; the content follows in chunks.
        yield AudioFileRequest(info=AudioFile(path=os.path.basename(file_path)))

        # 64KiB seems to be the size limit. see https://github.com/grpc/grpc.github.io/issues/371
        CHUNK_SIZE = 64 * 1024  # 64 KB

        with open(file_path, "rb") as file:
            while True:
                chunk = file.read(CHUNK_SIZE)
                if not chunk:
                    break
                yield AudioFileRequest(chunk_data=chunk)

    response = self._audio_stub.UploadAudioFile(generate_requests(path))
    if response.success.value:
        return True

    self._logger.error(f"Failed to upload file: {response.error}")
    return False

Upload an audio file to the robot.

This method uploads an audio file to the robot. The audio file is stored in a temporary folder on the robot and is deleted when the robot is turned off.

Arguments:
  • path: The path to the audio file to upload.
def download_audio_file(self, name: str, path: str) -> bool:
    """Download an audio file from the robot.

    The streamed chunks are gathered in memory and written to ``path`` once the
    stream ends.

    Args:
        name: The name of the audio file to download.
        path: The folder to save the downloaded audio file.

    Returns:
        True if the file exists in ``path`` after writing, False if the stream
        carried no usable file name.
    """
    response_iterator = self._audio_stub.DownloadAudioFile(AudioFile(path=name))

    file_name = None
    buffer = BytesIO()

    for response in response_iterator:
        field = response.WhichOneof("data")
        if field == "info":
            # Keep only the last path component so that a reply such as "../x.ogg"
            # or an absolute path cannot write outside of the requested folder.
            file_name = os.path.basename(response.info.path)
        elif field == "chunk_data":
            buffer.write(response.chunk_data)

    if not file_name:
        return False

    file_path = os.path.join(path, file_name)
    with open(file_path, "wb") as file:
        file.write(buffer.getvalue())
    return os.path.exists(file_path)

Download an audio file from the robot.

Arguments:
  • name: The name of the audio file to download.
  • path: The folder to save the downloaded audio file.
def get_audio_files(self) -> List[str]:
    """List the audio files stored on the robot.

    Returns:
        The ``path`` of every file entry in the service reply.
    """
    listing = self._audio_stub.GetAudioFiles(request=Empty())

    names = []
    for entry in listing.files:
        names.append(entry.path)
    return names

Get audio files from the robot.

This method retrieves the list of audio files stored on the robot.

def remove_audio_file(self, name: str) -> bool:
    """Delete an audio file stored on the robot.

    Args:
        name: The name of the audio file to remove.

    Returns:
        True if the service reports success, False otherwise (the error is logged).
    """
    reply = self._audio_stub.RemoveAudioFile(request=AudioFile(path=name))

    # Guard clause: report the failure first, fall through on success.
    if not reply.success.value:
        self._logger.error(f"Failed to remove file: {reply.error}")
        return False
    return True

Remove an audio file from the robot.

This method removes an audio file from the robot.

Arguments:
  • name: The name of the audio file to remove.
def play_audio_file(self, name: str) -> None:
    """Ask the robot to play one of its audio files.

    Args:
        name: The name of the audio file to play.
    """
    target = AudioFile(path=name)
    self._audio_stub.PlayAudioFile(request=target)

Play an audio file on the robot.

This method plays an audio file on the robot.

Arguments:
  • name: The name of the audio file to play.
def stop_playing(self) -> None:
    """Stop the audio that is currently playing on the robot."""
    request = Empty()
    self._audio_stub.StopPlaying(request)

Stop playing audio on the robot.

This method stops the audio that is currently playing on the robot.

def record_audio(self, name: str, duration_secs: float) -> bool:
    """Start an audio recording on the robot.

    Args:
        name: name of the audio file. The extension defines the encoding. Only ogg is supported.
        duration_secs: duration of the recording in seconds.

    Returns:
        True once the recording request has been sent, False if the extension is not .ogg.
    """
    if self._validate_extension(name, [".ogg"]):
        recording = AudioFile(path=name, duration=duration_secs)
        self._audio_stub.RecordAudioFile(request=recording)
        return True

    self._logger.error("Invalid file type. Supported file type is .ogg")
    return False

Record audio on the robot.

This method records audio on the robot.

Arguments:
  • name: name of the audio file. The extension defines the encoding. Only ogg is supported.
  • duration_secs: duration of the recording in seconds.
def stop_recording(self) -> None:
    """Stop the audio recording that is running on the robot."""
    request = Empty()
    self._audio_stub.StopRecording(request)

Stop recording audio on the robot.

This method stops the audio recording on the robot.