Skip to content

Commit

Permalink
allow read_can to reconnect on Siren connection failure (#51)
Browse files Browse the repository at this point in the history
* allow read_can to reconnect on failure

* syntax fix according to clippy

* expanded reconnection to read_siren

* update read_siren reconnect

* read_siren add re-subscription

* cleanup
  • Loading branch information
tszwinglitw authored Sep 29, 2024
1 parent 79a2dcd commit 2d6d467
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,23 @@ struct CalypsoArgs {
fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle<u32> {
//open can socket channel at name can_interface
let mut client = MqttClient::new(pub_path, "calypso-decoder");
client.connect().expect("Could not connect to Siren!");
if client.connect().is_err() {
println!("Unable to connect to Siren, going into reconnection mode.");
if client.reconnect().is_ok() {
println!("Reconnected to Siren!");
}
}

let socket = CanSocket::open(can_interface).expect("Failed to open CAN socket!");

thread::spawn(move || loop {
if !client.is_connected() {
println!("[read_can] Unable to connect to Siren, going into reconnection mode.");
if client.reconnect().is_ok() {
println!("[read_can] Reconnected to Siren!");
}
}

let msg = match socket.read_frame() {
Ok(CanFrame::Data(msg)) => msg,
Ok(CanFrame::Remote(_)) => {
Expand Down Expand Up @@ -82,14 +95,18 @@ fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle<u32> {
payload.unit = data.unit.to_string();
payload.value = data.value.iter().map(|x| x.to_string()).collect();

client
if client
.publish(
data.topic.to_string(),
protobuf::Message::write_to_bytes(&payload).unwrap_or_else(|e| {
format!("failed to serialize {}", e).as_bytes().to_vec()
}),
)
.expect("Could not publish!");
.is_err()
{
println!("[read_can] Failed to publish to Siren.");
}

// TODO: investigate disabling this
thread::sleep(Duration::from_micros(100));
}
Expand All @@ -101,7 +118,15 @@ fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle<u32> {
*/
fn read_siren(pub_path: &str, send_map: Arc<RwLock<HashMap<u32, EncodeData>>>) -> JoinHandle<()> {
let mut client = MqttClient::new(pub_path, "calypso-encoder");
client.connect().expect("Could not connect to Siren!");

let _ = client.connect();
while !client.is_connected() {
println!("[read_siren] Unable to connect to Siren, going into reconnection mode.");
if client.reconnect().is_ok() {
println!("[read_siren] Reconnected to Siren!");
}
}

let reciever = client.start_consumer().expect("Could not begin consuming");
client
.subscribe(ENCODER_MAP_SUB)
Expand Down Expand Up @@ -141,18 +166,17 @@ fn read_siren(pub_path: &str, send_map: Arc<RwLock<HashMap<u32, EncodeData>>>) -
.expect("Could not modify send messages!")
.insert(ret.id, ret);
} else {
// the code doesnt work without this else statement
// idk why but never remove this else statement
let is_conn = client.is_connected();
println!("Client is {}", is_conn);
if !is_conn {
println!("Trying to reconnect");
match client.reconnect() {
Ok(_) => println!("Reconnected!"),
Err(_) => println!("Could not reconnect!"),
while !client.is_connected() {
println!(
"[read_siren] Unable to connect to Siren, going into reconnection mode."
);
if client.reconnect().is_ok() {
println!("[read_siren] Reconnected to Siren!");
}
continue;
}
client
.subscribe(ENCODER_MAP_SUB)
.expect("Could not subscribe!");
}
}
})
Expand Down

0 comments on commit 2d6d467

Please sign in to comment.