Skip to content

Commit

Permalink
doc: make the code readable
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi authored and Vilayat-Ali committed Nov 21, 2023
1 parent c2e2519 commit dc7b77a
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,8 @@ impl<C: MqttClient> Serializer<C> {

/// Starts operation of the uplink serializer, which can transition between the modes mentioned earlier.
pub async fn start(mut self) -> Result<(), Error> {
// check for and publish the packets in persistence/inflight file
let mut status = Status::EventLoopReady;

self.reload_inflight().await?;
let mut status = Status::EventLoopReady;

loop {
let next_status = match status {
Expand All @@ -524,21 +522,23 @@ impl<C: MqttClient> Serializer<C> {
}
}

/// check for and publish the packets in persistence/inflight file
/// once done, delete the file.
async fn reload_inflight(&self) -> Result<(), Error> {
// check if the persistence/inflight file exists
// if exists then read and serialize it to Vec of publishes
// once done, delete the infight file and send the publishes onto the eventloop.
let mut path = self.config.persistence_path.clone();
let mut buf = Vec::new();
path.push("inflight");

if !path.is_file() {
return Ok(());
}

let mut buf = Vec::new();
let mut inflight_file = File::open(&path)?;
inflight_file.read_to_end(&mut buf)?;
let mut buf = BytesMut::from(buf.as_slice());
while let Ok(publish) = read(&mut buf, self.config.mqtt.max_packet_size) {
match publish {

while let Ok(packet) = read(&mut buf, self.config.mqtt.max_packet_size) {
match packet {
Packet::Publish(publish) => {
self.client
.publish(publish.topic, QoS::AtLeastOnce, false, publish.payload)
Expand All @@ -547,8 +547,9 @@ impl<C: MqttClient> Serializer<C> {
packet => unreachable!("Unexpected packet: {:?}", packet),
}
}
info!("Read inflight packets from : {}", path.display());
info!("Read and published inflight packets; removing file: {}", path.display());
std::fs::remove_file(path)?;

Ok(())
}
}
Expand Down

0 comments on commit dc7b77a

Please sign in to comment.