Skip to content

Commit

Permalink
reorg imports
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Apr 5, 2024
1 parent 855a3d0 commit 3f9d2d9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 36 deletions.
10 changes: 2 additions & 8 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,15 +458,9 @@ impl CtrlTx {

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;

use flume::{bounded, Receiver, Sender};
use tokio::{runtime::Runtime, select};

use crate::{
config::{ActionRoute, StreamConfig, StreamMetricsConfig},
Action, ActionResponse, Config,
};
use crate::config::{StreamConfig, StreamMetricsConfig};

use super::*;

Expand Down
25 changes: 12 additions & 13 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Instant;
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use flume::{bounded, Receiver, RecvError, Sender};
use flume::{bounded, Receiver, RecvError, Sender, TrySendError};
use log::{debug, error, info, trace};
use lz4_flex::frame::FrameEncoder;
use rumqttc::*;
Expand Down Expand Up @@ -771,7 +771,7 @@ fn check_and_flush_metrics(
metrics: &mut Metrics,
metrics_tx: &Sender<SerializerMetrics>,
storage_handler: &StorageHandler,
) -> Result<(), flume::TrySendError<SerializerMetrics>> {
) -> Result<(), TrySendError<SerializerMetrics>> {
use pretty_bytes::converter::convert;

let mut inmemory_write_size = 0;
Expand Down Expand Up @@ -870,17 +870,16 @@ mod test {
use serde_json::Value;
use tokio::spawn;

use std::collections::HashMap;
use std::time::Duration;
use crate::{
base::bridge::{stream::Stream, Payload},
config::MqttConfig,
};

use super::*;
use crate::base::bridge::stream::Stream;
use crate::config::MqttConfig;
use crate::Payload;

#[derive(Clone)]
pub struct MockClient {
pub net_tx: flume::Sender<Request>,
pub net_tx: Sender<Request>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -948,10 +947,10 @@ mod test {

fn defaults(
config: Arc<Config>,
) -> (Serializer<MockClient>, flume::Sender<Box<dyn Package>>, Receiver<Request>) {
let (data_tx, data_rx) = flume::bounded(1);
let (net_tx, net_rx) = flume::bounded(1);
let (metrics_tx, _metrics_rx) = flume::bounded(1);
) -> (Serializer<MockClient>, Sender<Box<dyn Package>>, Receiver<Request>) {
let (data_tx, data_rx) = bounded(1);
let (net_tx, net_rx) = bounded(1);
let (metrics_tx, _metrics_rx) = bounded(1);
let client = MockClient { net_tx };

(Serializer::new(config, data_rx, client, metrics_tx).unwrap(), data_tx, net_rx)
Expand All @@ -973,7 +972,7 @@ mod test {
fn new(
stream_name: &str,
stream_config: StreamConfig,
data_tx: flume::Sender<Box<dyn Package>>,
data_tx: Sender<Box<dyn Package>>,
) -> MockCollector {
MockCollector { stream: Stream::new(stream_name, stream_config, data_tx) }
}
Expand Down
9 changes: 5 additions & 4 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,18 +530,19 @@ impl CtrlTx {

#[cfg(test)]
mod test {
use std::collections::HashMap;

use flume::bounded;
use serde_json::json;
use tempdir::TempDir;

use std::{collections::HashMap, time::Duration};

use super::*;
use crate::{
base::bridge::{DataTx, StatusTx},
config::{ActionRoute, DownloaderConfig, MqttConfig},
config::{ActionRoute, MqttConfig},
};

use super::*;

fn config(downloader: DownloaderConfig) -> Config {
Config {
broker: "localhost".to_owned(),
Expand Down
20 changes: 9 additions & 11 deletions uplink/src/collector/script_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,18 @@ impl ScriptRunner {

#[cfg(test)]
mod tests {
use std::thread;

use super::*;
use crate::{
base::bridge::{DataTx, StatusTx},
Action,
};
use std::thread::spawn;

use flume::bounded;

use crate::base::bridge::{DataTx, StatusTx};

use super::*;

fn create_bridge() -> (BridgeTx, Receiver<ActionResponse>) {
let (inner, _) = flume::bounded(2);
let (inner, _) = bounded(2);
let data_tx = DataTx { inner };
let (inner, status_rx) = flume::bounded(2);
let (inner, status_rx) = bounded(2);
let status_tx = StatusTx { inner };

(BridgeTx { data_tx, status_tx }, status_rx)
Expand All @@ -171,7 +169,7 @@ mod tests {

let (actions_tx, actions_rx) = bounded(1);
let script_runner = ScriptRunner::new(actions_rx, bridge_tx);
thread::spawn(move || script_runner.start().unwrap());
spawn(move || script_runner.start().unwrap());

actions_tx
.send(Action {
Expand All @@ -194,7 +192,7 @@ mod tests {
let (actions_tx, actions_rx) = bounded(1);
let script_runner = ScriptRunner::new(actions_rx, bridge_tx);

thread::spawn(move || script_runner.start().unwrap());
spawn(move || script_runner.start().unwrap());

actions_tx
.send(Action {
Expand Down

0 comments on commit 3f9d2d9

Please sign in to comment.