diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 814cbdb8..ffb9083a 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -458,15 +458,9 @@ impl CtrlTx { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; + use tokio::runtime::Runtime; - use flume::{bounded, Receiver, Sender}; - use tokio::{runtime::Runtime, select}; - - use crate::{ - config::{ActionRoute, StreamConfig, StreamMetricsConfig}, - Action, ActionResponse, Config, - }; + use crate::config::{StreamConfig, StreamMetricsConfig}; use super::*; diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index a66449e7..3a8c5f4b 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -6,7 +6,7 @@ use std::time::Instant; use std::{sync::Arc, time::Duration}; use bytes::Bytes; -use flume::{bounded, Receiver, RecvError, Sender}; +use flume::{bounded, Receiver, RecvError, Sender, TrySendError}; use log::{debug, error, info, trace}; use lz4_flex::frame::FrameEncoder; use rumqttc::*; @@ -771,7 +771,7 @@ fn check_and_flush_metrics( metrics: &mut Metrics, metrics_tx: &Sender, storage_handler: &StorageHandler, -) -> Result<(), flume::TrySendError> { +) -> Result<(), TrySendError> { use pretty_bytes::converter::convert; let mut inmemory_write_size = 0; @@ -870,17 +870,16 @@ mod test { use serde_json::Value; use tokio::spawn; - use std::collections::HashMap; - use std::time::Duration; + use crate::{ + base::bridge::{stream::Stream, Payload}, + config::MqttConfig, + }; use super::*; - use crate::base::bridge::stream::Stream; - use crate::config::MqttConfig; - use crate::Payload; #[derive(Clone)] pub struct MockClient { - pub net_tx: flume::Sender, + pub net_tx: Sender, } #[async_trait::async_trait] @@ -948,10 +947,10 @@ mod test { fn defaults( config: Arc, - ) -> (Serializer, flume::Sender>, Receiver) { - let (data_tx, data_rx) = flume::bounded(1); - let (net_tx, net_rx) = flume::bounded(1); - let (metrics_tx, _metrics_rx) = flume::bounded(1); + ) -> (Serializer, Sender>, Receiver) { + let (data_tx, data_rx) = bounded(1); + let (net_tx, net_rx) = bounded(1); + let (metrics_tx, _metrics_rx) = bounded(1); let client = MockClient { net_tx }; (Serializer::new(config, data_rx, client, metrics_tx).unwrap(), data_tx, net_rx) @@ -973,7 +972,7 @@ mod test { fn new( stream_name: &str, stream_config: StreamConfig, - data_tx: flume::Sender>, + data_tx: Sender>, ) -> MockCollector { MockCollector { stream: Stream::new(stream_name, stream_config, data_tx) } } diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 8139819a..893e94ec 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -530,18 +530,19 @@ impl CtrlTx { #[cfg(test)] mod test { + use std::collections::HashMap; + use flume::bounded; use serde_json::json; use tempdir::TempDir; - use std::{collections::HashMap, time::Duration}; - - use super::*; use crate::{ base::bridge::{DataTx, StatusTx}, - config::{ActionRoute, DownloaderConfig, MqttConfig}, + config::{ActionRoute, MqttConfig}, }; + use super::*; + fn config(downloader: DownloaderConfig) -> Config { Config { broker: "localhost".to_owned(), diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 40a0fadd..db8f74ac 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -146,20 +146,18 @@ impl ScriptRunner { #[cfg(test)] mod tests { - use std::thread; - - use super::*; - use crate::{ - base::bridge::{DataTx, StatusTx}, - Action, - }; + use std::thread::spawn; use flume::bounded; + use crate::base::bridge::{DataTx, StatusTx}; + + use super::*; + fn create_bridge() -> (BridgeTx, Receiver) { - let (inner, _) = flume::bounded(2); + let (inner, _) = bounded(2); let data_tx = DataTx { inner }; - let (inner, status_rx) = flume::bounded(2); + let (inner, status_rx) = bounded(2); let status_tx = StatusTx { inner }; (BridgeTx { data_tx, status_tx }, status_rx) @@ -171,7 +169,7 @@ mod tests { let (actions_tx, actions_rx) = bounded(1); let script_runner = ScriptRunner::new(actions_rx, bridge_tx); - thread::spawn(move || script_runner.start().unwrap()); + spawn(move || script_runner.start().unwrap()); actions_tx .send(Action { @@ -194,7 +192,7 @@ mod tests { let (actions_tx, actions_rx) = bounded(1); let script_runner = ScriptRunner::new(actions_rx, bridge_tx); - thread::spawn(move || script_runner.start().unwrap()); + spawn(move || script_runner.start().unwrap()); actions_tx .send(Action {