1use std::{
2 env,
3 f32::consts::E,
4 mem::take,
5 sync::Arc,
6 task::Context,
7 time::{Duration, SystemTime},
8};
9
10use poulpe_ethercat_controller::{state_machine::CiA402State, PoulpeController};
11use tokio::{
12 sync::mpsc,
13 time::{error::Elapsed, sleep},
14};
15use tokio_stream::{wrappers::ReceiverStream, StreamExt};
16use tonic::{transport::Server, Request, Response, Status, Streaming};
17
18use crate::pb::{
19 poulpe_multiplexer_server::{PoulpeMultiplexer, PoulpeMultiplexerServer},
20 PoulpeCommands, PoulpeIds, PoulpeState, PoulpeStates, StateStreamRequest,
21};
22
23use prost_types::Timestamp;
24#[derive(Debug)]
25struct PoulpeMultiplexerService {
26 controller: Arc<PoulpeController>,
27}
28
29fn get_state_for_id(
30 controller: &PoulpeController,
31 id: i32,
32) -> Result<PoulpeState, Box<dyn std::error::Error>> {
33 let slave_id = id as u32;
34
35 if controller.is_slave_ready(slave_id as u16) == false {
37 log::error!("Slave (id: {}) not ready!", slave_id);
38 return Err(("Slave not ready!").into());
39 }
40
41 Ok(PoulpeState {
42 id,
43 mode_of_operation: match controller.get_mode_of_operation_display(slave_id as u16) {
44 Ok(mode) => mode as i32,
45 _ => {
46 log::error!("Failed to get mode of operation for slave {}", slave_id);
47 return Err("Failed to get mode of operation".into());
48 }
49 },
50 actual_position: match controller.get_current_position(slave_id) {
51 Ok(Some(pos)) => pos,
52 _ => {
53 log::error!("Failed to get actual position for slave {}", slave_id);
54 return Err("Failed to get actual position".into());
55 }
56 },
57 actual_velocity: match controller.get_current_velocity(slave_id) {
58 Ok(Some(vel)) => vel,
59 _ => {
60 log::error!("Failed to get actual velocity for slave {}", slave_id);
61 return Err("Failed to get actual velocity".into());
62 }
63 },
64 actual_torque: match controller.get_current_torque(slave_id) {
65 Ok(Some(torque)) => torque,
66 _ => {
67 log::error!("Failed to get actual torque for slave {}", slave_id);
68 return Err("Failed to get actual torque".into());
69 }
70 },
71 axis_sensors: match controller.get_current_axis_sensors(slave_id) {
72 Ok(Some(sensor)) => sensor,
73 _ => {
74 log::error!("Failed to get axis sensor for slave {}", slave_id);
75 return Err("Failed to get axis sensor".into());
76 }
77 },
78 axis_sensor_zeros: match controller.get_axis_sensor_zeros(slave_id) {
79 Ok(Some(sensor)) => sensor,
80 _ => {
81 log::error!("Failed to get axis sensor zeros for slave {}", slave_id);
82 return Err("Failed to get axis sensor zeros".into());
83 }
84 },
85 board_temperatures: match controller.get_board_temperatures(slave_id) {
86 Ok(Some(temps)) => temps,
87 _ => {
88 log::error!("Failed to get board temperatures for slave {}", slave_id);
89 return Err("Failed to get board temperatures".into());
90 }
91 },
92 motor_temperatures: match controller.get_motor_temperatures(slave_id) {
93 Ok(Some(temps)) => temps,
94 _ => {
95 log::error!("Failed to get motor temperatures for slave {}", slave_id);
96 return Err("Failed to get motor temperatures".into());
97 }
98 },
99 requested_target_position: match controller.get_current_target_position(slave_id) {
100 Ok(Some(pos)) => pos,
101 _ => {
102 log::error!(
103 "Failed to get requested target position for slave {}",
104 slave_id
105 );
106 return Err("Failed to get requested target position".into());
107 }
108 },
109 requested_velocity_limit: match controller.get_current_velocity_limit(slave_id) {
110 Ok(Some(velocity_limit)) => velocity_limit,
111 _ => {
112 log::error!(
113 "Failed to get requested velcity limit for slave {}",
114 slave_id
115 );
116 return Err("Failed to get requested velcity limit".into());
117 }
118 },
119 requested_torque_limit: match controller.get_current_torque_limit(slave_id) {
120 Ok(Some(troque_limit)) => troque_limit,
121 _ => {
122 log::error!(
123 "Failed to get requested torque limit for slave {}",
124 slave_id
125 );
126 return Err("Failed to get requested torque limit".into());
127 }
128 },
129 state: match controller.get_status(slave_id) {
130 Ok(state) => state as u32,
131 _ => {
132 log::error!("Failed to get state for slave {}", slave_id);
133 return Err("Failed to get state".into());
134 }
135 },
136 error_codes: match controller.get_error_codes(slave_id) {
137 Ok(error_codes) => error_codes.iter().map(|x| *x as i32).collect(),
138 _ => {
139 log::error!("Failed to get error codes for slave {}", slave_id);
140 return Err("Failed to get error codes".into());
141 }
142 },
143 compliant: match controller.is_torque_on(slave_id) {
144 Ok(Some(state)) => state,
145 _ => {
146 log::error!("Failed to get torque state for slave {}", slave_id);
147 return Err("Failed to get torque state".into());
148 }
149 },
150 published_timestamp: Some(Timestamp::from(std::time::SystemTime::now())),
151 })
152}
153
154#[tonic::async_trait]
155impl PoulpeMultiplexer for PoulpeMultiplexerService {
156 async fn get_poulpe_ids(&self, _request: Request<()>) -> Result<Response<PoulpeIds>, Status> {
157 let reply = PoulpeIds {
158 ids: self
159 .controller
160 .get_slave_ids()
161 .iter()
162 .map(|&id| id as i32)
163 .collect(),
164 names: self.controller.get_slave_names(),
165 };
166
167 Ok(Response::new(reply))
168 }
169
170 type GetStatesStream = ReceiverStream<Result<PoulpeStates, Status>>;
171
172 async fn get_states(
173 &self,
174 request: Request<StateStreamRequest>,
175 ) -> Result<Response<Self::GetStatesStream>, Status> {
176 let (tx, rx) = mpsc::channel(2);
177
178 let controller = Arc::clone(&self.controller);
179
180 let ids = request.get_ref().ids.clone();
181 for id in ids.clone() {
182 let slave_id = id as u32;
183 log::info!(
184 "Setup Slave {} (id: {})...",
185 controller.get_slave_name(slave_id as u16).unwrap(),
186 slave_id
187 );
188 match controller.setup(slave_id) {
189 Ok(_) => log::info!("Done!"),
190 Err(e) => {
191 log::error!("Failed to setup slave {}: {}", slave_id, e);
192 return Err(Status::internal("Failed to setup slaves"));
193 }
194 }
195 }
196
197 log::info!(
198 "New client - update period of {}s",
199 request.get_ref().update_period
200 );
201
202 tokio::spawn(async move {
203 let request = request.get_ref();
204 let mut interval =
206 tokio::time::interval(Duration::from_secs_f32(request.update_period));
207
208 let mut last_state = poule_empty_state();
211
212 let mut nb_states = 0;
214 let mut t_debug = tokio::time::Instant::now();
215 let mut report_display_time = 5.0; loop {
220 interval.tick().await;
222
223 let states = PoulpeStates {
224 states: request
225 .ids
226 .iter()
227 .map(|&id| match get_state_for_id(&controller, id) {
228 Ok(state) => {
229 last_state = state.clone();
230 state
231 }
232 Err(e) => {
233 log::error!("Failed to get state for slave {}: {}", id, e);
234 last_state.clone()
235 }
236 })
237 .collect(),
238 };
239 if tx.send(Ok(states)).await.is_err() {
240 break;
241 }
242
243 nb_states += 1;
246 if t_debug.elapsed().as_secs_f32() > report_display_time {
247 log::info!(
248 "GRPC EtherCAT Slave {} | states sent {} req/s",
249 request.ids[0],
250 nb_states as f32 / t_debug.elapsed().as_secs_f32()
251 );
252 nb_states = 0;
253 t_debug = tokio::time::Instant::now();
254 report_display_time = 30.0; }
256 }
257 });
258
259 Ok(Response::new(ReceiverStream::new(rx)))
260 }
261
262 async fn get_commands(
263 &self,
264 request: Request<Streaming<PoulpeCommands>>,
265 ) -> Result<Response<()>, Status> {
266 let mut stream = request.into_inner();
267
268 let mut t_debug = tokio::time::Instant::now();
271 let mut nb_commands = 0;
273 let mut nb_dropped = 0;
274 let mut t_command = tokio::time::Instant::now();
276 let mut dt_command_max: f32 = 0.0;
278 let mut report_display_time = 5.0; let no_slaves: usize = self.controller.get_slave_ids().len();
284 let mut nb_errors = vec![0; no_slaves];
286 let mut nb_errors_max = 1000;
287
288 while let Some(Ok(req)) = stream.next().await {
289 let mut slave_id: u32 = req.commands[0].id as u32;
290 if self.controller.is_slave_ready(slave_id as u16) == false {
292 log::error!("Slave (id: {}) not ready!", slave_id);
293 nb_dropped += 1;
294 continue;
295 }
296
297 match self.controller.get_status(slave_id) {
299 Ok(CiA402State::Fault) | Ok(CiA402State::FaultReactionActive) => {
300 let i = slave_id as usize;
301 nb_errors[i] += 1;
302 if nb_errors[i] % nb_errors_max == 0 {
303 log::error!(
304 "Slave {} (name {}) is in fault state for {}s,\n {:#x?}",
305 slave_id,
306 self.controller.get_slave_name(slave_id as u16).unwrap(),
307 nb_errors[i] as f32 * 1e-3,
308 self.controller.get_error_flags(slave_id as u16).unwrap()
309 );
310
311 #[cfg(feature = "qucik_stop_on_slave_fault")]
312 {
313 log::warn!("Sending emergency stop on all slaves!");
314 self.controller
316 .emergency_stop_all(slave_id as u16)
317 .unwrap_or_else(|e| {
318 log::error!("Failed to quick stop all slaves: {}", e);
319 });
320 }
321 #[cfg(feature = "stop_server_on_actuator_error")]
322 {
323 log::error!("Stopping the GRPC master!");
324 std::process::exit(10);
326 }
327 nb_errors_max = 5000;
329 }
330 continue;
331 }
332 _ => {} }
334
335 log::debug!("Got commands {:?}", req);
336 for cmd in req.commands {
337 slave_id = cmd.id as u32;
338
339 match cmd.emergency_stop {
341 Some(true) => {
342 self.controller
343 .emergency_stop(slave_id)
344 .unwrap_or_else(|e| {
345 log::error!(
346 "Failed to set emergency stop for slave {}: {}",
347 slave_id,
348 e
349 )
350 });
351 continue;
352 }
353 _ => (),
354 }
355
356 let mut target_pos = cmd.target_position;
357 match cmd.published_timestamp {
358 Some(published_time) => {
359 let published_time = match SystemTime::try_from(published_time) {
360 Ok(systime) => systime,
361 Err(_) => {
362 log::warn!("Cannot parse the timestamp, discarding message!");
363 continue;
364 }
365 };
366 if self
368 .controller
369 .check_if_too_old(published_time.elapsed().unwrap())
370 {
371 nb_dropped += 1;
372 continue;
373 }
374 }
375 None => {
376 log::warn!("No published timestamp, discarding message!");
377 continue;
378 }
379 }
380
381 let no_axis = self.controller.get_orbita_type(slave_id) as usize;
382
383 let mut set_compliant = cmd.compliancy;
384
385 let mode_of_operation = cmd.mode_of_operation;
386 if mode_of_operation != 0 {
387 self.controller
388 .set_mode_of_operation(slave_id as u16, mode_of_operation as u8)
389 .unwrap_or_else(|e| {
390 log::error!(
391 "Failed to set mode of operation for slave {}: {}",
392 slave_id,
393 e
394 )
395 });
396 }
397
398 if target_pos.len() != 0 {
399 self.controller
401 .set_target_position(
402 slave_id,
403 target_pos[(target_pos.len() - no_axis)..].to_vec(),
404 )
405 .unwrap_or_else(|e| {
406 log::error!(
407 "Failed to set target position for slave {}: {}",
408 slave_id,
409 e
410 );
411 set_compliant = Some(true); });
413 }
414 let velocity_limit = cmd.velocity_limit;
415 if velocity_limit.len() != 0 {
416 self.controller
418 .set_velocity_limit(
419 slave_id,
420 velocity_limit[(velocity_limit.len() - no_axis)..].to_vec(),
421 )
422 .unwrap_or_else(|e| {
423 log::error!(
424 "Failed to set velocity limit for slave {}: {}",
425 slave_id,
426 e
427 );
428 set_compliant = Some(true); });
430 }
431 let torque_limit = cmd.torque_limit;
432 if torque_limit.len() != 0 {
433 self.controller
435 .set_torque_limit(
436 slave_id,
437 torque_limit[(torque_limit.len() - no_axis)..].to_vec(),
438 )
439 .unwrap_or_else(|e| {
440 log::error!("Failed to set torque limit for slave {}: {}", slave_id, e);
441 set_compliant = Some(true); });
443 }
444
445 let target_velocity = cmd.target_velocity;
446 if target_velocity.len() != 0 {
447 self.controller
449 .set_target_velocity(
450 slave_id,
451 target_velocity[(target_velocity.len() - no_axis)..].to_vec(),
452 )
453 .unwrap_or_else(|e| {
454 log::error!(
455 "Failed to set target velocity for slave {}: {}",
456 slave_id,
457 e
458 );
459 set_compliant = Some(true); });
461 }
462 let target_torque = cmd.target_torque;
463 if target_torque.len() != 0 {
464 self.controller
466 .set_target_torque(
467 slave_id,
468 target_torque[(target_torque.len() - no_axis)..].to_vec(),
469 )
470 .unwrap_or_else(|e| {
471 log::error!(
472 "Failed to set target torque for slave {}: {}",
473 slave_id,
474 e
475 );
476 set_compliant = Some(true); });
478 }
479
480 match set_compliant {
482 Some(true) => self
483 .controller
484 .set_torque(slave_id, false)
485 .unwrap_or_else(|e| {
486 log::error!("Failed to set torque off for slave {}: {}", slave_id, e)
487 }),
488 Some(false) => self
489 .controller
490 .set_torque(slave_id, true)
491 .unwrap_or_else(|e| {
492 log::error!("Failed to set torque on for slave {}: {}", slave_id, e)
493 }),
494 None => (),
495 }
496
497 let dt_command = t_command.elapsed().as_secs_f32();
501 if dt_command_max < dt_command {
502 dt_command_max = dt_command;
503 }
504 t_command = tokio::time::Instant::now();
506 nb_commands += 1;
507 }
508 let dt_debug = t_debug.elapsed().as_secs_f32();
515 if dt_debug > report_display_time {
516 let f_commands = nb_commands as f32 / dt_debug;
517 let f_commands_dropped = nb_dropped as f32 / dt_debug;
518 let dt_commands_ms = (dt_debug) / (nb_commands as f32) * 1000.0;
519 let dt_command_max_ms = dt_command_max * 1000.0;
520 log::info!("GRPC EtherCAT Slave {} | commands sent {:0.2} cmd/s, commands dropped {:0.2} req/s, command time: {:0.2} (max {:0.2}) ms",
521 slave_id,
522 f_commands,
523 f_commands_dropped,
524 dt_commands_ms,
525 dt_command_max_ms);
526
527 t_debug = tokio::time::Instant::now();
528 nb_commands = 0;
529 nb_dropped = 0;
530 dt_command_max = 0.0;
531 t_command = tokio::time::Instant::now();
532 report_display_time = 30.0; }
534 }
535
536 Ok(Response::new(()))
537 }
538}
539
540pub async fn launch_server(config_file: &str) -> Result<(), Box<dyn std::error::Error>> {
548 let controller = PoulpeController::connect(config_file)?;
549
550 println!("POULPE controller ready!");
551
552 let addr = "[::]:50098".parse()?;
553 let srv = PoulpeMultiplexerService {
554 controller: Arc::new(controller),
555 };
556
557 Server::builder()
558 .add_service(PoulpeMultiplexerServer::new(srv))
559 .serve(addr)
560 .await?;
561
562 Ok(())
563}
564
565fn poule_empty_state() -> PoulpeState {
566 PoulpeState {
567 id: 0,
568 mode_of_operation: 0,
569 actual_position: vec![],
570 actual_velocity: vec![],
571 actual_torque: vec![],
572 axis_sensors: vec![],
573 axis_sensor_zeros: vec![],
574 motor_temperatures: vec![],
575 board_temperatures: vec![],
576 requested_target_position: vec![],
577 requested_velocity_limit: vec![],
578 requested_torque_limit: vec![],
579 state: 0,
580 error_codes: vec![],
581 compliant: false,
582 published_timestamp: None,
583 }
584}