diff --git a/src/main.rs b/src/main.rs index ff8d2a2..5cc647c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,10 +49,23 @@ struct CalypsoArgs { fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle { //open can socket channel at name can_interface let mut client = MqttClient::new(pub_path, "calypso-decoder"); - client.connect().expect("Could not connect to Siren!"); + if client.connect().is_err() { + println!("Unable to connect to Siren, going into reconnection mode."); + if client.reconnect().is_ok() { + println!("Reconnected to Siren!"); + } + } + let socket = CanSocket::open(can_interface).expect("Failed to open CAN socket!"); thread::spawn(move || loop { + if !client.is_connected() { + println!("[read_can] Unable to connect to Siren, going into reconnection mode."); + if client.reconnect().is_ok() { + println!("[read_can] Reconnected to Siren!"); + } + } + let msg = match socket.read_frame() { Ok(CanFrame::Data(msg)) => msg, Ok(CanFrame::Remote(_)) => { @@ -82,14 +95,18 @@ fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle { payload.unit = data.unit.to_string(); payload.value = data.value.iter().map(|x| x.to_string()).collect(); - client + if client .publish( data.topic.to_string(), protobuf::Message::write_to_bytes(&payload).unwrap_or_else(|e| { format!("failed to serialize {}", e).as_bytes().to_vec() }), ) - .expect("Could not publish!"); + .is_err() + { + println!("[read_can] Failed to publish to Siren."); + } + // TODO: investigate disabling this thread::sleep(Duration::from_micros(100)); } @@ -101,7 +118,15 @@ fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle { */ fn read_siren(pub_path: &str, send_map: Arc>>) -> JoinHandle<()> { let mut client = MqttClient::new(pub_path, "calypso-encoder"); - client.connect().expect("Could not connect to Siren!"); + + let _ = client.connect(); + while !client.is_connected() { + println!("[read_siren] Unable to connect to Siren, going into reconnection mode."); + if client.reconnect().is_ok() { + println!("[read_siren] Reconnected to Siren!"); + } + } + let reciever = client.start_consumer().expect("Could not begin consuming"); client .subscribe(ENCODER_MAP_SUB) @@ -141,18 +166,17 @@ fn read_siren(pub_path: &str, send_map: Arc>>) - .expect("Could not modify send messages!") .insert(ret.id, ret); } else { - // the code doesnt work without this else statement - // idk why but never remove this else statement - let is_conn = client.is_connected(); - println!("Client is {}", is_conn); - if !is_conn { - println!("Trying to reconnect"); - match client.reconnect() { - Ok(_) => println!("Reconnected!"), - Err(_) => println!("Could not reconnect!"), + while !client.is_connected() { + println!( + "[read_siren] Unable to connect to Siren, going into reconnection mode." + ); + if client.reconnect().is_ok() { + println!("[read_siren] Reconnected to Siren!"); } - continue; } + client + .subscribe(ENCODER_MAP_SUB) + .expect("Could not subscribe!"); } } })