Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reorganize binary helpers and mod config #324

Merged
merged 6 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration}

use super::streams::Streams;
use super::{ActionBridgeShutdown, Package, StreamMetrics};
use crate::base::ActionRoute;
use crate::config::ActionRoute;
use crate::{Action, ActionResponse, Config};

const TUNSHELL_ACTION: &str = "launch_shell";
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {
use tokio::{runtime::Runtime, select};

use crate::{
base::{ActionRoute, StreamConfig, StreamMetricsConfig},
config::{ActionRoute, StreamConfig, StreamMetricsConfig},
Action, ActionResponse, Config,
};

Expand Down
3 changes: 1 addition & 2 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx};
use data_lane::DataBridge;
pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};

use super::StreamConfig;
use crate::base::ActionRoute;
use crate::config::{ActionRoute, StreamConfig};
use crate::{Action, ActionResponse, Config};
pub use metrics::StreamMetrics;

Expand Down
3 changes: 1 addition & 2 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use flume::{SendError, Sender};
use log::{debug, trace};
use serde::Serialize;

use crate::base::StreamConfig;

use crate::config::StreamConfig;
use super::{Package, Point, StreamMetrics};

/// Signals status of stream buffer
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::{error, info, trace};

use super::stream::{self, StreamStatus};
use super::{Point, StreamMetrics};
use crate::base::StreamConfig;
use crate::config::StreamConfig;
use crate::{Config, Package, Stream};

use super::delaymap::DelayMap;
Expand Down
278 changes: 2 additions & 276 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
use std::cmp::Ordering;
use std::env::current_dir;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Debug};
use std::fmt::Debug;
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tokio::join;

#[cfg(target_os = "linux")]
use crate::collector::journalctl::JournalCtlConfig;
#[cfg(target_os = "android")]
use crate::collector::logcat::LogcatConfig;

use self::bridge::stream::MAX_BATCH_SIZE;
use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx};
use self::mqtt::CtrlTx as MqttCtrlTx;
use self::serializer::CtrlTx as SerializerCtrlTx;
Expand All @@ -24,273 +13,10 @@ pub mod monitor;
pub mod mqtt;
pub mod serializer;

pub const DEFAULT_TIMEOUT: u64 = 60;

#[inline]
fn default_timeout() -> Duration {
Duration::from_secs(DEFAULT_TIMEOUT)
}

#[inline]
fn max_batch_size() -> usize {
MAX_BATCH_SIZE
}

fn default_file_size() -> usize {
10485760 // 10MB
}

fn default_persistence_path() -> PathBuf {
let mut path = current_dir().expect("Couldn't figure out current directory");
path.push(".persistence");
path
}

fn default_download_path() -> PathBuf {
let mut path = current_dir().expect("Couldn't figure out current directory");
path.push(".downloads");
path
}

// Automatically assigns port 5050 for default main app, if left unconfigured
fn default_tcpapps() -> HashMap<String, AppConfig> {
let mut apps = HashMap::new();
apps.insert("main".to_string(), AppConfig { port: 5050, actions: vec![] });

apps
}

pub fn clock() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)]
pub enum Compression {
#[default]
Disabled,
Lz4,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_batch_size", alias = "buf_size")]
pub batch_size: usize,
#[serde(default = "default_timeout")]
#[serde_as(as = "DurationSeconds<u64>")]
/// Duration(in seconds) that bridge collector waits from
/// receiving first element, before the stream gets flushed.
pub flush_period: Duration,
#[serde(default)]
pub compression: Compression,
#[serde(default)]
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
}

impl Default for StreamConfig {
fn default() -> Self {
Self {
topic: "".to_string(),
batch_size: MAX_BATCH_SIZE,
flush_period: default_timeout(),
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
}
}
}

impl Ord for StreamConfig {
fn cmp(&self, other: &Self) -> Ordering {
match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
(Ordering::Equal, o) => o,
(o, _) => o.reverse(),
}
}
}

impl PartialOrd for StreamConfig {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
#[serde(default)]
pub max_file_count: usize,
}

impl Default for Persistence {
fn default() -> Self {
Persistence { max_file_size: default_file_size(), max_file_count: 0 }
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Authentication {
pub ca_certificate: String,
pub device_certificate: String,
pub device_private_key: String,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Stats {
pub enabled: bool,
pub process_names: Vec<String>,
pub update_period: u64,
pub stream_size: Option<usize>,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct SimulatorConfig {
/// path to directory containing files with gps paths to be used in simulation
pub gps_paths: String,
/// actions that are to be routed to simulator
pub actions: Vec<ActionRoute>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct DownloaderConfig {
#[serde(default = "default_download_path")]
pub path: PathBuf,
pub actions: Vec<ActionRoute>,
}

impl Default for DownloaderConfig {
fn default() -> Self {
Self { path: default_download_path(), actions: vec![] }
}
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct InstallerConfig {
pub path: String,
pub actions: Vec<ActionRoute>,
pub uplink_port: u16,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct StreamMetricsConfig {
pub enabled: bool,
pub bridge_topic: String,
pub serializer_topic: String,
pub blacklist: Vec<String>,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct SerializerMetricsConfig {
pub enabled: bool,
pub topic: String,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct MqttMetricsConfig {
pub enabled: bool,
pub topic: String,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct AppConfig {
pub port: u16,
#[serde(default)]
pub actions: Vec<ActionRoute>,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConsoleConfig {
pub enabled: bool,
pub port: u16,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct MqttConfig {
pub max_packet_size: usize,
pub max_inflight: u16,
pub keep_alive: u64,
pub network_timeout: u64,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ActionRoute {
pub name: String,
#[serde(default = "default_timeout")]
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

impl From<&ActionRoute> for ActionRoute {
fn from(value: &ActionRoute) -> Self {
value.clone()
}
}

#[derive(Clone, Debug, Deserialize)]
pub struct DeviceShadowConfig {
pub interval: u64,
}

impl Default for DeviceShadowConfig {
fn default() -> Self {
Self { interval: DEFAULT_TIMEOUT }
}
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Config {
pub project_id: String,
pub device_id: String,
pub broker: String,
pub port: u16,
#[serde(default)]
pub console: ConsoleConfig,
pub authentication: Option<Authentication>,
#[serde(default = "default_tcpapps")]
pub tcpapps: HashMap<String, AppConfig>,
pub mqtt: MqttConfig,
#[serde(default)]
pub processes: Vec<ActionRoute>,
#[serde(default)]
pub script_runner: Vec<ActionRoute>,
#[serde(skip)]
pub actions_subscription: String,
pub streams: HashMap<String, StreamConfig>,
#[serde(default = "default_persistence_path")]
pub persistence_path: PathBuf,
pub action_status: StreamConfig,
pub stream_metrics: StreamMetricsConfig,
pub serializer_metrics: SerializerMetricsConfig,
pub mqtt_metrics: MqttMetricsConfig,
#[serde(default)]
pub downloader: DownloaderConfig,
pub system_stats: Stats,
pub simulator: Option<SimulatorConfig>,
#[serde(default)]
pub ota_installer: InstallerConfig,
#[serde(default)]
pub device_shadow: DeviceShadowConfig,
#[serde(default)]
pub action_redirections: HashMap<String, String>,
#[serde(default)]
pub ignore_actions_if_no_clients: bool,
#[cfg(target_os = "linux")]
pub logging: Option<JournalCtlConfig>,
#[cfg(target_os = "android")]
pub logging: Option<LogcatConfig>,
}

/// Send control messages to the various components in uplink. Currently this is
/// used only to trigger uplink shutdown. Shutdown signals are sent to all
/// components simultaneously with a join.
Expand Down
6 changes: 2 additions & 4 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ use storage::Storage;
use thiserror::Error;
use tokio::{select, time::interval};

use crate::base::Compression;
use crate::config::{default_file_size, Compression, StreamConfig};
use crate::{Config, Package};
pub use metrics::{Metrics, SerializerMetrics, StreamMetrics};

use super::{default_file_size, StreamConfig};

const METRICS_INTERVAL: Duration = Duration::from_secs(10);

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -877,7 +875,7 @@ mod test {

use super::*;
use crate::base::bridge::stream::Stream;
use crate::base::MqttConfig;
use crate::config::MqttConfig;
use crate::Payload;

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/collector/device_shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use log::{error, trace};
use serde::Serialize;

use crate::base::DeviceShadowConfig;
use crate::config::DeviceShadowConfig;
use crate::base::{bridge::BridgeTx, clock};
use crate::Payload;

Expand Down
10 changes: 4 additions & 6 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ use std::{
};
use std::{io::Write, path::PathBuf};

use crate::base::bridge::BridgeTx;
use crate::base::DownloaderConfig;
use crate::{Action, ActionResponse, Config};
use crate::{base::bridge::BridgeTx, config::DownloaderConfig, Action, ActionResponse, Config};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -413,9 +411,9 @@ mod test {
use std::{collections::HashMap, time::Duration};

use super::*;
use crate::base::{
bridge::{DataTx, StatusTx},
ActionRoute, DownloaderConfig, MqttConfig,
use crate::{
base::bridge::{DataTx, StatusTx},
config::{ActionRoute, DownloaderConfig, MqttConfig},
};

const DOWNLOAD_DIR: &str = "/tmp/uplink_test";
Expand Down
Loading
Loading