diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 848b0d71..cebec56f 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -507,10 +507,8 @@ impl Serializer { /// Starts operation of the uplink serializer, which can transition between the modes mentioned earlier. pub async fn start(mut self) -> Result<(), Error> { - // check for and publish the packets in persistence/inflight file - let mut status = Status::EventLoopReady; - self.reload_inflight().await?; + let mut status = Status::EventLoopReady; loop { let next_status = match status { @@ -524,21 +522,23 @@ impl Serializer { } } + /// check for and publish the packets in persistence/inflight file + /// once done, delete the file. async fn reload_inflight(&self) -> Result<(), Error> { - // check if the persistence/inflight file exists - // if exists then read and serialize it to Vec of publishes - // once done, delete the infight file and send the publishes onto the eventloop. let mut path = self.config.persistence_path.clone(); - let mut buf = Vec::new(); path.push("inflight"); + if !path.is_file() { return Ok(()); } + + let mut buf = Vec::new(); let mut inflight_file = File::open(&path)?; inflight_file.read_to_end(&mut buf)?; let mut buf = BytesMut::from(buf.as_slice()); - while let Ok(publish) = read(&mut buf, self.config.mqtt.max_packet_size) { - match publish { + + while let Ok(packet) = read(&mut buf, self.config.mqtt.max_packet_size) { + match packet { Packet::Publish(publish) => { self.client .publish(publish.topic, QoS::AtLeastOnce, false, publish.payload) @@ -547,8 +547,9 @@ impl Serializer { packet => unreachable!("Unexpected packet: {:?}", packet), } } - info!("Read inflight packets from : {}", path.display()); + info!("Read and published inflight packets; removing file: {}", path.display()); std::fs::remove_file(path)?; + Ok(()) } }