Skip to content

Commit

Permalink
test: ensure serializer handles data in FIFO order
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 11, 2024
1 parent 95a9985 commit dc9a079
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{fs::create_dir_all, path::PathBuf, sync::Arc, time::Duration};
use std::{fs::create_dir_all, path::PathBuf, sync::Arc, thread, time::Duration};

use bytes::Bytes;
use flume::bounded;
use rumqttc::{Publish, QoS, Request};
use tempdir::TempDir;
use tokio::spawn;
use tokio::{runtime::Runtime, spawn};

use uplink::{
base::{bridge::Payload, serializer::Serializer},
Expand Down Expand Up @@ -167,3 +167,52 @@ async fn preferential_send_on_network() {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":7,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}

#[tokio::test]
// Ensures that data pushed maintains FIFO order
async fn fifo_data_push() {
let mut config = Config::default();
config.default_buf_size = 1024 * 1024;
config.mqtt.max_packet_size = 1024 * 1024;
let config = Arc::new(config);
let (data_tx, data_rx) = bounded(1);
let (net_tx, req_rx) = bounded(1);
let (metrics_tx, _metrics_rx) = bounded(1);
let client = MockClient { net_tx };
let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap();

// start serializer in the background
thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start()));

spawn(async {
let mut default = MockCollector::new(
"default",
StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
data_tx,
);
for i in 0.. {
default.send(i).await.unwrap();
}
});

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}

0 comments on commit dc9a079

Please sign in to comment.