poulpe_ethercat_grpc/
server.rs

1use std::{
2    env,
3    f32::consts::E,
4    mem::take,
5    sync::Arc,
6    task::Context,
7    time::{Duration, SystemTime},
8};
9
10use poulpe_ethercat_controller::{state_machine::CiA402State, PoulpeController};
11use tokio::{
12    sync::mpsc,
13    time::{error::Elapsed, sleep},
14};
15use tokio_stream::{wrappers::ReceiverStream, StreamExt};
16use tonic::{transport::Server, Request, Response, Status, Streaming};
17
18use crate::pb::{
19    poulpe_multiplexer_server::{PoulpeMultiplexer, PoulpeMultiplexerServer},
20    PoulpeCommands, PoulpeIds, PoulpeState, PoulpeStates, StateStreamRequest,
21};
22
23use prost_types::Timestamp;
24#[derive(Debug)]
25struct PoulpeMultiplexerService {
26    controller: Arc<PoulpeController>,
27}
28
29fn get_state_for_id(
30    controller: &PoulpeController,
31    id: i32,
32) -> Result<PoulpeState, Box<dyn std::error::Error>> {
33    let slave_id = id as u32;
34
35    // check if slave ready, if not dont read its state
36    if controller.is_slave_ready(slave_id as u16) == false {
37        log::error!("Slave (id: {}) not ready!", slave_id);
38        return Err(("Slave not ready!").into());
39    }
40
41    Ok(PoulpeState {
42        id,
43        mode_of_operation: match controller.get_mode_of_operation_display(slave_id as u16) {
44            Ok(mode) => mode as i32,
45            _ => {
46                log::error!("Failed to get mode of operation for slave {}", slave_id);
47                return Err("Failed to get mode of operation".into());
48            }
49        },
50        actual_position: match controller.get_current_position(slave_id) {
51            Ok(Some(pos)) => pos,
52            _ => {
53                log::error!("Failed to get actual position for slave {}", slave_id);
54                return Err("Failed to get actual position".into());
55            }
56        },
57        actual_velocity: match controller.get_current_velocity(slave_id) {
58            Ok(Some(vel)) => vel,
59            _ => {
60                log::error!("Failed to get actual velocity for slave {}", slave_id);
61                return Err("Failed to get actual velocity".into());
62            }
63        },
64        actual_torque: match controller.get_current_torque(slave_id) {
65            Ok(Some(torque)) => torque,
66            _ => {
67                log::error!("Failed to get actual torque for slave {}", slave_id);
68                return Err("Failed to get actual torque".into());
69            }
70        },
71        axis_sensors: match controller.get_current_axis_sensors(slave_id) {
72            Ok(Some(sensor)) => sensor,
73            _ => {
74                log::error!("Failed to get axis sensor for slave {}", slave_id);
75                return Err("Failed to get axis sensor".into());
76            }
77        },
78        axis_sensor_zeros: match controller.get_axis_sensor_zeros(slave_id) {
79            Ok(Some(sensor)) => sensor,
80            _ => {
81                log::error!("Failed to get axis sensor zeros for slave {}", slave_id);
82                return Err("Failed to get axis sensor zeros".into());
83            }
84        },
85        board_temperatures: match controller.get_board_temperatures(slave_id) {
86            Ok(Some(temps)) => temps,
87            _ => {
88                log::error!("Failed to get board temperatures for slave {}", slave_id);
89                return Err("Failed to get board temperatures".into());
90            }
91        },
92        motor_temperatures: match controller.get_motor_temperatures(slave_id) {
93            Ok(Some(temps)) => temps,
94            _ => {
95                log::error!("Failed to get motor temperatures for slave {}", slave_id);
96                return Err("Failed to get motor temperatures".into());
97            }
98        },
99        requested_target_position: match controller.get_current_target_position(slave_id) {
100            Ok(Some(pos)) => pos,
101            _ => {
102                log::error!(
103                    "Failed to get requested target position for slave {}",
104                    slave_id
105                );
106                return Err("Failed to get requested target position".into());
107            }
108        },
109        requested_velocity_limit: match controller.get_current_velocity_limit(slave_id) {
110            Ok(Some(velocity_limit)) => velocity_limit,
111            _ => {
112                log::error!(
113                    "Failed to get requested velcity limit for slave {}",
114                    slave_id
115                );
116                return Err("Failed to get requested  velcity limit".into());
117            }
118        },
119        requested_torque_limit: match controller.get_current_torque_limit(slave_id) {
120            Ok(Some(troque_limit)) => troque_limit,
121            _ => {
122                log::error!(
123                    "Failed to get requested torque limit for slave {}",
124                    slave_id
125                );
126                return Err("Failed to get requested torque limit".into());
127            }
128        },
129        state: match controller.get_status(slave_id) {
130            Ok(state) => state as u32,
131            _ => {
132                log::error!("Failed to get state for slave {}", slave_id);
133                return Err("Failed to get state".into());
134            }
135        },
136        error_codes: match controller.get_error_codes(slave_id) {
137            Ok(error_codes) => error_codes.iter().map(|x| *x as i32).collect(),
138            _ => {
139                log::error!("Failed to get error codes for slave {}", slave_id);
140                return Err("Failed to get error codes".into());
141            }
142        },
143        compliant: match controller.is_torque_on(slave_id) {
144            Ok(Some(state)) => state,
145            _ => {
146                log::error!("Failed to get torque state for slave {}", slave_id);
147                return Err("Failed to get torque state".into());
148            }
149        },
150        published_timestamp: Some(Timestamp::from(std::time::SystemTime::now())),
151    })
152}
153
154#[tonic::async_trait]
155impl PoulpeMultiplexer for PoulpeMultiplexerService {
156    async fn get_poulpe_ids(&self, _request: Request<()>) -> Result<Response<PoulpeIds>, Status> {
157        let reply = PoulpeIds {
158            ids: self
159                .controller
160                .get_slave_ids()
161                .iter()
162                .map(|&id| id as i32)
163                .collect(),
164            names: self.controller.get_slave_names(),
165        };
166
167        Ok(Response::new(reply))
168    }
169
170    type GetStatesStream = ReceiverStream<Result<PoulpeStates, Status>>;
171
172    async fn get_states(
173        &self,
174        request: Request<StateStreamRequest>,
175    ) -> Result<Response<Self::GetStatesStream>, Status> {
176        let (tx, rx) = mpsc::channel(2);
177
178        let controller = Arc::clone(&self.controller);
179
180        let ids = request.get_ref().ids.clone();
181        for id in ids.clone() {
182            let slave_id = id as u32;
183            log::info!(
184                "Setup Slave {} (id: {})...",
185                controller.get_slave_name(slave_id as u16).unwrap(),
186                slave_id
187            );
188            match controller.setup(slave_id) {
189                Ok(_) => log::info!("Done!"),
190                Err(e) => {
191                    log::error!("Failed to setup slave {}: {}", slave_id, e);
192                    return Err(Status::internal("Failed to setup slaves"));
193                }
194            }
195        }
196
197        log::info!(
198            "New client - update period of {}s",
199            request.get_ref().update_period
200        );
201
202        tokio::spawn(async move {
203            let request = request.get_ref();
204            // fixed frequency
205            let mut interval =
206                tokio::time::interval(Duration::from_secs_f32(request.update_period));
207
208            // state to be sent if no state is available
209            // this is to avoid sending empty states
210            let mut last_state = poule_empty_state();
211
212            // DEBUGGING
213            let mut nb_states = 0;
214            let mut t_debug = tokio::time::Instant::now();
215            // debug report sent time
216            // first report will be displayed after this many seconds
217            // and then every other will be after 30s
218            let mut report_display_time = 5.0; // seconds
219            loop {
220                // Wait until the next tick
221                interval.tick().await;
222
223                let states = PoulpeStates {
224                    states: request
225                        .ids
226                        .iter()
227                        .map(|&id| match get_state_for_id(&controller, id) {
228                            Ok(state) => {
229                                last_state = state.clone();
230                                state
231                            }
232                            Err(e) => {
233                                log::error!("Failed to get state for slave {}: {}", id, e);
234                                last_state.clone()
235                            }
236                        })
237                        .collect(),
238                };
239                if tx.send(Ok(states)).await.is_err() {
240                    break;
241                }
242
243                // DEBUGGING
244                // send the report to the user about the states
245                nb_states += 1;
246                if t_debug.elapsed().as_secs_f32() > report_display_time {
247                    log::info!(
248                        "GRPC EtherCAT Slave {} | states sent {} req/s",
249                        request.ids[0],
250                        nb_states as f32 / t_debug.elapsed().as_secs_f32()
251                    );
252                    nb_states = 0;
253                    t_debug = tokio::time::Instant::now();
254                    report_display_time = 30.0; // [secs] next report display
255                }
256            }
257        });
258
259        Ok(Response::new(ReceiverStream::new(rx)))
260    }
261
262    async fn get_commands(
263        &self,
264        request: Request<Streaming<PoulpeCommands>>,
265    ) -> Result<Response<()>, Status> {
266        let mut stream = request.into_inner();
267
268        // DEBUGGING
269        // measure the time for debugging
270        let mut t_debug = tokio::time::Instant::now();
271        // measure the number of commands sent and dropped
272        let mut nb_commands = 0;
273        let mut nb_dropped = 0;
274        // measure the time between the commands
275        let mut t_command = tokio::time::Instant::now();
276        // the longest time between two commands
277        let mut dt_command_max: f32 = 0.0;
278        // debug report sent time
279        // first report will be displayed after this many seconds
280        // and then every other will be after 30s
281        let mut report_display_time = 5.0; // seconds
282
283        let no_slaves: usize = self.controller.get_slave_ids().len();
284        // number of errors before stopping the loop
285        let mut nb_errors = vec![0; no_slaves];
286        let mut nb_errors_max = 1000;
287
288        while let Some(Ok(req)) = stream.next().await {
289            let mut slave_id: u32 = req.commands[0].id as u32;
290            // check if the slave is ready and drop the command if not
291            if self.controller.is_slave_ready(slave_id as u16) == false {
292                log::error!("Slave (id: {}) not ready!", slave_id);
293                nb_dropped += 1;
294                continue;
295            }
296
297            // check if the slave is in fault state
298            match self.controller.get_status(slave_id) {
299                Ok(CiA402State::Fault) | Ok(CiA402State::FaultReactionActive) => {
300                    let i = slave_id as usize;
301                    nb_errors[i] += 1;
302                    if nb_errors[i] % nb_errors_max == 0 {
303                        log::error!(
304                            "Slave {} (name {}) is in fault state for {}s,\n {:#x?}",
305                            slave_id,
306                            self.controller.get_slave_name(slave_id as u16).unwrap(),
307                            nb_errors[i] as f32 * 1e-3,
308                            self.controller.get_error_flags(slave_id as u16).unwrap()
309                        );
310
311                        #[cfg(feature = "qucik_stop_on_slave_fault")]
312                        {
313                            log::warn!("Sending emergency stop on all slaves!");
314                            // quick stop on all slaves
315                            self.controller
316                                .emergency_stop_all(slave_id as u16)
317                                .unwrap_or_else(|e| {
318                                    log::error!("Failed to quick stop all slaves: {}", e);
319                                });
320                        }
321                        #[cfg(feature = "stop_server_on_actuator_error")]
322                        {
323                            log::error!("Stopping the GRPC master!");
324                            // stop the stack
325                            std::process::exit(10);
326                        }
327                        // display the error every 5s
328                        nb_errors_max = 5000;
329                    }
330                    continue;
331                }
332                _ => {} // do nothing
333            }
334
335            log::debug!("Got commands {:?}", req);
336            for cmd in req.commands {
337                slave_id = cmd.id as u32;
338
339                // emergency stop first
340                match cmd.emergency_stop {
341                    Some(true) => {
342                        self.controller
343                            .emergency_stop(slave_id)
344                            .unwrap_or_else(|e| {
345                                log::error!(
346                                    "Failed to set emergency stop for slave {}: {}",
347                                    slave_id,
348                                    e
349                                )
350                            });
351                        continue;
352                    }
353                    _ => (),
354                }
355
356                let mut target_pos = cmd.target_position;
357                match cmd.published_timestamp {
358                    Some(published_time) => {
359                        let published_time = match SystemTime::try_from(published_time) {
360                            Ok(systime) => systime,
361                            Err(_) => {
362                                log::warn!("Cannot parse the timestamp, discarding message!");
363                                continue;
364                            }
365                        };
366                        // check if the message is older than allowed time
367                        if self
368                            .controller
369                            .check_if_too_old(published_time.elapsed().unwrap())
370                        {
371                            nb_dropped += 1;
372                            continue;
373                        }
374                    }
375                    None => {
376                        log::warn!("No published timestamp, discarding message!");
377                        continue;
378                    }
379                }
380
381                let no_axis = self.controller.get_orbita_type(slave_id) as usize;
382
383                let mut set_compliant = cmd.compliancy;
384
385                let mode_of_operation = cmd.mode_of_operation;
386                if mode_of_operation != 0 {
387                    self.controller
388                        .set_mode_of_operation(slave_id as u16, mode_of_operation as u8)
389                        .unwrap_or_else(|e| {
390                            log::error!(
391                                "Failed to set mode of operation for slave {}: {}",
392                                slave_id,
393                                e
394                            )
395                        });
396                }
397
398                if target_pos.len() != 0 {
399                    // set only last target command
400                    self.controller
401                        .set_target_position(
402                            slave_id,
403                            target_pos[(target_pos.len() - no_axis)..].to_vec(),
404                        )
405                        .unwrap_or_else(|e| {
406                            log::error!(
407                                "Failed to set target position for slave {}: {}",
408                                slave_id,
409                                e
410                            );
411                            set_compliant = Some(true); // disable the slave!
412                        });
413                }
414                let velocity_limit = cmd.velocity_limit;
415                if velocity_limit.len() != 0 {
416                    // set only last target command
417                    self.controller
418                        .set_velocity_limit(
419                            slave_id,
420                            velocity_limit[(velocity_limit.len() - no_axis)..].to_vec(),
421                        )
422                        .unwrap_or_else(|e| {
423                            log::error!(
424                                "Failed to set velocity limit for slave {}: {}",
425                                slave_id,
426                                e
427                            );
428                            set_compliant = Some(true); // disable the slave!
429                        });
430                }
431                let torque_limit = cmd.torque_limit;
432                if torque_limit.len() != 0 {
433                    // set only last target command
434                    self.controller
435                        .set_torque_limit(
436                            slave_id,
437                            torque_limit[(torque_limit.len() - no_axis)..].to_vec(),
438                        )
439                        .unwrap_or_else(|e| {
440                            log::error!("Failed to set torque limit for slave {}: {}", slave_id, e);
441                            set_compliant = Some(true); // disable the slave!
442                        });
443                }
444
445                let target_velocity = cmd.target_velocity;
446                if target_velocity.len() != 0 {
447                    // set only last target command
448                    self.controller
449                        .set_target_velocity(
450                            slave_id,
451                            target_velocity[(target_velocity.len() - no_axis)..].to_vec(),
452                        )
453                        .unwrap_or_else(|e| {
454                            log::error!(
455                                "Failed to set target velocity for slave {}: {}",
456                                slave_id,
457                                e
458                            );
459                            set_compliant = Some(true); // disable the slave!
460                        });
461                }
462                let target_torque = cmd.target_torque;
463                if target_torque.len() != 0 {
464                    // set only last target command
465                    self.controller
466                        .set_target_torque(
467                            slave_id,
468                            target_torque[(target_torque.len() - no_axis)..].to_vec(),
469                        )
470                        .unwrap_or_else(|e| {
471                            log::error!(
472                                "Failed to set target torque for slave {}: {}",
473                                slave_id,
474                                e
475                            );
476                            set_compliant = Some(true); // disable the save!
477                        });
478                }
479
480                // check if the slave is compliant
481                match set_compliant {
482                    Some(true) => self
483                        .controller
484                        .set_torque(slave_id, false)
485                        .unwrap_or_else(|e| {
486                            log::error!("Failed to set torque off for slave {}: {}", slave_id, e)
487                        }),
488                    Some(false) => self
489                        .controller
490                        .set_torque(slave_id, true)
491                        .unwrap_or_else(|e| {
492                            log::error!("Failed to set torque on for slave {}: {}", slave_id, e)
493                        }),
494                    None => (),
495                }
496
497                // DEBUGGING
498                // check if the time between commands is longest than
499                // the previous longest time, and save it for debugging
500                let dt_command = t_command.elapsed().as_secs_f32();
501                if dt_command_max < dt_command {
502                    dt_command_max = dt_command;
503                }
504                // timestamp of the new command
505                t_command = tokio::time::Instant::now();
506                nb_commands += 1;
507            }
508            // wait for the next cycle
509            // to make sure the commands are executed
510            // self.controller.inner.wait_for_next_cycle();
511
512            // DEBUGGING
513            // display the status report to the user each 10s
514            let dt_debug = t_debug.elapsed().as_secs_f32();
515            if dt_debug > report_display_time {
516                let f_commands = nb_commands as f32 / dt_debug;
517                let f_commands_dropped = nb_dropped as f32 / dt_debug;
518                let dt_commands_ms = (dt_debug) / (nb_commands as f32) * 1000.0;
519                let dt_command_max_ms = dt_command_max * 1000.0;
520                log::info!("GRPC EtherCAT Slave {} | commands sent {:0.2} cmd/s, commands dropped {:0.2} req/s, command time: {:0.2} (max {:0.2}) ms", 
521                            slave_id,
522                            f_commands,
523                            f_commands_dropped,
524                            dt_commands_ms,
525                            dt_command_max_ms);
526
527                t_debug = tokio::time::Instant::now();
528                nb_commands = 0;
529                nb_dropped = 0;
530                dt_command_max = 0.0;
531                t_command = tokio::time::Instant::now();
532                report_display_time = 30.0; // [secs] next report display
533            }
534        }
535
536        Ok(Response::new(()))
537    }
538}
539
540/// Launch the server with the config file
541///
542/// # Arguments
543///     * `config_file` - the path to the configuration file
544/// # Returns
545///     * Ok if the server was launched successfully
546///     * Err if the server failed to launch
547pub async fn launch_server(config_file: &str) -> Result<(), Box<dyn std::error::Error>> {
548    let controller = PoulpeController::connect(config_file)?;
549
550    println!("POULPE controller ready!");
551
552    let addr = "[::]:50098".parse()?;
553    let srv = PoulpeMultiplexerService {
554        controller: Arc::new(controller),
555    };
556
557    Server::builder()
558        .add_service(PoulpeMultiplexerServer::new(srv))
559        .serve(addr)
560        .await?;
561
562    Ok(())
563}
564
565fn poule_empty_state() -> PoulpeState {
566    PoulpeState {
567        id: 0,
568        mode_of_operation: 0,
569        actual_position: vec![],
570        actual_velocity: vec![],
571        actual_torque: vec![],
572        axis_sensors: vec![],
573        axis_sensor_zeros: vec![],
574        motor_temperatures: vec![],
575        board_temperatures: vec![],
576        requested_target_position: vec![],
577        requested_velocity_limit: vec![],
578        requested_torque_limit: vec![],
579        state: 0,
580        error_codes: vec![],
581        compliant: false,
582        published_timestamp: None,
583    }
584}