Skip to content

Commit

Permalink
feat: preferential ordering of streams during read from disk (#289)
Browse files Browse the repository at this point in the history
* feat: preferential ordering of streams

* test: remove incomplete test

* test: reorganize for testing priority
  • Loading branch information
Devdutt Shenoi authored Jan 23, 2024
1 parent fe7a9c2 commit 3ce9b03
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 32 deletions.
9 changes: 8 additions & 1 deletion configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,23 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# should perform on the data transiting through the stream. Currently supported
# compression schemes are Lz4 and Disabled. Defaults to Disabled.
# - persistence(optional): helps persist relevant information for data recovery purposes,
# used when there is a network/system failure.
# used when there is a network/system failure.
# - priority(optional, u8): Higher prioirity streams get to push their data
# onto the network first.
#
# In the following config for the device_shadow stream we set buf_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75

# Example using compression
[streams.imu]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4"
buf_size = 100
compression = "Lz4"
priority = 50

# Configuration details associated with uplink's persistent storage module which writes publish
# packets to disk in case of slow or crashed network, for recovery purposes.
Expand Down Expand Up @@ -121,9 +125,12 @@ persistence = { max_file_count = 3 }
#
# NOTE: Action statuses are expected on a specifc topic as configured in example below.
# This also means that we require a topic to be configured or uplink will error out.
# Given the importance of conveying action status at the earliest to platform,
# it has highest priority by default of 255.
[action_status]
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
flush_period = 2
priority = 255

# Configurations for uplink's built-in file downloader, including the actions that can trigger
# a download, the location in file system where uplink will download and store files from the
Expand Down
26 changes: 16 additions & 10 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::env::current_dir;
use std::hash::Hash;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Debug};
Expand Down Expand Up @@ -64,15 +64,15 @@ pub fn clock() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)]
pub enum Compression {
#[default]
Disabled,
Lz4,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Eq)]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_buf_size")]
Expand All @@ -86,6 +86,8 @@ pub struct StreamConfig {
pub compression: Compression,
#[serde(default)]
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
}

impl Default for StreamConfig {
Expand All @@ -96,23 +98,27 @@ impl Default for StreamConfig {
flush_period: default_timeout(),
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
}
}
}

impl Hash for StreamConfig {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.topic.hash(state)
impl Ord for StreamConfig {
fn cmp(&self, other: &Self) -> Ordering {
match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
(Ordering::Equal, o) => o,
(o, _) => o.reverse(),
}
}
}

impl PartialEq for StreamConfig {
fn eq(&self, other: &Self) -> bool {
self.topic == other.topic
impl PartialOrd for StreamConfig {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
Expand Down
200 changes: 179 additions & 21 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod metrics;

use std::collections::{HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{self, Write};
use std::time::Instant;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -134,14 +134,14 @@ impl MqttClient for AsyncClient {
}

struct StorageHandler {
map: HashMap<Arc<StreamConfig>, Storage>,
map: BTreeMap<Arc<StreamConfig>, Storage>,
// Stream being read from
read_stream: Option<Arc<StreamConfig>>,
}

impl StorageHandler {
fn new(config: Arc<Config>) -> Result<Self, Error> {
let mut map = HashMap::with_capacity(2 * config.streams.len());
let mut map = BTreeMap::new();
for (stream_name, stream_config) in config.streams.iter() {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
Expand Down Expand Up @@ -870,6 +870,7 @@ impl CtrlTx {
#[cfg(test)]
mod test {
use serde_json::Value;
use tokio::spawn;

use std::collections::HashMap;
use std::time::Duration;
Expand Down Expand Up @@ -971,23 +972,17 @@ mod test {
}

impl MockCollector {
fn new(data_tx: flume::Sender<Box<dyn Package>>) -> MockCollector {
MockCollector {
stream: Stream::new(
"hello",
StreamConfig {
topic: "hello/world".to_string(),
buf_size: 1,
..Default::default()
},
data_tx,
),
}
fn new(
stream_name: &str,
stream_config: StreamConfig,
data_tx: flume::Sender<Box<dyn Package>>,
) -> MockCollector {
MockCollector { stream: Stream::new(stream_name, stream_config, data_tx) }
}

fn send(&mut self, i: u32) -> Result<(), Error> {
let payload = Payload {
stream: "hello".to_owned(),
stream: Default::default(),
sequence: i,
timestamp: 0,
payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}")?,
Expand All @@ -1010,7 +1005,11 @@ mod test {
net_rx.recv().unwrap();
});

let mut collector = MockCollector::new(data_tx);
let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
std::thread::spawn(move || {
for i in 1..3 {
collector.send(i).unwrap();
Expand Down Expand Up @@ -1064,7 +1063,11 @@ mod test {
net_rx.recv().unwrap();
});

let mut collector = MockCollector::new(data_tx);
let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Faster collector, send data every 5s
std::thread::spawn(move || {
for i in 1..10 {
Expand Down Expand Up @@ -1092,7 +1095,11 @@ mod test {
let config = Arc::new(default_config());
let (mut serializer, data_tx, _) = defaults(config);

let mut collector = MockCollector::new(data_tx);
let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Faster collector, send data every 5s
std::thread::spawn(move || {
for i in 1..10 {
Expand Down Expand Up @@ -1149,7 +1156,11 @@ mod test {
.entry(Arc::new(Default::default()))
.or_insert(Storage::new("hello/world", 1024));

let mut collector = MockCollector::new(data_tx);
let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Run a collector practically once
std::thread::spawn(move || {
for i in 2..6 {
Expand Down Expand Up @@ -1204,7 +1215,11 @@ mod test {
}))
.or_insert(Storage::new("hello/world", 1024));

let mut collector = MockCollector::new(data_tx);
let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Run a collector
std::thread::spawn(move || {
for i in 2..6 {
Expand All @@ -1230,4 +1245,147 @@ mod test {
s => unreachable!("Unexpected status: {:?}", s),
}
}

#[tokio::test]
// Ensures that the data of streams are removed on the basis of preference
async fn preferential_send_on_network() {
let mut config = default_config();
config.stream_metrics.timeout = Duration::from_secs(1000);
config.streams.extend([
(
"one".to_owned(),
StreamConfig { topic: "topic/one".to_string(), priority: 1, ..Default::default() },
),
(
"two".to_owned(),
StreamConfig { topic: "topic/two".to_string(), priority: 2, ..Default::default() },
),
(
"top".to_owned(),
StreamConfig {
topic: "topic/top".to_string(),
priority: u8::MAX,
..Default::default()
},
),
]);
let config = Arc::new(config);

let (mut serializer, _data_tx, req_rx) = defaults(config.clone());

let publish = |topic: String, i: u32| Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic,
pkid: 0,
payload: Bytes::from(i.to_string()),
};

let mut one = serializer
.storage_handler
.map
.entry(Arc::new(StreamConfig {
topic: "topic/one".to_string(),
priority: 1,
..Default::default()
}))
.or_insert_with(|| unreachable!());
write_to_disk(publish("topic/one".to_string(), 1), &mut one).unwrap();
write_to_disk(publish("topic/one".to_string(), 10), &mut one).unwrap();

let top = serializer
.storage_handler
.map
.entry(Arc::new(StreamConfig {
topic: "topic/top".to_string(),
priority: u8::MAX,
..Default::default()
}))
.or_insert_with(|| unreachable!());
write_to_disk(publish("topic/top".to_string(), 100), top).unwrap();
write_to_disk(publish("topic/top".to_string(), 1000), top).unwrap();

let two = serializer
.storage_handler
.map
.entry(Arc::new(StreamConfig {
topic: "topic/two".to_string(),
priority: 2,
..Default::default()
}))
.or_insert_with(|| unreachable!());
write_to_disk(publish("topic/two".to_string(), 3), two).unwrap();

let mut default = serializer
.storage_handler
.map
.entry(Arc::new(StreamConfig {
topic: "topic/default".to_string(),
priority: 0,
..Default::default()
}))
.or_insert(Storage::new("topic/default", 1024));
write_to_disk(publish("topic/default".to_string(), 0), &mut default).unwrap();
write_to_disk(publish("topic/default".to_string(), 2), &mut default).unwrap();

// run serializer in the background
spawn(async { serializer.start().await.unwrap() });

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/top");
assert_eq!(payload, "100");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/top");
assert_eq!(payload, "1000");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/two");
assert_eq!(payload, "3");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/one");
assert_eq!(payload, "1");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/one");
assert_eq!(payload, "10");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "0");
}
_ => unreachable!(),
}

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "2");
}
_ => unreachable!(),
}
}
}
1 change: 1 addition & 0 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub mod config {
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
buf_size = 1
flush_period = 2
priority = 255 # highest priority for quick delivery of action status info to platform
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
Expand Down
1 change: 1 addition & 0 deletions vd-lib
Submodule vd-lib added at 1e71aa

0 comments on commit 3ce9b03

Please sign in to comment.