
1use std::{any::Any, collections::HashMap, f32::consts::E, sync::Arc, time::Duration};
3use super::pb::{
4    poulpe_multiplexer_client::PoulpeMultiplexerClient, PoulpeCommand, PoulpeCommands, PoulpeState,
5    StateStreamRequest,
7use prost_types::Timestamp;
8use tokio::{
9    runtime::{Builder, Handle, Runtime},
10    sync::RwLock,
11    time::sleep,
13use tonic::{transport::Uri, Request};
15use poulpe_ethercat_controller::{register::BoardStatus, state_machine::CiA402State};
18enum Command {
19    EmergencyStop(bool),
20    Compliancy(bool),
21    ModeOfOperation(u32),
22    TargetPosition(Vec<f32>),
23    TargetVelocity(Vec<f32>),
24    TargetTorque(Vec<f32>),
25    VelocityLimit(Vec<f32>),
26    TorqueLimit(Vec<f32>),
30pub struct PoulpeRemoteClient {
31    pub ids: Vec<u16>,
32    pub names: Vec<String>,
33    rt: Arc<Runtime>,
34    addr: Uri,
35    state: Arc<RwLock<HashMap<u16, PoulpeState>>>,
36    command_buff: Arc<RwLock<HashMap<u16, Vec<Command>>>>,
39impl PoulpeRemoteClient {
40    /// Connect to the server with the given poulpe names
41    /// - connects to the server and reads the ids of the poulpe slaves
42    /// - checks if the given poulpe names are valid
43    /// - subsrtibes to the states of the given slaves
44    ///
45    /// # Arguments
46    ///     - addr: Uri: The address of the server
47    ///     - poulpe_names: Vec<String>: The names of the poulpe slaves to connect to
48    ///     - update_period: Duration: The update period for the state stream
49    /// # Returns
50    ///     - Result<Self, std::io::Error>: The client if successful, else an error
51    pub fn connect_with_name(
52        addr: Uri,
53        poulpe_names: Vec<String>,
54        update_period: Duration,
55    ) -> Result<Self, std::io::Error> {
56        // read all slave ids and names in the network
57        let id_client = PoulpeIdClient::new(addr.clone());
58        let (all_ids, all_names) = match id_client.get_slaves() {
59            Ok((ids, names)) => (ids, names),
60            Err(e) => {
61                log::error!("Failed to connect to the server: {}", e);
62                return Err(std::io::Error::new(
63                    std::io::ErrorKind::ConnectionRefused,
64                    "Error in connecting to the server! Check if server is up!!!",
65                ));
66            }
67        };
69        // verify the names
70        let mut poulpe_ids = vec![];
71        for name in poulpe_names {
72            // get index of the name in the all_names
73            let id = all_names.iter().position(|n| n == &name);
74            match id {
75                Some(id) => poulpe_ids.push(all_ids[id]),
76                None => {
77                    log::error!("Invalid poulpe name: {}", name);
78                    return Err(std::io::Error::new(
79                        std::io::ErrorKind::InvalidInput,
80                        "Invalid poulpe name",
81                    ));
82                }
83            }
84        }
86        // create the client
87        PoulpeRemoteClient::connect(addr, poulpe_ids, update_period)
88    }
90    /// Connect to the server with the given poulpe ids
91    /// - connects to the server and reads the ids of the poulpe slaves
92    /// - checks if the given poulpe ids are valid
93    /// - subsrtibes to the states of the given slaves
94    ///
95    /// # Arguments
96    ///    - addr: Uri: The address of the server
97    ///    - ids: Vec<u16>: The ids of the poulpe slaves to connect to
98    ///    - update_period: Duration: The update period for the state stream
99    /// # Returns
100    ///    - Result<Self, std::io::Error>: The client if successful, else an error
101    pub fn connect(
102        addr: Uri,
103        poulpe_ids: Vec<u16>,
104        update_period: Duration,
105    ) -> Result<Self, std::io::Error> {
106        let state = Arc::new(RwLock::new(HashMap::new()));
107        let state_lock = Arc::clone(&state);
109        let command_buff = Arc::new(RwLock::new(HashMap::new()));
110        let command_buff_lock = Arc::clone(&command_buff);
112        // let rt = Builder::new_multi_thread().enable_all().build().unwrap();
113        let rt = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap());
115        // nemes of the poulpe slaves to read the states from
116        let mut names = vec![];
118        // Validate poulpe_ids
119        let client = PoulpeIdClient::new(addr.clone());
121        let url = addr.to_string();
122        let ids = poulpe_ids.clone();
123        // check if the id is valid and the server is up
124        // check if poulpe_ids are valid
125        match client.get_slaves() {
126            Ok((available_ids, available_names)) => {
127                let mut common_ids = available_ids.clone();
128                let mut common_names = available_names.clone();
129                // remove the other ids and names
130                // get indexes to retain
131                let mut retain_inds = vec![];
132                for id in poulpe_ids.iter() {
133                    if let Some(ind) = available_ids.iter().position(|&x| x == *id) {
134                        retain_inds.push(ind);
135                    }
136                }
137                common_ids = retain_inds.iter().map(|&i| available_ids[i]).collect();
138                common_names = retain_inds
139                    .iter()
140                    .map(|&i| available_names[i].clone())
141                    .collect();
142                if common_ids.len() != poulpe_ids.len() || common_names.len() != poulpe_ids.len() {
143                    log::error!(
144                        "Invalid poulpe_ids: {:?}, available_ids: {:?}",
145                        poulpe_ids,
146                        available_ids
147                    );
148                    return Err(std::io::Error::new(
149                        std::io::ErrorKind::InvalidInput,
150                        "Invalid poulpe_ids",
151                    ));
152                }
153                // ids are good,
154                names = common_names.clone();
155            }
156            Err(e) => {
157                log::error!(
158                    "Error in connecting to the server! Check if server is up!!!\n  {:?}",
159                    e
160                );
161                return Err(std::io::Error::new(
162                    std::io::ErrorKind::ConnectionRefused,
163                    "Error in connecting to the server! Check if server is up!!!",
164                ));
165            }
166        }
168        // spawn a single thread to handle both the state stream and command stream
169        rt.spawn(async move {
170            // Connect to the server
171            let mut client = match PoulpeMultiplexerClient::connect(url).await {
172                Ok(client) => client,
173                Err(e) => {
174                    log::error!(
175                        "Error in connecting to the server! Check if server is up!!!\n  {:?}",
176                        e
177                    );
178                    return;
179                }
180            };
182            // Prepare the state stream request
183            let state_request = Request::new(StateStreamRequest {
184                ids: poulpe_ids.iter().map(|&id| id as i32).collect(),
185                update_period: update_period.as_secs_f32(),
186            });
188            // Start receiving states
189            let mut state_stream = client.get_states(state_request).await.unwrap().into_inner();
191            // Prepare the command stream
192            let command_stream = async_stream::stream! {
193                // fixed frequency
194                let mut interval = tokio::time::interval(update_period / 2);
196                loop {
197                    // next cycle
198                    interval.tick().await;
199                    let mut cmd_map = command_buff_lock.write().await;
200                    if let Some(commands) = extract_commands(&mut cmd_map) {
201                        yield commands;
202                    }
203                }
204            };
206            // Send commands in parallel with state handling
207            tokio::select! {
208                // Handle state stream
209                _ = async {
210                    while let Some(poulpe_state) = state_stream.message().await.unwrap() {
211                        log::debug!("Update state with {:?}", poulpe_state);
212                        let mut state_buff = state_lock.write().await;
213                        for s in poulpe_state.states {
214                            state_buff.insert(s.id as u16, s);
215                        }
216                    }
217                } => {},
219                // Handle command stream
220                result = client.get_commands(Request::new(command_stream)) => {
221                    match result {
222                        Ok(_) => log::info!("Command stream ended"),
223                        Err(e) => log::error!("Error in command stream: {:?}", e),
224                    }
225                },
226            }
227        });
229        Ok(PoulpeRemoteClient {
230            ids,
231            names,
232            rt,
233            addr,
234            state,
235            command_buff,
236        })
237    }
239    /// Get all ids and names of slaves in the network
240    pub fn get_poulpe_ids_sync(
241        &self,
242    ) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
243        self.rt.block_on(async {
244            let mut client = PoulpeMultiplexerClient::connect(self.addr.to_string()).await?;
245            get_poulpe_ids_async(&mut client).await
246        })
247    }
249    /// get the ids of the slaves the client is connected to
250    pub fn get_poulpe_ids(&self) -> Vec<u16> {
251        self.rt
252            .block_on(self.state.read())
253            .keys()
254            .cloned()
255            .collect()
256    }
258    /// get the state property, reading stream sent by the server
259    ///  
260    /// - check if the state is older than 1s
261    /// - if older than 1s, log an error and return an error
262    fn get_state_property<T, F>(&self, slave_id: u16, f: F, _default: T) -> Result<T, ()>
263    where
264        F: Fn(&PoulpeState) -> T,
265    {
266        let state = self.rt.block_on(self.state.read());
267        let state = state.get(&slave_id).ok_or_else(|| {
268            log::warn!("No state found for slave {}", slave_id);
269        })?;
271        if let Some(ts) = &state.published_timestamp {
272            if let Ok(systime) = std::time::SystemTime::try_from(ts.clone()) {
273                if systime.elapsed().unwrap().as_millis() > 1000 {
274                    log::error!(
275                        "State is older than 1s for slave {}, server maybe down!",
276                        slave_id
277                    );
278                    // kill the slave if error recovery not supported
279                    #[cfg(feature = "stop_client_on_server_timeout")]
280                    std::process::exit(10);
281                    return Err(());
282                }
283            } else {
284                log::warn!("Cannot parse the timestamp, discarding message!");
285                // kill the slave if error recovery not supported
286                #[cfg(feature = "stop_client_on_server_timeout")]
287                std::process::exit(10);
288                return Err(());
289            }
290        } else {
291            log::warn!("No timestamp found for slave {}", slave_id);
292        }
294        Ok(f(state))
295    }
297    /// Get the current position of the slave [rad]
298    pub fn get_position_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
299        self.get_state_property(slave_id, |state| state.actual_position.clone(), vec![])
300    }
302    /// Get the current velocity of the slave [rad/s]
303    pub fn get_velocity_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
304        self.get_state_property(slave_id, |state| state.actual_velocity.clone(), vec![])
305    }
307    /// Get the current torque of the slave [mA]
308    pub fn get_torque_actual_value(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
309        self.get_state_property(slave_id, |state| state.actual_torque.clone(), vec![])
310    }
312    /// Check if the slave is on or off
313    pub fn is_on(&self, slave_id: u16) -> Result<bool, ()> {
314        self.get_state_property(slave_id, |state| state.compliant, false)
315    }
317    /// Get the target position of the slave [rad]
318    pub fn get_target_position(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
319        self.get_state_property(
320            slave_id,
321            |state| state.requested_target_position.clone(),
322            vec![],
323        )
324    }
326    /// Get the motor temperatures of the slave [°C]
327    pub fn get_motor_temperatures(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
328        self.get_state_property(slave_id, |state| state.motor_temperatures.clone(), vec![])
329    }
330    /// Get the driver board temperatures of the slave [°C]
331    pub fn get_board_temperatures(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
332        self.get_state_property(slave_id, |state| state.board_temperatures.clone(), vec![])
333    }
335    /// Get the mode of operation of the slave [CiA402 State]
336    /// 1: Profile Position, 3: Profile Velocity, 4: Profile Torque
337    /// (see poulpe_ethercat_controller::state_machine)
338    pub fn get_mode_of_operation(&self, slave_id: u16) -> Result<u32, ()> {
339        self.get_state_property(slave_id, |state| state.mode_of_operation as u32, 255)
340    }
342    /// Get the state of the slave [BoardState] (see poulpe_ethercat_controller::register)
343    pub fn get_state(&self, slave_id: u16) -> Result<u32, ()> {
344        // temporaty solution trasnforming the state to board status
345        match BoardStatus::from_cia402_to_board_status(
346            self.get_state_property(slave_id, |state| state.state, 255)?,
347            self.get_state_property(slave_id, |state| state.error_codes.clone(), vec![])?,
348        ) {
349            Ok(s) => Ok(s as u32),
350            Err(_) => Err(()),
351        }
352        // self.get_state_property(slave_id, |state| state.state, 255)
353    }
354    /// Get the state of the slave [CiA402 State]  (see poulpe_ethercat_controller::state_machine)
355    pub fn get_cia402_state(&self, slave_id: u16) -> Result<u32, ()> {
356        self.get_state_property(slave_id, |state| state.state, 255)
357    }
359    /// Get the current compliance state of the slave (the same as is_on - deprecated)
360    pub fn get_torque_state(&self, slave_id: u16) -> Result<bool, ()> {
361        self.get_state_property(slave_id, |state| state.compliant, false)
362    }
364    /// Get the current axis sensor position [rad]
365    pub fn get_axis_sensors(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
366        self.get_state_property(slave_id, |state| state.axis_sensors.clone(), vec![])
367    }
369    /// Get the axis senosr absolute zeros from the firmware [rad]
370    pub fn get_axis_sensor_zeros(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
371        self.get_state_property(slave_id, |state| state.axis_sensor_zeros.clone(), vec![])
372    }
374    /// Get the motor and homing error codes (see poulpe_ethercat_controller::state_machine)
375    pub fn get_error_codes(&self, slave_id: u16) -> Result<Vec<i32>, ()> {
376        self.get_state_property(slave_id, |state| state.error_codes.clone(), vec![])
377    }
379    /// Get current velocity limits (relative from 0 to 1)
380    pub fn get_velocity_limit(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
381        self.get_state_property(
382            slave_id,
383            |state| state.requested_velocity_limit.clone(),
384            vec![],
385        )
386    }
387    /// Get current torque limits (relative from 0 to 1)
388    pub fn get_torque_limit(&self, slave_id: u16) -> Result<Vec<f32>, ()> {
389        self.get_state_property(
390            slave_id,
391            |state| state.requested_torque_limit.clone(),
392            vec![],
393        )
394    }
396    /// Send the command to the server
397    fn push_command(&mut self, slave_id: u16, command: Command) -> Result<(), ()> {
398        self.rt
399            .block_on(self.command_buff.write())
400            .entry(slave_id)
401            .or_insert_with(Vec::new)
402            .push(command);
403        Ok(())
404    }
406    /// Turn on the slave (compliance off)
407    pub fn turn_on(&mut self, slave_id: u16) {
408        self.push_command(slave_id, Command::Compliancy(false));
409    }
411    /// Turn off the slave (compliance on)
412    pub fn turn_off(&mut self, slave_id: u16) {
413        self.push_command(slave_id, Command::Compliancy(true));
414    }
416    /// Set the mode of operation of the slave [CiA402 compliant]
417    /// 1: Profile Position, 3: Profile Velocity, 4: Profile Torque
418    /// (see poulpe_ethercat_controller::state_machine)
419    pub fn set_mode_of_operation(&mut self, slave_id: u16, mode: u32) {
420        self.push_command(slave_id, Command::ModeOfOperation(mode));
421    }
423    /// Set the target position of the slave [rad]
424    pub fn set_target_position(&mut self, slave_id: u16, target_position: Vec<f32>) {
425        self.push_command(slave_id, Command::TargetPosition(target_position));
426    }
427    /// Set the target velocity of the slave [rad/s]
428    pub fn set_target_velocity(&mut self, slave_id: u16, target_velocity: Vec<f32>) {
429        self.push_command(slave_id, Command::TargetVelocity(target_velocity));
430    }
431    /// Set the target torque of the slave [mA]
432    pub fn set_target_torque(&mut self, slave_id: u16, target_torque: Vec<f32>) {
433        self.push_command(slave_id, Command::TargetTorque(target_torque));
434    }
436    /// Set the velocity limit of the slave [relative from 0 to 1]
437    pub fn set_velocity_limit(&mut self, slave_id: u16, velocity_limit: Vec<f32>) {
438        self.push_command(slave_id, Command::VelocityLimit(velocity_limit));
439    }
440    /// Set the torque limit of the slave [relative from 0 to 1]
441    pub fn set_torque_limit(&mut self, slave_id: u16, torque_limit: Vec<f32>) {
442        self.push_command(slave_id, Command::TorqueLimit(torque_limit));
443    }
444    /// Emergency stop the slave
445    pub fn emergency_stop(&mut self, slave_id: u16) {
446        self.push_command(slave_id, Command::EmergencyStop(true));
447    }
450/// Extract the commands from the buffer and prepare them to be sent to the server
451/// timestamp is added in the process to keep track of the time the command was sent
452/// and to discard the command if it is too old
453fn extract_commands(buff: &mut HashMap<u16, Vec<Command>>) -> Option<PoulpeCommands> {
454    if buff.is_empty() {
455        return None;
456    }
458    let mut commands = vec![];
460    for (&id, cmds) in buff.iter() {
461        let mut poulpe_cmd = PoulpeCommand {
462            id: id.into(),
463            ..Default::default()
464        };
465        for cmd in cmds {
466            match cmd {
467                Command::EmergencyStop(stop) => poulpe_cmd.emergency_stop = Some(*stop),
468                Command::Compliancy(comp) => poulpe_cmd.compliancy = Some(*comp),
469                Command::ModeOfOperation(mode) => poulpe_cmd.mode_of_operation = *mode as i32,
470                Command::TargetPosition(pos) => {
471                    if pos.len() != 0 {
472                        poulpe_cmd.target_position.extend(pos.iter().cloned());
473                    }
474                }
475                Command::TargetVelocity(vel) => {
476                    if vel.len() != 0 {
477                        poulpe_cmd.target_velocity.extend(vel.iter().cloned());
478                    }
479                }
480                Command::TargetTorque(torque) => {
481                    if torque.len() != 0 {
482                        poulpe_cmd.target_torque.extend(torque.iter().cloned());
483                    }
484                }
485                Command::VelocityLimit(vel) => {
486                    if vel.len() != 0 {
487                        poulpe_cmd.velocity_limit.extend(vel.iter().cloned());
488                    }
489                }
490                Command::TorqueLimit(torque) => {
491                    if torque.len() != 0 {
492                        poulpe_cmd.torque_limit.extend(torque.iter().cloned());
493                    }
494                }
495            }
496        }
497        poulpe_cmd.published_timestamp = Some(Timestamp::from(std::time::SystemTime::now()));
498        commands.push(poulpe_cmd);
499    }
501    buff.clear();
503    Some(PoulpeCommands { commands })
506/// Get the ids nad names of slaves in the EtherCAT network
507pub async fn get_poulpe_ids_async(
508    client: &mut PoulpeMultiplexerClient<tonic::transport::Channel>,
509) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
510    let response = client.get_poulpe_ids(Request::new(())).await?;
511    let response = response.into_inner();
512    let ids: Vec<u16> = response.ids.into_iter().map(|id| id as u16).collect();
513    let names: Vec<String> = response
514        .names
515        .into_iter()
516        .map(|name: String| name as String)
517        .collect();
518    Ok((ids, names))
521/// A simple client that connects to the server and gets the ids of the slaves
522/// in the network
524/// The idea is to avoid subscribing to the slave states of the server if one
525/// just wants to check which nodes are in the network
527pub struct PoulpeIdClient {
528    rt: Arc<Runtime>,
529    addr: Uri,
531impl PoulpeIdClient {
532    pub fn new(addr: Uri) -> Self {
533        let rt = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap());
534        PoulpeIdClient { rt, addr }
535    }
537    /// Get the ids and names of the slaves in the network
538    pub fn get_slaves(&self) -> Result<(Vec<u16>, Vec<String>), Box<dyn std::error::Error>> {
539        self.rt.block_on(async {
540            let mut client = PoulpeMultiplexerClient::connect(self.addr.to_string()).await?;
541            get_poulpe_ids_async(&mut client).await
542        })
543    }