From 305762d1d0ed6581b0a90fef27a13de0a3bf67e3 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sun, 30 Jun 2024 21:19:13 -0400 Subject: [PATCH] remove unneeded result, fix up docs --- src/data.rs | 6 +++--- src/main.rs | 47 ++++++++++++++--------------------------------- src/mqtt.rs | 10 ++++++++++ 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/data.rs b/src/data.rs index 4e1d620..34fc8aa 100644 --- a/src/data.rs +++ b/src/data.rs @@ -25,7 +25,7 @@ impl fmt::Display for DecodeData { } /** - * Implementation fo the Data Structs' methods + * Implementation fo the DecodeData methods */ impl DecodeData { /** @@ -68,14 +68,14 @@ impl fmt::Display for EncodeData { } /** - * Implementation fo the Data Structs' methods + * Implementation fo the DecodeData methods */ impl EncodeData { /** * Constructor * @param id: the id of the can message * @param value: the can message payload - * @param topic: whether the can message is extended format + * @param is_ext: whether the can message is extended format ID */ pub fn new(id: u32, value: Vec, is_ext: bool) -> Self { Self { id, value, is_ext } diff --git a/src/main.rs b/src/main.rs index c8c5ada..cc4765a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,10 +14,12 @@ use calypso::{ use protobuf::Message; use socketcan::{CanFrame, CanSocket, EmbeddedFrame, Id, Socket}; +const ENCODER_MAP_SUB: &str = "Calypso/Bidir/Command/#"; + /** * Reads the can socket and publishes the data to the given client. */ -fn read_can(pub_path: &str, can_interface: &str) -> Result, String> { +fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle { //open can socket channel at name can_interface let mut client = MqttClient::new(pub_path, "calypso-decoder"); client.connect().expect("Could not connect to Siren!"); @@ -65,21 +67,18 @@ fn read_can(pub_path: &str, can_interface: &str) -> Result, Stri thread::sleep(Duration::from_micros(100)); } }); - Ok(join_handle) + join_handle } /** * Reads the mqtt incoming messages and sends can messages based off of that */ -fn read_siren( - pub_path: &str, - send_map: Arc>>, -) -> Result, String> { +fn read_siren(pub_path: &str, send_map: Arc>>) -> JoinHandle<()> { let mut client = MqttClient::new(pub_path, "calypso-encoder"); client.connect().expect("Could not connect to Siren!"); - let rx = client.start_consumer().expect("Could not begin consuming"); + let reciever = client.start_consumer().expect("Could not begin consuming"); client - .subscribe("Calypso/Bidir/Command/#") + .subscribe(ENCODER_MAP_SUB) .expect("Could not subscribe!"); // do the default initialization for all, do outside of the thread to save time negotiating when send_can comes up @@ -92,7 +91,7 @@ fn read_siren( drop(writable_send_map); let join_handle = thread::spawn(move || { - for msg in rx.iter() { + for msg in reciever.iter() { if let Some(msg) = msg { let buf = match command_data::CommandData::parse_from_bytes(msg.payload()) { Ok(buf) => buf, @@ -131,13 +130,13 @@ fn read_siren( } } }); - Ok(join_handle) + join_handle } fn send_out( can_interface: &str, send_map: Arc>>, -) -> Result, String> { +) -> JoinHandle<()> { let socket = CanSocket::open(can_interface).expect("Failed to open CAN socket!"); let join_handle = thread::spawn(move || loop { @@ -174,7 +173,7 @@ fn send_out( } } }); - Ok(join_handle) + join_handle } /** @@ -211,33 +210,15 @@ fn parse_args() -> (String, String, bool) { */ fn main() { let (path, can_interface, encoding) = parse_args(); - let can_handle = match read_can(&path, &can_interface) { - Ok(handle) => handle, - Err(err) => { - println!("Decoder Errored! {}", err); - process::exit(1); - } - }; + let can_handle = read_can(&path, &can_interface); // use a arc for mutlithread, and a rwlock to enforce one writer if encoding { let send_map: Arc>> = Arc::new(RwLock::new(HashMap::new())); - let siren_handle = match read_siren(&path, Arc::clone(&send_map)) { - Ok(handle) => handle, - Err(err) => { - println!("Encoder Errored! {}", err); - process::exit(1); - } - }; + let siren_handle = read_siren(&path, Arc::clone(&send_map)); - let send_handle = match send_out(&can_interface, Arc::clone(&send_map)) { - Ok(handle) => handle, - Err(err) => { - println!("Sender Errored! {}", err); - process::exit(1); - } - }; + let send_handle = send_out(&can_interface, Arc::clone(&send_map)); siren_handle.join().expect("Encoder failed with "); println!("Encoder ended"); diff --git a/src/mqtt.rs b/src/mqtt.rs index 322903b..cbde325 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -46,10 +46,20 @@ impl MqttClient { self.client.publish(msg) } + /** + * Starts the client consuming messages. + * This starts the client receiving messages and placing them into an mpsc queue. + * It returns the receiving-end of the queue for the application to get the messages. + * This can be called at any time after the client is created, but it should be called before subscribing to any topics, otherwise messages can be lost. + */ pub fn start_consumer(&mut self) -> Option>> { Some(self.client.start_consuming()) } + /** + * Subscribes to a single topic. + * topic: the topic to be subscribed to + */ pub fn subscribe(&mut self, topic: &str) -> Result { self.client.subscribe(topic, 2) }