Skip to content

Commit

Permalink
remove unneeded result, fix up docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Jul 1, 2024
1 parent bdc64cb commit 305762d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl fmt::Display for DecodeData {
}

/**
* Implementation fo the Data Structs' methods
* Implementation fo the DecodeData methods
*/
impl DecodeData {
/**
Expand Down Expand Up @@ -68,14 +68,14 @@ impl fmt::Display for EncodeData {
}

/**
* Implementation fo the Data Structs' methods
* Implementation fo the DecodeData methods
*/
impl EncodeData {
/**
* Constructor
* @param id: the id of the can message
* @param value: the can message payload
* @param topic: whether the can message is extended format
* @param is_ext: whether the can message is extended format ID
*/
pub fn new(id: u32, value: Vec<u8>, is_ext: bool) -> Self {
Self { id, value, is_ext }
Expand Down
47 changes: 14 additions & 33 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use calypso::{
use protobuf::Message;
use socketcan::{CanFrame, CanSocket, EmbeddedFrame, Id, Socket};

const ENCODER_MAP_SUB: &str = "Calypso/Bidir/Command/#";

/**
* Reads the can socket and publishes the data to the given client.
*/
fn read_can(pub_path: &str, can_interface: &str) -> Result<JoinHandle<u32>, String> {
fn read_can(pub_path: &str, can_interface: &str) -> JoinHandle<u32> {
//open can socket channel at name can_interface
let mut client = MqttClient::new(pub_path, "calypso-decoder");
client.connect().expect("Could not connect to Siren!");
Expand Down Expand Up @@ -65,21 +67,18 @@ fn read_can(pub_path: &str, can_interface: &str) -> Result<JoinHandle<u32>, Stri
thread::sleep(Duration::from_micros(100));
}
});
Ok(join_handle)
join_handle
}

/**
* Reads the mqtt incoming messages and sends can messages based off of that
*/
fn read_siren(
pub_path: &str,
send_map: Arc<RwLock<HashMap<u32, EncodeData>>>,
) -> Result<JoinHandle<()>, String> {
fn read_siren(pub_path: &str, send_map: Arc<RwLock<HashMap<u32, EncodeData>>>) -> JoinHandle<()> {
let mut client = MqttClient::new(pub_path, "calypso-encoder");
client.connect().expect("Could not connect to Siren!");
let rx = client.start_consumer().expect("Could not begin consuming");
let reciever = client.start_consumer().expect("Could not begin consuming");
client
.subscribe("Calypso/Bidir/Command/#")
.subscribe(ENCODER_MAP_SUB)
.expect("Could not subscribe!");

// do the default initialization for all, do outside of the thread to save time negotiating when send_can comes up
Expand All @@ -92,7 +91,7 @@ fn read_siren(
drop(writable_send_map);

let join_handle = thread::spawn(move || {
for msg in rx.iter() {
for msg in reciever.iter() {
if let Some(msg) = msg {
let buf = match command_data::CommandData::parse_from_bytes(msg.payload()) {
Ok(buf) => buf,
Expand Down Expand Up @@ -131,13 +130,13 @@ fn read_siren(
}
}
});
Ok(join_handle)
join_handle
}

fn send_out(
can_interface: &str,
send_map: Arc<RwLock<HashMap<u32, EncodeData>>>,
) -> Result<JoinHandle<()>, String> {
) -> JoinHandle<()> {
let socket = CanSocket::open(can_interface).expect("Failed to open CAN socket!");

let join_handle = thread::spawn(move || loop {
Expand Down Expand Up @@ -174,7 +173,7 @@ fn send_out(
}
}
});
Ok(join_handle)
join_handle
}

/**
Expand Down Expand Up @@ -211,33 +210,15 @@ fn parse_args() -> (String, String, bool) {
*/
fn main() {
let (path, can_interface, encoding) = parse_args();
let can_handle = match read_can(&path, &can_interface) {
Ok(handle) => handle,
Err(err) => {
println!("Decoder Errored! {}", err);
process::exit(1);
}
};
let can_handle = read_can(&path, &can_interface);

// use a arc for mutlithread, and a rwlock to enforce one writer
if encoding {
let send_map: Arc<RwLock<HashMap<u32, EncodeData>>> = Arc::new(RwLock::new(HashMap::new()));

let siren_handle = match read_siren(&path, Arc::clone(&send_map)) {
Ok(handle) => handle,
Err(err) => {
println!("Encoder Errored! {}", err);
process::exit(1);
}
};
let siren_handle = read_siren(&path, Arc::clone(&send_map));

let send_handle = match send_out(&can_interface, Arc::clone(&send_map)) {
Ok(handle) => handle,
Err(err) => {
println!("Sender Errored! {}", err);
process::exit(1);
}
};
let send_handle = send_out(&can_interface, Arc::clone(&send_map));

siren_handle.join().expect("Encoder failed with ");
println!("Encoder ended");
Expand Down
10 changes: 10 additions & 0 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,20 @@ impl MqttClient {
self.client.publish(msg)
}

/**
* Starts the client consuming messages.
* This starts the client receiving messages and placing them into an mpsc queue.
* It returns the receiving-end of the queue for the application to get the messages.
* This can be called at any time after the client is created, but it should be called before subscribing to any topics, otherwise messages can be lost.
*/
pub fn start_consumer(&mut self) -> Option<Receiver<Option<Message>>> {
Some(self.client.start_consuming())
}

/**
* Subscribes to a single topic.
* topic: the topic to be subscribed to
*/
pub fn subscribe(&mut self, topic: &str) -> Result<ServerResponse, mqtt::Error> {
self.client.subscribe(topic, 2)
}
Expand Down

0 comments on commit 305762d

Please sign in to comment.