Skip to content

Commit

Permalink
style: improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Jun 8, 2024
1 parent 9e61907 commit 140db80
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 146 deletions.
46 changes: 21 additions & 25 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,27 @@ impl Storage {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}
match &mut self.persistence {
Some(persistence) => {
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();
self.current_write_file.clear();
let Some(persistence) = &mut self.persistence else {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
// with id 0
self.current_write_file.clear();
warn!(
"Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
self.name
);
return Ok(Some(0));
};

Ok(deleted)
}
None => {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
// with id 0
self.current_write_file.clear();
warn!(
"Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
self.name
);
Ok(Some(0))
}
}
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();
self.current_write_file.clear();

Ok(deleted)
}

/// Loads head file to current inmemory read buffer. Deletes
Expand Down Expand Up @@ -197,10 +195,8 @@ fn get_file_ids(path: &Path) -> Result<VecDeque<u64>, Error> {
continue;
}

match id(&path) {
Ok(id) => file_ids.push(id),
Err(_) => continue,
}
let Ok(id) = id(&path) else { continue };
file_ids.push(id);
}

file_ids.sort_unstable();
Expand Down
10 changes: 3 additions & 7 deletions tools/simulator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,11 @@ async fn main() -> Result<(), Error> {
spawn(ActionResponse::simulate(action, tx.clone()));
}
p = rx.recv_async() => {
let payload = match p {
Ok(p) => p,
Err(_) => {
error!("All generators have stopped!");
return Ok(())
}
let Ok(payload) = p else {
error!("All generators have stopped!");
return Ok(());
};


let text = serde_json::to_string(&payload)?;
data_tx.send(text).await?;
}
Expand Down
17 changes: 6 additions & 11 deletions tools/utils/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ async fn main() {
let t2 = t2.clone();
tokio::spawn(async move {
loop {
let value = match r1.recv_async().await {
Ok(value) => value,
Err(_) => break,
};
let Ok(value) = r1.recv_async().await else { break };
first_response(value);
match t2.send_async(value).await {
Ok(value) => value,
Err(_) => break,
if t2.send_async(value).await.is_err() {
break;
}
}
});
Expand All @@ -34,10 +30,9 @@ async fn main() {
tokio::spawn(async move {
let mut idx = 1;
loop {
match t1.send_async(idx).await {
Ok(value) => value,
Err(_) => break,
};
if t1.send_async(idx).await.is_err() {
break;
}
idx += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down
11 changes: 5 additions & 6 deletions uplink/src/base/bridge/delaymap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ impl<T: Eq + Hash + Clone + Display> DelayMap<T> {

// Removes timeout if it exists, else returns false.
pub fn remove(&mut self, item: &T) {
match self.map.remove(item) {
Some(key) => {
self.queue.remove(&key);
}
None => warn!("Timeout couldn't be removed from DelayMap: {}", item),
}
let Some(key) = self.map.remove(item) else {
warn!("Timeout couldn't be removed from DelayMap: {item}");
return;
};
self.queue.remove(&key);
}

// Insert new timeout.
Expand Down
32 changes: 16 additions & 16 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,24 @@ impl<T: Point> Streams<T> {
pub async fn forward(&mut self, data: T) {
let stream_name = data.stream_name().to_string();

let stream = match self.map.get_mut(&stream_name) {
Some(partition) => partition,
None => {
if self.config.simulator.is_none() && self.map.keys().len() > 20 {
error!("Failed to create {:?} stream. More than max 20 streams", stream_name);
return;
}
// Create stream if it doesn't already exist
if !self.map.contains_key(&stream_name) {
if self.config.simulator.is_none() && self.map.keys().len() > 20 {
error!("Failed to create {:?} stream. More than max 20 streams", stream_name);
return;
}

let stream = Stream::dynamic(
&stream_name,
&self.config.project_id,
&self.config.device_id,
self.data_tx.clone(),
);
let stream = Stream::dynamic(
&stream_name,
&self.config.project_id,
&self.config.device_id,
self.data_tx.clone(),
);

self.map.entry(stream_name.to_owned()).or_insert(stream)
}
};
self.map.insert(stream_name.to_owned(), stream);
}
// Doesn't panic because of above check
let stream = self.map.get_mut(&stream_name).unwrap();

let max_stream_size = stream.config.batch_size;
let state = match stream.fill(data).await {
Expand Down
4 changes: 1 addition & 3 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ impl Mqtt {
debug!("Outgoing = {:?}", packet);
match packet {
rumqttc::Outgoing::Publish(_) => self.metrics.add_publish(),
rumqttc::Outgoing::PingReq => {
self.metrics.add_pingreq();
}
rumqttc::Outgoing::PingReq => self.metrics.add_pingreq(),
_ => {}
}
}
Expand Down
109 changes: 48 additions & 61 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,11 @@ impl<C: MqttClient> Serializer<C> {
self.metrics.add_batch();
}
o = &mut publish => match o {
Ok(_) => {
break Ok(Status::EventLoopReady)
}
Ok(_) => break Ok(Status::EventLoopReady),
Err(MqttError::Send(Request::Publish(publish))) => {
break Ok(Status::EventLoopCrash(publish, stream));
},
Err(e) => {
unreachable!("Unexpected error: {}", e);
}
Err(e) => unreachable!("Unexpected error: {}", e),
},
_ = interval.tick() => {
check_metrics(&mut self.metrics, &mut self.stream_metrics, &self.storage_handler);
Expand Down Expand Up @@ -440,9 +436,8 @@ impl<C: MqttClient> Serializer<C> {
let max_packet_size = self.config.mqtt.max_packet_size;
let client = self.client.clone();

let (stream, storage) = match self.storage_handler.next(&mut self.metrics) {
Some(s) => s,
_ => return Ok(Status::Normal),
let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
return Ok(Status::Normal);
};

// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
Expand Down Expand Up @@ -500,9 +495,8 @@ impl<C: MqttClient> Serializer<C> {
Err(e) => unreachable!("Unexpected error: {}", e),
};

let (stream, storage) = match self.storage_handler.next(&mut self.metrics) {
Some(s) => s,
_ => return Ok(Status::Normal),
let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
return Ok(Status::Normal);
};

let publish = match Packet::read(storage.reader(), max_packet_size) {
Expand Down Expand Up @@ -1328,60 +1322,53 @@ mod test {
// run serializer in the background
spawn(async { serializer.start().await.unwrap() });

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/top");
assert_eq!(payload, "100");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/top");
assert_eq!(payload, "100");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/top");
assert_eq!(payload, "1000");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/top");
assert_eq!(payload, "1000");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/two");
assert_eq!(payload, "3");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/two");
assert_eq!(payload, "3");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/one");
assert_eq!(payload, "1");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/one");
assert_eq!(payload, "1");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/one");
assert_eq!(payload, "10");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/one");
assert_eq!(payload, "10");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "0");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "0");

match req_rx.recv_async().await.unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "2");
}
_ => unreachable!(),
}
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "2");
}
}
39 changes: 22 additions & 17 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,30 @@ impl CommandLine {
/// Reads config file to generate config struct and replaces places holders
/// like bike id and data version
fn get_configs(&self) -> Result<Config, anyhow::Error> {
let read_file_contents = |path| std::fs::read_to_string(path).ok();
let auth = read_file_contents(&self.auth).ok_or_else(|| {
Error::msg(format!("Auth file not found at \"{}\"", self.auth.display()))
})?;
let config = match &self.config {
Some(path) => Some(read_file_contents(path).ok_or_else(|| {
Error::msg(format!("Config file not found at \"{}\"", path.display()))
})?),
None => None,
};
let read_file_contents = |path| std::fs::read_to_string(path);
let mut config =
config::Config::builder().add_source(File::from_str(DEFAULT_CONFIG, FileFormat::Toml));

if let Some(path) = &self.config {
let read = read_file_contents(path).map_err(|e| {
Error::msg(format!(
"Config file couldn't be loaded from \"{}\"; error = {e}",
path.display()
))
})?;
config = config.add_source(File::from_str(&read, FileFormat::Toml));
}

let config = config::Config::builder()
.add_source(File::from_str(DEFAULT_CONFIG, FileFormat::Toml))
.add_source(File::from_str(&config.unwrap_or_default(), FileFormat::Toml))
.add_source(File::from_str(&auth, FileFormat::Json))
.add_source(Environment::default())
.build()?;
let auth = read_file_contents(&self.auth).map_err(|e| {
Error::msg(format!(
"Auth file couldn't be loaded from \"{}\"; error = {e}",
self.auth.display()
))
})?;
config = config.add_source(File::from_str(&auth, FileFormat::Json));

let mut config: Config = config.try_deserialize()?;
let mut config: Config =
config.add_source(Environment::default()).build()?.try_deserialize()?;

// Create directory at persistence_path if it doesn't already exist
std::fs::create_dir_all(&config.persistence_path).map_err(|_| {
Expand Down

0 comments on commit 140db80

Please sign in to comment.