diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 1922083f8..7d25eb531 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -1,10 +1,11 @@ mod metrics; use std::collections::{HashMap, VecDeque}; -use std::io::{self, Write}; +use std::fs::File; +use std::io::{self, Read, Write}; use std::{sync::Arc, time::Duration}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use flume::{Receiver, RecvError, Sender}; use log::{debug, error, info, trace}; use lz4_flex::frame::FrameEncoder; @@ -509,6 +510,8 @@ impl Serializer { // check for and publish the packets in persistence/inflight file let mut status = Status::EventLoopReady; + self.reload_inflight().await?; + loop { let next_status = match status { Status::Normal => self.normal().await?, @@ -520,6 +523,35 @@ impl Serializer { status = next_status; } } + + async fn reload_inflight(&self) -> Result<(), Error> { + // check if the persistence/inflight file exists + // if exists then read and serialize it to Vec of publishes + // once done, delete the infight file and send the publishes onto the eventloop. + let mut path = self.config.persistence_path.clone(); + let mut buf = Vec::new(); + path.push("inflight"); + if !path.is_file() { + return Ok(()); + } + let mut inflight_file = File::open(&path)?; + inflight_file.read_to_end(&mut buf)?; + let mut buf = BytesMut::from(buf.as_slice()); + // parse into publishes + while let Ok(publish) = read(&mut buf, self.config.mqtt.max_packet_size) { + match publish { + Packet::Publish(publish) => { + self.client + .publish(publish.topic, QoS::AtLeastOnce, false, publish.payload) + .await?; + } + packet => unreachable!("Unexpected packet: {:?}", packet), + } + } + info!("Read inflight packets from : {}", path.display()); + std::fs::remove_file(path)?; + Ok(()) + } } async fn send_publish(