Skip to content

Commit

Permalink
fix: save pending files from getting lost
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Dec 31, 2023
1 parent fe0fdcd commit 3b03958
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ flume = "0.10"
futures-util = "0.3"
log = "0.4"
rand = "0.8"
rumqttc = { git = "https://github.com/bytebeamio/rumqtt" }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "fix-pending" }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tempdir = "0.3"
Expand Down
7 changes: 3 additions & 4 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ impl Mqtt {
file.read(&mut buf)?;

let max_packet_size = self.config.mqtt.max_packet_size;

let mut pending = vec![];
loop {
// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
// This leads to force switching to catchup mode. Increasing max_payload_size to bypass this
match read(&mut buf, max_packet_size) {
Ok(Packet::Publish(publish)) => pending.push(Request::Publish(publish)),
Ok(Packet::Publish(publish)) => {
self.eventloop.pending.push(Request::Publish(publish))
}
Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
Err(rumqttc::Error::InsufficientBytes(_)) => break,
Err(e) => {
Expand All @@ -152,7 +152,6 @@ impl Mqtt {
}
}
}
self.eventloop.pending = pending.into_iter();

info!("Reloaded inflight publishes from previous session; removing file: {}", path.display());
file.delete()?;
Expand Down

0 comments on commit 3b03958

Please sign in to comment.