1use std::{any::Any, collections::HashMap, f32::consts::E, sync::Arc, time::Duration};
2
3use super::pb::{
4 poulpe_multiplexer_client::PoulpeMultiplexerClient, PoulpeCommand, PoulpeCommands, PoulpeState,
5 StateStreamRequest,
6};
7use prost_types::Timestamp;
8use tokio::{
9 runtime::{Builder, Handle, Runtime},
10 sync::RwLock,
11 time::sleep,
12};
13use tonic::{transport::Uri, Request};
14
15use poulpe_ethercat_controller::{register::BoardStatus, state_machine::CiA402State};
16
17#[derive(Debug)]
18enum Command {
19 EmergencyStop(bool),
20 Compliancy(bool),
21 ModeOfOperation(u32),
22 TargetPosition(Vec<f32>),
23 TargetVelocity(Vec<f32>),
24 TargetTorque(Vec<f32>),
25 VelocityLimit(Vec<f32>),
26 TorqueLimit(Vec<f32>),
27}
28
29#[derive(Debug)]
30pub struct PoulpeRemoteClient {
31 pub ids: Vec<u16>,
32 pub names: Vec<String>,
33 rt: Arc<Runtime>,
34 addr: Uri,
35 state: Arc<RwLock<HashMap<u16, PoulpeState>>>,
36 command_buff: Arc<RwLock<HashMap<u16, Vec<Command>>>>,
37}
38
39impl PoulpeRemoteClient {
40 pub fn connect_with_name(
52 addr: Uri,
53 poulpe_names: Vec<String>,
54 update_period: Duration,
55 ) -> Result<Self, std::io::Error> {
56 let id_client = PoulpeIdClient::new(addr.clone());
58 let (all_ids, all_names) = match id_client.get_slaves() {
59 Ok((ids, names)) => (ids, names),
60 Err(e) => {
61 log::error!("Failed to connect to the server: {}", e);
62 return Err(std::io::Error::new(
63 std::io::ErrorKind::ConnectionRefused,
64 "Error in connecting to the server! Check if server is up!!!",
65 ));
66 }
67 };
68
69 let mut poulpe_ids = vec![];
71 for name in poulpe_names {
72 let id = all_names.iter().position(|n| n == &name);
74 match id {
75 Some(id) => poulpe_ids.push(all_ids[id]),
76 None => {
77 log::error!("Invalid poulpe name: {}", name);
78 return Err(std::io::Error::new(
79 std::io::ErrorKind::InvalidInput,
80 "Invalid poulpe name",
81 ));
82 }
83 }
84 }
85
86 PoulpeRemoteClient::connect(addr, poulpe_ids, update_period)
88 }
89
90 pub fn connect(
102 addr: Uri,
103 poulpe_ids: Vec<u16>,
104 update_period: Duration,
105 ) -> Result<Self, std::io::Error> {
106 let state = Arc::new(RwLock::new(HashMap::new()));
107 let state_lock = Arc::clone(&state);
108
109 let command_buff = Arc::new(RwLock::new(HashMap::new()));
110 let command_buff_lock = Arc::clone(&command_buff);
111
112 let rt = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap());
114
115 let mut names = vec![];
117
118 let client = PoulpeIdClient::new(addr.clone());
120
121 let url = addr.to_string();
122 let ids = poulpe_ids.clone();
123 match client.get_slaves() {
126 Ok((available_ids, available_names)) => {
127 let mut common_ids = available_ids.clone();
128 let mut common_names = available_names.clone();
129 let mut retain_inds = vec![];
132 for id in poulpe_ids.iter() {
133 if let Some(ind) = available_ids.iter().position(|&x| x == *id) {
134 retain_inds.push(ind);
135 }
136 }
137 common_ids = retain_inds.iter().map(|&i| available_ids[i]).collect();
138 common_names = retain_inds
139 .iter()
140 .map(|&i| available_names[i].clone())
141 .collect();
142 if common_ids.len() != poulpe_ids.len() || common_names.len() != poulpe_ids.len() {
143 log::error!(
144 "Invalid poulpe_ids: {:?}, available_ids: {:?}",
145 poulpe_ids,
146 available_ids
147 );
148 return Err(std::io::Error::new(
149 std::io::ErrorKind::InvalidInput,
150 "Invalid poulpe_ids",
151 ));
152 }
153 names = common_names.clone();
155 }
156 Err(e) => {
157 log::error!(
158 "Error in connecting to the server! Check if server is up!!!\n {:?}",
159 e
160 );
161 return Err(std::io::Error::new(
162 std::io::ErrorKind::ConnectionRefused,
163 "Error in connecting to the server! Check if server is up!!!",
164 ));
165 }
166 }
167
168 rt.spawn(async move {
170 let mut client = match PoulpeMultiplexerClient::connect(url).await {
172 Ok(client) => client,
173 Err(e) => {
174 log::error!(
175 "Error in connecting to the server! Check if server is up!!!\n {:?}",
176 e
177 );
178 return;
179 }
180 };
181
182 let state_request = Request::new(StateStreamRequest {
184 ids: poulpe_ids.iter().map(|&id| id as i32).collect(),
185 update_period: update_period.as_secs_f32(),
186 });
187
188 let mut state_stream = client.get_states(state_request).await.unwrap().into_inner();
190
191 let command_stream = async_stream::stream! {
193 let mut interval = tokio::time::interval(update_period / 2);
195
196 loop {
197 interval.tick().await;
199 let mut cmd_map = command_buff_lock.write().await;
200 if let Some(commands) = extract_commands(&mut cmd_map) {
201 yield commands;
202 }
203 }
204 };
205
206 tokio::select! {
208 _ = async {
210 while let Some(poulpe_state) = state_stream.message().await.unwrap() {
211 log::debug!("Update state with {:?}", poulpe_state);
212 let mut state_buff = state_lock.write().await;
213 for s in poulpe_state.states {
214 state_buff.insert(s.id as u16, s);
215 }
216 }
217 } => {},
218
219 result = client.get_commands(Request::new(command_stream)) => {
221 match result {
222 Ok(_) => log::info!("Command stream ended"),
223 Err(e) => log::error!("Error in command stream: {:?}", e),
224 }
225 },
226 }
227 });
228
229 Ok(PoulpeRemoteClient {
230 ids,
231 names,
232 rt,
233 addr,
234 state,
235 command_buff,
236 })
237 }
238
239 pub fn get_poulpe_ids_sync(
241 &self,
242 ) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
243 self.rt.block_on(async {
244 let mut client = PoulpeMultiplexerClient::connect(self.addr.to_string()).await?;
245 get_poulpe_ids_async(&mut client).await
246 })
247 }
248
249 pub fn get_poulpe_ids(&self) -> Vec<u16> {
251 self.rt
252 .block_on(self.state.read())
253 .keys()
254 .cloned()
255 .collect()
256 }
257
258 fn get_state_property<T, F>(&self, slave_id: u16, f: F, _default: T) -> Result<T, ()>
263 where
264 F: Fn(&PoulpeState) -> T,
265 {
266 let state = self.rt.block_on(self.state.read());
267 let state = state.get(&slave_id).ok_or_else(|| {
268 log::warn!("No state found for slave {}", slave_id);
269 })?;
270
271 if let Some(ts) = &state.published_timestamp {
272 if let Ok(systime) = std::time::SystemTime::try_from(ts.clone()) {
273 if systime.elapsed().unwrap().as_millis() > 1000 {
274 log::error!(
275 "State is older than 1s for slave {}, server maybe down!",
276 slave_id
277 );
278 #[cfg(feature = "stop_client_on_server_timeout")]
280 std::process::exit(10);
281 return Err(());
282 }
283 } else {
284 log::warn!("Cannot parse the timestamp, discarding message!");
285 #[cfg(feature = "stop_client_on_server_timeout")]
287 std::process::exit(10);
288 return Err(());
289 }
290 } else {
291 log::warn!("No timestamp found for slave {}", slave_id);
292 }
293
294 Ok(f(state))
295 }
296
297 pub fn get_position_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
299 self.get_state_property(slave_id, |state| state.actual_position.clone(), vec![])
300 }
301
302 pub fn get_velocity_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
304 self.get_state_property(slave_id, |state| state.actual_velocity.clone(), vec![])
305 }
306
307 pub fn get_torque_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
309 self.get_state_property(slave_id, |state| state.actual_torque.clone(), vec![])
310 }
311
312 pub fn is_on(&self, slave_id: u16) -> Result<bool, ()> {
314 self.get_state_property(slave_id, |state| state.compliant, false)
315 }
316
317 pub fn get_target_position(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
319 self.get_state_property(
320 slave_id,
321 |state| state.requested_target_position.clone(),
322 vec![],
323 )
324 }
325
326 pub fn get_motor_temperatures(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
328 self.get_state_property(slave_id, |state| state.motor_temperatures.clone(), vec![])
329 }
330 pub fn get_board_temperatures(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
332 self.get_state_property(slave_id, |state| state.board_temperatures.clone(), vec![])
333 }
334
335 pub fn get_mode_of_operation(&self, slave_id: u16) -> Result<u32, ()> {
339 self.get_state_property(slave_id, |state| state.mode_of_operation as u32, 255)
340 }
341
342 pub fn get_state(&self, slave_id: u16) -> Result<u32, ()> {
344 match BoardStatus::from_cia402_to_board_status(
346 self.get_state_property(slave_id, |state| state.state, 255)?,
347 self.get_state_property(slave_id, |state| state.error_codes.clone(), vec![])?,
348 ) {
349 Ok(s) => Ok(s as u32),
350 Err(_) => Err(()),
351 }
352 }
354 pub fn get_cia402_state(&self, slave_id: u16) -> Result<u32, ()> {
356 self.get_state_property(slave_id, |state| state.state, 255)
357 }
358
359 pub fn get_torque_state(&self, slave_id: u16) -> Result<bool, ()> {
361 self.get_state_property(slave_id, |state| state.compliant, false)
362 }
363
364 pub fn get_axis_sensors(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
366 self.get_state_property(slave_id, |state| state.axis_sensors.clone(), vec![])
367 }
368
369 pub fn get_axis_sensor_zeros(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
371 self.get_state_property(slave_id, |state| state.axis_sensor_zeros.clone(), vec![])
372 }
373
374 pub fn get_error_codes(&self, slave_id: u16) -> Result<Vec<i32>, ()> {
376 self.get_state_property(slave_id, |state| state.error_codes.clone(), vec![])
377 }
378
379 pub fn get_velocity_limit(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
381 self.get_state_property(
382 slave_id,
383 |state| state.requested_velocity_limit.clone(),
384 vec![],
385 )
386 }
387 pub fn get_torque_limit(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
389 self.get_state_property(
390 slave_id,
391 |state| state.requested_torque_limit.clone(),
392 vec![],
393 )
394 }
395
396 fn push_command(&mut self, slave_id: u16, command: Command) -> Result<(), ()> {
398 self.rt
399 .block_on(self.command_buff.write())
400 .entry(slave_id)
401 .or_insert_with(Vec::new)
402 .push(command);
403 Ok(())
404 }
405
406 pub fn turn_on(&mut self, slave_id: u16) {
408 self.push_command(slave_id, Command::Compliancy(false));
409 }
410
411 pub fn turn_off(&mut self, slave_id: u16) {
413 self.push_command(slave_id, Command::Compliancy(true));
414 }
415
416 pub fn set_mode_of_operation(&mut self, slave_id: u16, mode: u32) {
420 self.push_command(slave_id, Command::ModeOfOperation(mode));
421 }
422
423 pub fn set_target_position(&mut self, slave_id: u16, target_position: Vec<f32>) {
425 self.push_command(slave_id, Command::TargetPosition(target_position));
426 }
427 pub fn set_target_velocity(&mut self, slave_id: u16, target_velocity: Vec<f32>) {
429 self.push_command(slave_id, Command::TargetVelocity(target_velocity));
430 }
431 pub fn set_target_torque(&mut self, slave_id: u16, target_torque: Vec<f32>) {
433 self.push_command(slave_id, Command::TargetTorque(target_torque));
434 }
435
436 pub fn set_velocity_limit(&mut self, slave_id: u16, velocity_limit: Vec<f32>) {
438 self.push_command(slave_id, Command::VelocityLimit(velocity_limit));
439 }
440 pub fn set_torque_limit(&mut self, slave_id: u16, torque_limit: Vec<f32>) {
442 self.push_command(slave_id, Command::TorqueLimit(torque_limit));
443 }
444 pub fn emergency_stop(&mut self, slave_id: u16) {
446 self.push_command(slave_id, Command::EmergencyStop(true));
447 }
448}
449
450fn extract_commands(buff: &mut HashMap<u16, Vec<Command>>) -> Option<PoulpeCommands> {
454 if buff.is_empty() {
455 return None;
456 }
457
458 let mut commands = vec![];
459
460 for (&id, cmds) in buff.iter() {
461 let mut poulpe_cmd = PoulpeCommand {
462 id: id.into(),
463 ..Default::default()
464 };
465 for cmd in cmds {
466 match cmd {
467 Command::EmergencyStop(stop) => poulpe_cmd.emergency_stop = Some(*stop),
468 Command::Compliancy(comp) => poulpe_cmd.compliancy = Some(*comp),
469 Command::ModeOfOperation(mode) => poulpe_cmd.mode_of_operation = *mode as i32,
470 Command::TargetPosition(pos) => {
471 if pos.len() != 0 {
472 poulpe_cmd.target_position.extend(pos.iter().cloned());
473 }
474 }
475 Command::TargetVelocity(vel) => {
476 if vel.len() != 0 {
477 poulpe_cmd.target_velocity.extend(vel.iter().cloned());
478 }
479 }
480 Command::TargetTorque(torque) => {
481 if torque.len() != 0 {
482 poulpe_cmd.target_torque.extend(torque.iter().cloned());
483 }
484 }
485 Command::VelocityLimit(vel) => {
486 if vel.len() != 0 {
487 poulpe_cmd.velocity_limit.extend(vel.iter().cloned());
488 }
489 }
490 Command::TorqueLimit(torque) => {
491 if torque.len() != 0 {
492 poulpe_cmd.torque_limit.extend(torque.iter().cloned());
493 }
494 }
495 }
496 }
497 poulpe_cmd.published_timestamp = Some(Timestamp::from(std::time::SystemTime::now()));
498 commands.push(poulpe_cmd);
499 }
500
501 buff.clear();
502
503 Some(PoulpeCommands { commands })
504}
505
506pub async fn get_poulpe_ids_async(
508 client: &mut PoulpeMultiplexerClient<tonic::transport::Channel>,
509) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
510 let response = client.get_poulpe_ids(Request::new(())).await?;
511 let response = response.into_inner();
512 let ids: Vec<u16> = response.ids.into_iter().map(|id| id as u16).collect();
513 let names: Vec<String> = response
514 .names
515 .into_iter()
516 .map(|name: String| name as String)
517 .collect();
518 Ok((ids, names))
519}
520
521#[derive(Debug)]
527pub struct PoulpeIdClient {
528 rt: Arc<Runtime>,
529 addr: Uri,
530}
531impl PoulpeIdClient {
532 pub fn new(addr: Uri) -> Self {
533 let rt = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap());
534 PoulpeIdClient { rt, addr }
535 }
536
537 pub fn get_slaves(&self) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
539 self.rt.block_on(async {
540 let mut client = PoulpeMultiplexerClient::connect(self.addr.to_string()).await?;
541 get_poulpe_ids_async(&mut client).await
542 })
543 }
544}