diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index bf096b4c..3aeda03d 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -11,7 +11,7 @@ use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration} use super::streams::Streams; use super::{ActionBridgeShutdown, Package, StreamMetrics}; -use crate::base::ActionRoute; +use crate::config::ActionRoute; use crate::{Action, ActionResponse, Config}; const TUNSHELL_ACTION: &str = "launch_shell"; @@ -464,7 +464,7 @@ mod tests { use tokio::{runtime::Runtime, select}; use crate::{ - base::{ActionRoute, StreamConfig, StreamMetricsConfig}, + config::{ActionRoute, StreamConfig, StreamMetricsConfig}, Action, ActionResponse, Config, }; diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index 9f3a2a91..11877511 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -16,8 +16,7 @@ pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx}; use data_lane::DataBridge; pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx}; -use super::StreamConfig; -use crate::base::ActionRoute; +use crate::config::{ActionRoute, StreamConfig}; use crate::{Action, ActionResponse, Config}; pub use metrics::StreamMetrics; diff --git a/uplink/src/base/bridge/stream.rs b/uplink/src/base/bridge/stream.rs index 64c14da9..f0baa7d5 100644 --- a/uplink/src/base/bridge/stream.rs +++ b/uplink/src/base/bridge/stream.rs @@ -4,8 +4,7 @@ use flume::{SendError, Sender}; use log::{debug, trace}; use serde::Serialize; -use crate::base::StreamConfig; - +use crate::config::StreamConfig; use super::{Package, Point, StreamMetrics}; /// Signals status of stream buffer diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs index fc1b92c8..5efb8bf0 100644 --- a/uplink/src/base/bridge/streams.rs +++ b/uplink/src/base/bridge/streams.rs @@ -6,7 +6,7 @@ use log::{error, info, trace}; use super::stream::{self, StreamStatus}; use super::{Point, StreamMetrics}; -use crate::base::StreamConfig; +use crate::config::StreamConfig; use crate::{Config, Package, Stream}; use super::delaymap::DelayMap; diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 7f27f725..442a0f81 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -1,19 +1,8 @@ -use std::cmp::Ordering; -use std::env::current_dir; -use std::path::PathBuf; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, fmt::Debug}; +use std::fmt::Debug; +use std::time::{SystemTime, UNIX_EPOCH}; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DurationSeconds}; use tokio::join; -#[cfg(target_os = "linux")] -use crate::collector::journalctl::JournalCtlConfig; -#[cfg(target_os = "android")] -use crate::collector::logcat::LogcatConfig; - -use self::bridge::stream::MAX_BATCH_SIZE; use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx}; use self::mqtt::CtrlTx as MqttCtrlTx; use self::serializer::CtrlTx as SerializerCtrlTx; @@ -24,273 +13,10 @@ pub mod monitor; pub mod mqtt; pub mod serializer; -pub const DEFAULT_TIMEOUT: u64 = 60; - -#[inline] -fn default_timeout() -> Duration { - Duration::from_secs(DEFAULT_TIMEOUT) -} - -#[inline] -fn max_batch_size() -> usize { - MAX_BATCH_SIZE -} - -fn default_file_size() -> usize { - 10485760 // 10MB -} - -fn default_persistence_path() -> PathBuf { - let mut path = current_dir().expect("Couldn't figure out current directory"); - path.push(".persistence"); - path -} - -fn default_download_path() -> PathBuf { - let mut path = current_dir().expect("Couldn't figure out current directory"); - path.push(".downloads"); - path -} - -// Automatically assigns port 5050 for default main app, if left unconfigured -fn default_tcpapps() -> HashMap { - let mut apps = HashMap::new(); - apps.insert("main".to_string(), AppConfig { port: 5050, actions: vec![] }); - - apps -} - pub fn clock() -> u128 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() } -#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)] -pub enum Compression { - #[default] - Disabled, - Lz4, -} - -#[serde_as] -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -pub struct StreamConfig { - pub topic: String, - #[serde(default = "max_batch_size", alias = "buf_size")] - pub batch_size: usize, - #[serde(default = "default_timeout")] - #[serde_as(as = "DurationSeconds")] - /// Duration(in seconds) that bridge collector waits from - /// receiving first element, before the stream gets flushed. - pub flush_period: Duration, - #[serde(default)] - pub compression: Compression, - #[serde(default)] - pub persistence: Persistence, - #[serde(default)] - pub priority: u8, -} - -impl Default for StreamConfig { - fn default() -> Self { - Self { - topic: "".to_string(), - batch_size: MAX_BATCH_SIZE, - flush_period: default_timeout(), - compression: Compression::Disabled, - persistence: Persistence::default(), - priority: 0, - } - } -} - -impl Ord for StreamConfig { - fn cmp(&self, other: &Self) -> Ordering { - match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) { - (Ordering::Equal, o) => o, - (o, _) => o.reverse(), - } - } -} - -impl PartialOrd for StreamConfig { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)] -pub struct Persistence { - #[serde(default = "default_file_size")] - pub max_file_size: usize, - #[serde(default)] - pub max_file_count: usize, -} - -impl Default for Persistence { - fn default() -> Self { - Persistence { max_file_size: default_file_size(), max_file_count: 0 } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Authentication { - pub ca_certificate: String, - pub device_certificate: String, - pub device_private_key: String, -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct Stats { - pub enabled: bool, - pub process_names: Vec, - pub update_period: u64, - pub stream_size: Option, -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct SimulatorConfig { - /// path to directory containing files with gps paths to be used in simulation - pub gps_paths: String, - /// actions that are to be routed to simulator - pub actions: Vec, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct DownloaderConfig { - #[serde(default = "default_download_path")] - pub path: PathBuf, - pub actions: Vec, -} - -impl Default for DownloaderConfig { - fn default() -> Self { - Self { path: default_download_path(), actions: vec![] } - } -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct InstallerConfig { - pub path: String, - pub actions: Vec, - pub uplink_port: u16, -} - -#[serde_as] -#[derive(Debug, Clone, Deserialize, Serialize, Default)] -pub struct StreamMetricsConfig { - pub enabled: bool, - pub bridge_topic: String, - pub serializer_topic: String, - pub blacklist: Vec, - #[serde_as(as = "DurationSeconds")] - pub timeout: Duration, -} - -#[serde_as] -#[derive(Debug, Clone, Deserialize, Serialize, Default)] -pub struct SerializerMetricsConfig { - pub enabled: bool, - pub topic: String, - #[serde_as(as = "DurationSeconds")] - pub timeout: Duration, -} - -#[derive(Debug, Clone, Deserialize, Serialize, Default)] -pub struct MqttMetricsConfig { - pub enabled: bool, - pub topic: String, -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct AppConfig { - pub port: u16, - #[serde(default)] - pub actions: Vec, -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct ConsoleConfig { - pub enabled: bool, - pub port: u16, -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct MqttConfig { - pub max_packet_size: usize, - pub max_inflight: u16, - pub keep_alive: u64, - pub network_timeout: u64, -} - -#[serde_as] -#[derive(Debug, Clone, Deserialize, Default)] -pub struct ActionRoute { - pub name: String, - #[serde(default = "default_timeout")] - #[serde_as(as = "DurationSeconds")] - pub timeout: Duration, -} - -impl From<&ActionRoute> for ActionRoute { - fn from(value: &ActionRoute) -> Self { - value.clone() - } -} - -#[derive(Clone, Debug, Deserialize)] -pub struct DeviceShadowConfig { - pub interval: u64, -} - -impl Default for DeviceShadowConfig { - fn default() -> Self { - Self { interval: DEFAULT_TIMEOUT } - } -} - -#[derive(Debug, Clone, Deserialize, Default)] -pub struct Config { - pub project_id: String, - pub device_id: String, - pub broker: String, - pub port: u16, - #[serde(default)] - pub console: ConsoleConfig, - pub authentication: Option, - #[serde(default = "default_tcpapps")] - pub tcpapps: HashMap, - pub mqtt: MqttConfig, - #[serde(default)] - pub processes: Vec, - #[serde(default)] - pub script_runner: Vec, - #[serde(skip)] - pub actions_subscription: String, - pub streams: HashMap, - #[serde(default = "default_persistence_path")] - pub persistence_path: PathBuf, - pub action_status: StreamConfig, - pub stream_metrics: StreamMetricsConfig, - pub serializer_metrics: SerializerMetricsConfig, - pub mqtt_metrics: MqttMetricsConfig, - #[serde(default)] - pub downloader: DownloaderConfig, - pub system_stats: Stats, - pub simulator: Option, - #[serde(default)] - pub ota_installer: InstallerConfig, - #[serde(default)] - pub device_shadow: DeviceShadowConfig, - #[serde(default)] - pub action_redirections: HashMap, - #[serde(default)] - pub ignore_actions_if_no_clients: bool, - #[cfg(target_os = "linux")] - pub logging: Option, - #[cfg(target_os = "android")] - pub logging: Option, -} - /// Send control messages to the various components in uplink. Currently this is /// used only to trigger uplink shutdown. Shutdown signals are sent to all /// components simultaneously with a join. diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 7c246429..a66449e7 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -14,12 +14,10 @@ use storage::Storage; use thiserror::Error; use tokio::{select, time::interval}; -use crate::base::Compression; +use crate::config::{default_file_size, Compression, StreamConfig}; use crate::{Config, Package}; pub use metrics::{Metrics, SerializerMetrics, StreamMetrics}; -use super::{default_file_size, StreamConfig}; - const METRICS_INTERVAL: Duration = Duration::from_secs(10); #[derive(thiserror::Error, Debug)] @@ -877,7 +875,7 @@ mod test { use super::*; use crate::base::bridge::stream::Stream; - use crate::base::MqttConfig; + use crate::config::MqttConfig; use crate::Payload; #[derive(Clone)] diff --git a/uplink/src/collector/device_shadow.rs b/uplink/src/collector/device_shadow.rs index 96d95818..8cc734ee 100644 --- a/uplink/src/collector/device_shadow.rs +++ b/uplink/src/collector/device_shadow.rs @@ -3,7 +3,7 @@ use std::time::Duration; use log::{error, trace}; use serde::Serialize; -use crate::base::DeviceShadowConfig; +use crate::config::DeviceShadowConfig; use crate::base::{bridge::BridgeTx, clock}; use crate::Payload; diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index f44ed100..6e738164 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -66,9 +66,7 @@ use std::{ }; use std::{io::Write, path::PathBuf}; -use crate::base::bridge::BridgeTx; -use crate::base::DownloaderConfig; -use crate::{Action, ActionResponse, Config}; +use crate::{base::bridge::BridgeTx, config::DownloaderConfig, Action, ActionResponse, Config}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -413,9 +411,9 @@ mod test { use std::{collections::HashMap, time::Duration}; use super::*; - use crate::base::{ - bridge::{DataTx, StatusTx}, - ActionRoute, DownloaderConfig, MqttConfig, + use crate::{ + base::bridge::{DataTx, StatusTx}, + config::{ActionRoute, DownloaderConfig, MqttConfig}, }; const DOWNLOAD_DIR: &str = "/tmp/uplink_test"; diff --git a/uplink/src/collector/installer.rs b/uplink/src/collector/installer.rs index b0f2e6a8..50dd68b4 100644 --- a/uplink/src/collector/installer.rs +++ b/uplink/src/collector/installer.rs @@ -6,8 +6,7 @@ use tar::Archive; use tokio::process::Command; use super::downloader::DownloadFile; -use crate::base::{bridge::BridgeTx, InstallerConfig}; -use crate::{Action, ActionResponse}; +use crate::{base::bridge::BridgeTx, config::InstallerConfig, Action, ActionResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/uplink/src/collector/simulator/mod.rs b/uplink/src/collector/simulator/mod.rs index 86464c42..f6fa9980 100644 --- a/uplink/src/collector/simulator/mod.rs +++ b/uplink/src/collector/simulator/mod.rs @@ -1,5 +1,5 @@ use crate::base::bridge::{BridgeTx, Payload}; -use crate::base::SimulatorConfig; +use crate::config::SimulatorConfig; use crate::{Action, ActionResponse}; use data::{Bms, DeviceData, DeviceShadow, Gps, Imu, Motor, PeripheralState}; use flume::{bounded, Receiver, Sender}; diff --git a/uplink/src/collector/tcpjson.rs b/uplink/src/collector/tcpjson.rs index 7d151725..46ebc362 100644 --- a/uplink/src/collector/tcpjson.rs +++ b/uplink/src/collector/tcpjson.rs @@ -11,7 +11,7 @@ use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; use std::io; use crate::base::bridge::BridgeTx; -use crate::base::AppConfig; +use crate::config::AppConfig; use crate::{Action, ActionResponse, Payload}; #[derive(Error, Debug)] diff --git a/uplink/src/config.rs b/uplink/src/config.rs new file mode 100644 index 00000000..beddf361 --- /dev/null +++ b/uplink/src/config.rs @@ -0,0 +1,277 @@ +use std::cmp::Ordering; +use std::env::current_dir; +use std::path::PathBuf; +use std::time::Duration; +use std::{collections::HashMap, fmt::Debug}; + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; + +pub use crate::base::bridge::stream::MAX_BATCH_SIZE; +#[cfg(target_os = "linux")] +use crate::collector::journalctl::JournalCtlConfig; +#[cfg(target_os = "android")] +use crate::collector::logcat::LogcatConfig; + +pub const DEFAULT_TIMEOUT: u64 = 60; + +#[inline] +fn default_timeout() -> Duration { + Duration::from_secs(DEFAULT_TIMEOUT) +} + +#[inline] +fn max_batch_size() -> usize { + MAX_BATCH_SIZE +} + +pub fn default_file_size() -> usize { + 10485760 // 10MB +} + +fn default_persistence_path() -> PathBuf { + let mut path = current_dir().expect("Couldn't figure out current directory"); + path.push(".persistence"); + path +} + +fn default_download_path() -> PathBuf { + let mut path = current_dir().expect("Couldn't figure out current directory"); + path.push(".downloads"); + path +} + +// Automatically assigns port 5050 for default main app, if left unconfigured +fn default_tcpapps() -> HashMap { + let mut apps = HashMap::new(); + apps.insert("main".to_string(), AppConfig { port: 5050, actions: vec![] }); + + apps +} + +#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)] +pub enum Compression { + #[default] + Disabled, + Lz4, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +pub struct StreamConfig { + pub topic: String, + #[serde(default = "max_batch_size")] + pub batch_size: usize, + #[serde(default = "default_timeout")] + #[serde_as(as = "DurationSeconds")] + /// Duration(in seconds) that bridge collector waits from + /// receiving first element, before the stream gets flushed. + pub flush_period: Duration, + #[serde(default)] + pub compression: Compression, + #[serde(default)] + pub persistence: Persistence, + #[serde(default)] + pub priority: u8, +} + +impl Default for StreamConfig { + fn default() -> Self { + Self { + topic: "".to_string(), + batch_size: MAX_BATCH_SIZE, + flush_period: default_timeout(), + compression: Compression::Disabled, + persistence: Persistence::default(), + priority: 0, + } + } +} + +impl Ord for StreamConfig { + fn cmp(&self, other: &Self) -> Ordering { + match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) { + (Ordering::Equal, o) => o, + (o, _) => o.reverse(), + } + } +} + +impl PartialOrd for StreamConfig { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)] +pub struct Persistence { + #[serde(default = "default_file_size")] + pub max_file_size: usize, + #[serde(default)] + pub max_file_count: usize, +} + +impl Default for Persistence { + fn default() -> Self { + Persistence { max_file_size: default_file_size(), max_file_count: 0 } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Authentication { + pub ca_certificate: String, + pub device_certificate: String, + pub device_private_key: String, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct Stats { + pub enabled: bool, + pub process_names: Vec, + pub update_period: u64, + pub stream_size: Option, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct SimulatorConfig { + /// path to directory containing files with gps paths to be used in simulation + pub gps_paths: String, + /// actions that are to be routed to simulator + pub actions: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DownloaderConfig { + #[serde(default = "default_download_path")] + pub path: PathBuf, + pub actions: Vec, +} + +impl Default for DownloaderConfig { + fn default() -> Self { + Self { path: default_download_path(), actions: vec![] } + } +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct InstallerConfig { + pub path: String, + pub actions: Vec, + pub uplink_port: u16, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct StreamMetricsConfig { + pub enabled: bool, + pub bridge_topic: String, + pub serializer_topic: String, + pub blacklist: Vec, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct SerializerMetricsConfig { + pub enabled: bool, + pub topic: String, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct MqttMetricsConfig { + pub enabled: bool, + pub topic: String, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct AppConfig { + pub port: u16, + #[serde(default)] + pub actions: Vec, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConsoleConfig { + pub enabled: bool, + pub port: u16, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct MqttConfig { + pub max_packet_size: usize, + pub max_inflight: u16, + pub keep_alive: u64, + pub network_timeout: u64, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ActionRoute { + pub name: String, + #[serde(default = "default_timeout")] + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, +} + +impl From<&ActionRoute> for ActionRoute { + fn from(value: &ActionRoute) -> Self { + value.clone() + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct DeviceShadowConfig { + pub interval: u64, +} + +impl Default for DeviceShadowConfig { + fn default() -> Self { + Self { interval: DEFAULT_TIMEOUT } + } +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct Config { + pub project_id: String, + pub device_id: String, + pub broker: String, + pub port: u16, + #[serde(default)] + pub console: ConsoleConfig, + pub authentication: Option, + #[serde(default = "default_tcpapps")] + pub tcpapps: HashMap, + pub mqtt: MqttConfig, + #[serde(default)] + pub processes: Vec, + #[serde(default)] + pub script_runner: Vec, + #[serde(skip)] + pub actions_subscription: String, + pub streams: HashMap, + #[serde(default = "default_persistence_path")] + pub persistence_path: PathBuf, + pub action_status: StreamConfig, + pub stream_metrics: StreamMetricsConfig, + pub serializer_metrics: SerializerMetricsConfig, + pub mqtt_metrics: MqttMetricsConfig, + #[serde(default)] + pub downloader: DownloaderConfig, + pub system_stats: Stats, + pub simulator: Option, + #[serde(default)] + pub ota_installer: InstallerConfig, + #[serde(default)] + pub device_shadow: DeviceShadowConfig, + #[serde(default)] + pub action_redirections: HashMap, + #[serde(default)] + pub ignore_actions_if_no_clients: bool, + #[cfg(target_os = "linux")] + pub logging: Option, + #[cfg(target_os = "android")] + pub logging: Option, +} diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index a570dd84..e69185e1 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -46,9 +46,19 @@ use std::thread; use std::time::Duration; use anyhow::Error; +use flume::{bounded, Receiver, RecvError, Sender}; +use log::error; + +pub mod base; +pub mod collector; +pub mod config; -use base::bridge::stream::Stream; +use self::config::{ActionRoute, Config}; +pub use base::actions::{Action, ActionResponse}; +use base::bridge::{stream::Stream, Bridge, Package, Payload, Point, StreamMetrics}; use base::monitor::Monitor; +use base::mqtt::Mqtt; +use base::serializer::{Serializer, SerializerMetrics}; use base::CtrlTx; use collector::device_shadow::DeviceShadow; use collector::downloader::FileDownloader; @@ -61,217 +71,6 @@ use collector::process::ProcessHandler; use collector::script_runner::ScriptRunner; use collector::systemstats::StatCollector; use collector::tunshell::TunshellClient; -use flume::{bounded, Receiver, RecvError, Sender}; -use log::error; - -pub mod base; -pub mod collector; - -pub mod config { - use crate::base::{bridge::stream::MAX_BATCH_SIZE, StreamConfig}; - pub use crate::base::{Config, Persistence, Stats}; - use config::{Environment, File, FileFormat}; - use std::fs; - use structopt::StructOpt; - - #[derive(StructOpt, Debug)] - #[structopt(name = "uplink", about = "collect, batch, compress, publish")] - pub struct CommandLine { - /// Binary version - #[structopt(skip = env ! ("VERGEN_BUILD_SEMVER"))] - pub version: String, - /// Build profile - #[structopt(skip = env ! ("VERGEN_CARGO_PROFILE"))] - pub profile: String, - /// Commit SHA - #[structopt(skip = env ! ("VERGEN_GIT_SHA"))] - pub commit_sha: String, - /// Commit SHA - #[structopt(skip = env ! ("VERGEN_GIT_COMMIT_TIMESTAMP"))] - pub commit_date: String, - /// config file - #[structopt(short = "c", help = "Config file")] - pub config: Option, - /// config file - #[structopt(short = "a", help = "Authentication file")] - pub auth: String, - /// log level (v: info, vv: debug, vvv: trace) - #[structopt(short = "v", long = "verbose", parse(from_occurrences))] - pub verbose: u8, - /// list of modules to log - #[structopt(short = "m", long = "modules")] - pub modules: Vec, - } - - const DEFAULT_CONFIG: &str = r#" - [mqtt] - max_packet_size = 256000 - max_inflight = 100 - keep_alive = 30 - network_timeout = 30 - - [stream_metrics] - enabled = false - bridge_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_stream_metrics/jsonarray" - serializer_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_serializer_stream_metrics/jsonarray" - blacklist = [] - timeout = 10 - - [serializer_metrics] - enabled = false - topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_serializer_metrics/jsonarray" - timeout = 10 - - [mqtt_metrics] - enabled = true - topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_mqtt_metrics/jsonarray" - - [action_status] - topic = "/tenants/{tenant_id}/devices/{device_id}/action/status" - batch_size = 1 - flush_period = 2 - priority = 255 # highest priority for quick delivery of action status info to platform - - [streams.device_shadow] - topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" - flush_period = 5 - - [streams.logs] - topic = "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" - batch_size = 32 - - [system_stats] - enabled = true - process_names = ["uplink"] - update_period = 30 -"#; - - /// Reads config file to generate config struct and replaces places holders - /// like bike id and data version - pub fn initialize(auth_config: &str, uplink_config: &str) -> Result { - let config = config::Config::builder() - .add_source(File::from_str(DEFAULT_CONFIG, FileFormat::Toml)) - .add_source(File::from_str(uplink_config, FileFormat::Toml)) - .add_source(File::from_str(auth_config, FileFormat::Json)) - .add_source(Environment::default()) - .build()?; - - let mut config: Config = config.try_deserialize()?; - - // Create directory at persistence_path if it doesn't already exist - fs::create_dir_all(&config.persistence_path).map_err(|_| { - anyhow::Error::msg(format!( - "Permission denied for creating persistence directory at \"{}\"", - config.persistence_path.display() - )) - })?; - - // replace placeholders with device/tenant ID - let tenant_id = config.project_id.trim(); - let device_id = config.device_id.trim(); - for config in config.streams.values_mut() { - replace_topic_placeholders(&mut config.topic, tenant_id, device_id); - } - - replace_topic_placeholders(&mut config.action_status.topic, tenant_id, device_id); - replace_topic_placeholders(&mut config.stream_metrics.bridge_topic, tenant_id, device_id); - replace_topic_placeholders( - &mut config.stream_metrics.serializer_topic, - tenant_id, - device_id, - ); - replace_topic_placeholders(&mut config.serializer_metrics.topic, tenant_id, device_id); - replace_topic_placeholders(&mut config.mqtt_metrics.topic, tenant_id, device_id); - - // for config in [&mut config.serializer_metrics, &mut config.stream_metrics] { - // if let Some(topic) = &config.topic { - // let topic = topic.replace("{tenant_id}", tenant_id); - // let topic = topic.replace("{device_id}", device_id); - // config.topic = Some(topic); - // } - // } - - if config.system_stats.enabled { - for stream_name in [ - "uplink_disk_stats", - "uplink_network_stats", - "uplink_processor_stats", - "uplink_process_stats", - "uplink_component_stats", - "uplink_system_stats", - ] { - config.stream_metrics.blacklist.push(stream_name.to_owned()); - let stream_config = StreamConfig { - topic: format!( - "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray" - ), - batch_size: config.system_stats.stream_size.unwrap_or(MAX_BATCH_SIZE), - ..Default::default() - }; - config.streams.insert(stream_name.to_owned(), stream_config); - } - } - - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(batch_size) = config.logging.as_ref().and_then(|c| c.stream_size) { - let stream_config = - config.streams.entry("logs".to_string()).or_insert_with(|| StreamConfig { - topic: format!( - "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" - ), - batch_size: 32, - ..Default::default() - }); - stream_config.batch_size = batch_size; - } - - let action_topic_template = "/tenants/{tenant_id}/devices/{device_id}/actions"; - let mut device_action_topic = action_topic_template.to_string(); - replace_topic_placeholders(&mut device_action_topic, tenant_id, device_id); - config.actions_subscription = device_action_topic; - - Ok(config) - } - - // Replace placeholders in topic strings with configured values for tenant_id and device_id - fn replace_topic_placeholders(topic: &mut String, tenant_id: &str, device_id: &str) { - *topic = topic.replace("{tenant_id}", tenant_id); - *topic = topic.replace("{device_id}", device_id); - } - - #[derive(Debug, thiserror::Error)] - pub enum ReadFileError { - #[error("Auth file not found at {0}")] - Auth(String), - #[error("Config file not found at {0}")] - Config(String), - } - - fn read_file_contents(path: &str) -> Option { - fs::read_to_string(path).ok() - } - - pub fn get_configs( - commandline: &CommandLine, - ) -> Result<(String, Option), ReadFileError> { - let auth = read_file_contents(&commandline.auth) - .ok_or_else(|| ReadFileError::Auth(commandline.auth.to_string()))?; - let config = match &commandline.config { - Some(path) => Some( - read_file_contents(path).ok_or_else(|| ReadFileError::Config(path.to_string()))?, - ), - None => None, - }; - - Ok((auth, config)) - } -} - -pub use base::actions::{Action, ActionResponse}; -use base::bridge::{Bridge, Package, Payload, Point, StreamMetrics}; -use base::mqtt::Mqtt; -use base::serializer::{Serializer, SerializerMetrics}; -pub use base::{ActionRoute, Config}; pub use collector::{simulator, tcpjson::TcpJson}; pub use storage::Storage; diff --git a/uplink/src/main.rs b/uplink/src/main.rs index e06aff13..572fea50 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -1,9 +1,11 @@ mod console; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use anyhow::Error; +use config::{Environment, File, FileFormat}; use log::info; use structopt::StructOpt; use tokio::time::sleep; @@ -15,97 +17,257 @@ use tracing_subscriber::{EnvFilter, Registry}; pub type ReloadHandle = Handle>, Registry>>; -use uplink::base::AppConfig; -use uplink::config::{get_configs, initialize, CommandLine}; -use uplink::{simulator, spawn_named_thread, Config, TcpJson, Uplink}; +use uplink::config::{AppConfig, Config, StreamConfig, MAX_BATCH_SIZE}; +use uplink::{simulator, spawn_named_thread, TcpJson, Uplink}; -fn initialize_logging(commandline: &CommandLine) -> ReloadHandle { - let level = match commandline.verbose { - 0 => "warn", - 1 => "info", - 2 => "debug", - _ => "trace", - }; +const DEFAULT_CONFIG: &str = r#" + [mqtt] + max_packet_size = 256000 + max_inflight = 100 + keep_alive = 30 + network_timeout = 30 - let levels = - match commandline.modules.clone().into_iter().reduce(|e, acc| format!("{e}={level},{acc}")) - { - Some(f) => format!("{f}={level}"), - _ => format!("uplink={level},storage={level}"), - }; + [stream_metrics] + enabled = false + bridge_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_stream_metrics/jsonarray" + serializer_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_serializer_stream_metrics/jsonarray" + blacklist = [] + timeout = 10 + + [serializer_metrics] + enabled = false + topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_serializer_metrics/jsonarray" + timeout = 10 - let builder = tracing_subscriber::fmt() - .pretty() - .with_line_number(false) - .with_file(false) - .with_thread_ids(false) - .with_thread_names(false) - .with_env_filter(levels) - .with_filter_reloading(); + [mqtt_metrics] + enabled = true + topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_mqtt_metrics/jsonarray" - let reload_handle = builder.reload_handle(); + [action_status] + topic = "/tenants/{tenant_id}/devices/{device_id}/action/status" + batch_size = 1 + flush_period = 2 + priority = 255 # highest priority for quick delivery of action status info to platform - builder.try_init().expect("initialized subscriber succesfully"); + [streams.device_shadow] + topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" + flush_period = 5 - reload_handle + [streams.logs] + topic = "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" + batch_size = 32 + + [system_stats] + enabled = true + process_names = ["uplink"] + update_period = 30 +"#; + +#[derive(StructOpt, Debug)] +#[structopt(name = "uplink", about = "collect, batch, compress, publish")] +pub struct CommandLine { + /// Binary version + #[structopt(skip = env ! ("VERGEN_BUILD_SEMVER"))] + pub version: String, + /// Build profile + #[structopt(skip = env ! ("VERGEN_CARGO_PROFILE"))] + pub profile: String, + /// Commit SHA + #[structopt(skip = env ! ("VERGEN_GIT_SHA"))] + pub commit_sha: String, + /// Commit SHA + #[structopt(skip = env ! ("VERGEN_GIT_COMMIT_TIMESTAMP"))] + pub commit_date: String, + /// config file + #[structopt(short = "c", help = "Config file")] + pub config: Option, + /// config file + #[structopt(short = "a", help = "Authentication file")] + pub auth: PathBuf, + /// log level (v: info, vv: debug, vvv: trace) + #[structopt(short = "v", long = "verbose", parse(from_occurrences))] + pub verbose: u8, + /// list of modules to log + #[structopt(short = "m", long = "modules")] + pub modules: Vec, } -fn banner(commandline: &CommandLine, config: &Arc) { - const B: &str = r#" - ░█░▒█░▄▀▀▄░█░░░▀░░█▀▀▄░█░▄ - ░█░▒█░█▄▄█░█░░░█▀░█░▒█░█▀▄ - ░░▀▀▀░█░░░░▀▀░▀▀▀░▀░░▀░▀░▀ - "#; - - println!("{B}"); - println!(" version: {}", commandline.version); - println!(" profile: {}", commandline.profile); - println!(" commit_sha: {}", commandline.commit_sha); - println!(" commit_date: {}", commandline.commit_date); - println!(" project_id: {}", config.project_id); - println!(" device_id: {}", config.device_id); - println!(" remote: {}:{}", config.broker, config.port); - println!(" persistence_path: {}", config.persistence_path.display()); - if !config.action_redirections.is_empty() { - println!(" action redirections:"); - for (action, redirection) in config.action_redirections.iter() { - println!(" {action} -> {redirection}"); +impl CommandLine { + /// Reads config file to generate config struct and replaces places holders + /// like bike id and data version + fn get_configs(&self) -> Result { + let read_file_contents = |path| std::fs::read_to_string(path).ok(); + let auth = read_file_contents(&self.auth).ok_or_else(|| { + Error::msg(format!("Auth file not found at \"{}\"", self.auth.display())) + })?; + let config = match &self.config { + Some(path) => Some(read_file_contents(path).ok_or_else(|| { + Error::msg(format!("Config file not found at \"{}\"", path.display())) + })?), + None => None, + }; + + let config = config::Config::builder() + .add_source(File::from_str(DEFAULT_CONFIG, FileFormat::Toml)) + .add_source(File::from_str(&config.unwrap_or_default(), FileFormat::Toml)) + .add_source(File::from_str(&auth, FileFormat::Json)) + .add_source(Environment::default()) + .build()?; + + let mut config: Config = config.try_deserialize()?; + + // Create directory at persistence_path if it doesn't already exist + std::fs::create_dir_all(&config.persistence_path).map_err(|_| { + Error::msg(format!( + "Permission denied for creating persistence directory at \"{}\"", + config.persistence_path.display() + )) + })?; + + // replace placeholders with device/tenant ID + let tenant_id = config.project_id.trim(); + let device_id = config.device_id.trim(); + + // Replace placeholders in topic strings with configured values for tenant_id and device_id + // e.g. for tenant_id: "demo"; device_id: "123" + // "/tenants/{tenant_id}/devices/{device_id}/events/stream/jsonarry" ~> "/tenants/demo/devices/123/events/stream/jsonarry" + let replace_topic_placeholders = |topic: &mut String| { + *topic = topic.replace("{tenant_id}", tenant_id).replace("{device_id}", device_id); + }; + + for config in config.streams.values_mut() { + replace_topic_placeholders(&mut config.topic); } - } - if !config.tcpapps.is_empty() { - println!(" tcp applications:"); - for (app, AppConfig { port, actions }) in config.tcpapps.iter() { - println!(" name: {app:?}"); - println!(" port: {port}"); - println!(" actions: {actions:?}"); - println!(" @"); + + replace_topic_placeholders(&mut config.action_status.topic); + replace_topic_placeholders(&mut config.stream_metrics.bridge_topic); + replace_topic_placeholders(&mut config.stream_metrics.serializer_topic); + replace_topic_placeholders(&mut config.serializer_metrics.topic); + replace_topic_placeholders(&mut config.mqtt_metrics.topic); + + if config.system_stats.enabled { + for stream_name in [ + "uplink_disk_stats", + "uplink_network_stats", + "uplink_processor_stats", + "uplink_process_stats", + "uplink_component_stats", + "uplink_system_stats", + ] { + config.stream_metrics.blacklist.push(stream_name.to_owned()); + let stream_config = StreamConfig { + topic: format!( + "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray" + ), + batch_size: config.system_stats.stream_size.unwrap_or(MAX_BATCH_SIZE), + ..Default::default() + }; + config.streams.insert(stream_name.to_owned(), stream_config); + } } + + #[cfg(any(target_os = "linux", target_os = "android"))] + if let Some(batch_size) = config.logging.as_ref().and_then(|c| c.stream_size) { + let stream_config = + config.streams.entry("logs".to_string()).or_insert_with(|| StreamConfig { + topic: format!( + "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" + ), + batch_size: 32, + ..Default::default() + }); + stream_config.batch_size = batch_size; + } + + config.actions_subscription = format!("/tenants/{tenant_id}/devices/{device_id}/actions"); + + Ok(config) } - println!(" secure_transport: {}", config.authentication.is_some()); - println!(" max_packet_size: {}", config.mqtt.max_packet_size); - println!(" max_inflight_messages: {}", config.mqtt.max_inflight); - println!(" keep_alive_timeout: {}", config.mqtt.keep_alive); - - if !config.downloader.actions.is_empty() { - println!( - " downloader:\n\tpath: \"{}\"\n\tactions: {:?}", - config.downloader.path.display(), - config.downloader.actions - ); - } - if !config.ota_installer.actions.is_empty() { - println!( - " installer:\n\tpath: {}\n\tactions: {:?}", - config.ota_installer.path, config.ota_installer.actions - ); - } - if config.system_stats.enabled { - println!(" processes: {:?}", config.system_stats.process_names); + + fn initialize_logging(&self) -> ReloadHandle { + let level = match self.verbose { + 0 => "warn", + 1 => "info", + 2 => "debug", + _ => "trace", + }; + + let levels = + match self.modules.clone().into_iter().reduce(|e, acc| format!("{e}={level},{acc}")) { + Some(f) => format!("{f}={level}"), + _ => format!("uplink={level},storage={level}"), + }; + + let builder = tracing_subscriber::fmt() + .pretty() + .with_line_number(false) + .with_file(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_env_filter(levels) + .with_filter_reloading(); + + let reload_handle = builder.reload_handle(); + + builder.try_init().expect("initialized subscriber succesfully"); + + reload_handle } - if config.console.enabled { - println!(" console: http://localhost:{}", config.console.port); + + fn banner(&self, config: &Config) { + const B: &str = r#" + ░█░▒█░▄▀▀▄░█░░░▀░░█▀▀▄░█░▄ + ░█░▒█░█▄▄█░█░░░█▀░█░▒█░█▀▄ + ░░▀▀▀░█░░░░▀▀░▀▀▀░▀░░▀░▀░▀ + "#; + + println!("{B}"); + println!(" version: {}", self.version); + println!(" profile: {}", self.profile); + println!(" commit_sha: {}", self.commit_sha); + println!(" commit_date: {}", self.commit_date); + println!(" project_id: {}", config.project_id); + println!(" device_id: {}", config.device_id); + println!(" remote: {}:{}", config.broker, config.port); + println!(" persistence_path: {}", config.persistence_path.display()); + if !config.action_redirections.is_empty() { + println!(" action redirections:"); + for (action, redirection) in config.action_redirections.iter() { + println!("\t{action} -> {redirection}"); + } + } + if !config.tcpapps.is_empty() { + println!(" tcp applications:"); + for (app, AppConfig { port, actions }) in config.tcpapps.iter() { + println!("\tname: {app:?}\n\tport: {port}\n\tactions: {actions:?}\n\t@"); + } + } + println!(" secure_transport: {}", config.authentication.is_some()); + println!(" max_packet_size: {}", config.mqtt.max_packet_size); + println!(" max_inflight_messages: {}", config.mqtt.max_inflight); + println!(" keep_alive_timeout: {}", config.mqtt.keep_alive); + + if !config.downloader.actions.is_empty() { + println!( + " downloader:\n\tpath: \"{}\"\n\tactions: {:?}", + config.downloader.path.display(), + config.downloader.actions + ); + } + if !config.ota_installer.actions.is_empty() { + println!( + " installer:\n\tpath: {}\n\tactions: {:?}", + config.ota_installer.path, config.ota_installer.actions + ); + } + if config.system_stats.enabled { + println!(" processes: {:?}", config.system_stats.process_names); + } + if config.console.enabled { + println!(" console: http://localhost:{}", config.console.port); + } + println!("\n"); } - println!("\n"); } fn main() -> Result<(), Error> { @@ -115,13 +277,11 @@ fn main() -> Result<(), Error> { } let commandline: CommandLine = StructOpt::from_args(); - let reload_handle = initialize_logging(&commandline); - - let (auth, config) = get_configs(&commandline)?; - let config = Arc::new(initialize(&auth, &config.unwrap_or_default())?); - - banner(&commandline, &config); + let reload_handle = commandline.initialize_logging(); + let config = commandline.get_configs()?; + commandline.banner(&config); + let config = Arc::new(config); let mut uplink = Uplink::new(config.clone())?; let mut bridge = uplink.configure_bridge(); uplink.spawn_builtins(&mut bridge)?;