diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock
index 81f6587d..6cfb3c9a 100755
--- a/scylla-server/Cargo.lock
+++ b/scylla-server/Cargo.lock
@@ -1972,6 +1972,7 @@ dependencies = [
  "protobuf-codegen",
  "rand",
  "rangemap",
+ "regex",
  "ringbuffer",
  "rumqttc",
  "rustc-hash 2.1.0",
diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml
index 7881327c..9c4773c3 100644
--- a/scylla-server/Cargo.toml
+++ b/scylla-server/Cargo.toml
@@ -28,6 +28,7 @@ axum-extra = { version = "0.10.0", features = ["query"] }
 chrono = { version = "0.4.38", features = ["serde"] }
 serde_json = "1.0.128"
 diesel_migrations = { version = "2.2.0", features = ["postgres"] }
+regex = "1.11.1"
 rangemap = "1.5.1"
 axum-macros = "0.5.0"
 diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] }
@@ -35,6 +36,7 @@ rustc-hash = "2.1.0"
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = "0.6"
+
 [features]
 top = ["dep:console-subscriber"]
diff --git a/scylla-server/build.rs b/scylla-server/build.rs
index 3c0bfc5c..128d1e6e 100644
--- a/scylla-server/build.rs
+++ b/scylla-server/build.rs
@@ -11,6 +11,6 @@ fn main() {
         .input("src/proto/command_data.proto")
         .input("src/proto/playback_data.proto")
         // Specify output directory relative to Cargo output directory.
-        .out_dir("src")
+        .out_dir("src/proto")
         .run_from_script();
 }
diff --git a/scylla-server/src/controllers/car_command_controller.rs b/scylla-server/src/controllers/car_command_controller.rs
index 7c6cece7..dfb83039 100644
--- a/scylla-server/src/controllers/car_command_controller.rs
+++ b/scylla-server/src/controllers/car_command_controller.rs
@@ -7,7 +7,7 @@ use rumqttc::v5::AsyncClient;
 use serde::Deserialize;
 use tracing::{info, warn};
 
-use crate::{command_data::CommandData, error::ScyllaError};
+use crate::{error::ScyllaError, proto::command_data};
 
 /// the prefix for the calypso topic; the topic of a command is this prefix with the key appended
 pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";
@@ -34,7 +34,7 @@ pub async fn send_config_command(
     );
 
     // the protobuf calypso converts into CAN
-    let mut payload = CommandData::new();
+    let mut payload = command_data::CommandData::new();
     // empty "data" in the protobuf tells calypso to use the default value
     if let Some(data) = data_query.data {
         payload.data = data;
diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs
index 36846e29..227dd2a8 100644
--- a/scylla-server/src/controllers/file_insertion_controller.rs
+++ b/scylla-server/src/controllers/file_insertion_controller.rs
@@ -9,7 +9,9 @@ use rangemap::RangeInclusiveMap;
 use tokio::sync::mpsc;
 use tracing::{debug, info, trace, warn};
 
-use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};
+use crate::{
+    error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle,
+};
 
 /// Inserts a file using http multipart
 /// The file is parsed and client data values are extracted; the run ID of each value is inferred, and the data is then batch uploaded
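The run-ID inference mentioned in the doc comment above is built on `rangemap::RangeInclusiveMap`, imported in this hunk. A minimal sketch of that lookup pattern — the timestamp bounds and run IDs here are hypothetical, not taken from the controller:

```rust
use rangemap::RangeInclusiveMap;

fn main() {
    // map inclusive timestamp ranges (millis, hypothetical) to run IDs
    let mut runs: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
    runs.insert(0..=999, 1);
    runs.insert(1000..=1999, 2);

    // a datapoint's run is inferred by looking up its timestamp
    assert_eq!(runs.get(&1500), Some(&2));
    // timestamps outside any known run resolve to None
    assert_eq!(runs.get(&2500), None);
}
```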
diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs
index dd98a245..eb518ed8 100644
--- a/scylla-server/src/db_handler.rs
+++ b/scylla-server/src/db_handler.rs
@@ -1,7 +1,7 @@
 use rustc_hash::FxHashSet;
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::{broadcast, mpsc};
-use tokio::{sync::mpsc::Sender, time::Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, instrument, trace, warn, Level};
@@ -15,7 +15,7 @@ pub struct DbHandler {
     /// The list of data types seen by this instance, used for when to upsert
     datatype_list: FxHashSet<String>,
     /// The broadcast channel which provides serial datapoints for processing
-    receiver: Receiver<ClientData>,
+    receiver: broadcast::Receiver<ClientData>,
     /// The database pool handle
     pool: PoolHandle,
     /// the queue of data
@@ -57,7 +57,7 @@ impl DbHandler {
     /// Make a new db handler
     /// * `recv` - the broadcast receiver over which client data will be sent
     pub fn new(
-        receiver: Receiver<ClientData>,
+        receiver: broadcast::Receiver<ClientData>,
         pool: PoolHandle,
         upload_interval: u64,
     ) -> DbHandler {
@@ -74,7 +74,7 @@ impl DbHandler {
     /// It uses the queue from data queue to insert to the database specified
     /// On cancellation, will await one final queue message to clean up anything remaining in the channel
     pub async fn batching_loop(
-        mut batch_queue: Receiver<Vec<ClientData>>,
+        mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
         pool: PoolHandle,
         cancel_token: CancellationToken,
     ) {
@@ -133,7 +133,7 @@ impl DbHandler {
     /// A batching loop that consumes messages but does not upload anything
     pub async fn fake_batching_loop(
-        mut batch_queue: Receiver<Vec<ClientData>>,
+        mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
         cancel_token: CancellationToken,
     ) {
         loop {
@@ -168,7 +168,7 @@ impl DbHandler {
     /// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation
     pub async fn handling_loop(
         mut self,
-        data_channel: Sender<Vec<ClientData>>,
+        data_channel: mpsc::Sender<Vec<ClientData>>,
         cancel_token: CancellationToken,
     ) {
         let mut batch_interval = tokio::time::interval(Duration::from_millis(self.upload_interval));
@@ -181,7 +181,7 @@ impl DbHandler {
                     data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown");
                     break;
                 },
-                Some(msg) = self.receiver.recv() => {
+                Ok(msg) = self.receiver.recv() => {
                     self.handle_msg(msg, &data_channel).await;
                 }
                 _ = batch_interval.tick() => {
@@ -201,12 +201,8 @@ impl DbHandler {
     }
 
     #[instrument(skip(self), level = Level::TRACE)]
-    async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
-        trace!(
-            "Mqtt dispatcher: {} of {}",
-            self.receiver.len(),
-            self.receiver.max_capacity()
-        );
+    async fn handle_msg(&mut self, msg: ClientData, data_channel: &mpsc::Sender<Vec<ClientData>>) {
+        trace!("Mqtt dispatcher: {}", self.receiver.len());
 
         if !self.datatype_list.contains(&msg.name) {
             let Ok(mut database) = self.pool.get().await else {
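The `Some(msg)` → `Ok(msg)` change matters because `broadcast::Receiver::recv` returns a `Result`: a receiver that falls behind by more than the channel capacity gets `RecvError::Lagged` instead of the dropped messages, and the `Ok(..)` pattern in the `select!` above silently skips that case. A minimal sketch of the semantics this handler now relies on:

```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // capacity 2: a receiver more than 2 messages behind starts lagging
    let (tx, mut rx) = broadcast::channel::<u32>(2);
    for i in 0..4 {
        tx.send(i).unwrap();
    }

    // the two oldest messages were overwritten; recv reports how many were lost
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(2))));
    // after the lag is reported, reception resumes at the oldest retained message
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
}
```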
diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs
index 47cff620..431c9811 100644
--- a/scylla-server/src/lib.rs
+++ b/scylla-server/src/lib.rs
@@ -7,14 +7,15 @@ pub mod services;
 pub mod db_handler;
 pub mod mqtt_processor;
 
+pub mod metadata_structs;
+pub mod socket_handler;
+
 #[allow(non_snake_case)]
 pub mod models;
 #[allow(non_snake_case)]
 pub mod schema;
 
-pub mod command_data;
-pub mod playback_data;
-pub mod serverdata;
+pub mod proto;
 
 pub mod transformers;
diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs
index bac8d700..e72cee23 100755
--- a/scylla-server/src/main.rs
+++ b/scylla-server/src/main.rs
@@ -25,6 +25,7 @@ use scylla_server::{
         data_type_controller, file_insertion_controller, run_controller,
     },
     services::run_service::{self},
+    socket_handler::{socket_handler, socket_handler_with_metadata},
     RateLimitMode,
 };
 use scylla_server::{
@@ -33,7 +34,10 @@ use scylla_server::{
     ClientData, RUN_ID,
 };
 use socketioxide::{extract::SocketRef, SocketIo};
-use tokio::{signal, sync::mpsc};
+use tokio::{
+    signal,
+    sync::{broadcast, mpsc},
+};
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
 use tower::ServiceBuilder;
 use tower_http::{
@@ -106,6 +110,10 @@ struct ScyllaArgs {
         default_value = "0"
     )]
     socketio_discard_percent: u8,
+
+    /// Whether to disable sending of metadata over the socket to the client
+    #[arg(long, env = "SCYLLA_SOCKET_DISABLE_METADATA")]
+    no_metadata: bool,
 }
 
 const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
@@ -180,9 +188,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     // channel to pass the mqtt data
     // TODO tune buffer size
-    let (mqtt_send, mqtt_receive) = mpsc::channel::<ClientData>(10000);
+    let (mqtt_send, mqtt_receive) = broadcast::channel::<ClientData>(10000);
 
-    // channel to pass the processed data to the db thread
+    // channel to pass the processed data to the batch uploading thread
     // TODO tune buffer size
     let (db_send, db_receive) = mpsc::channel::<Vec<ClientData>>(1000);
 
@@ -190,10 +198,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // create a task tracker and cancellation token
     let task_tracker = TaskTracker::new();
     let token = CancellationToken::new();
+
+    if cli.no_metadata {
+        task_tracker.spawn(socket_handler(
+            token.clone(),
+            mqtt_receive,
+            cli.socketio_discard_percent,
+            io,
+        ));
+    } else {
+        task_tracker.spawn(socket_handler_with_metadata(
+            token.clone(),
+            mqtt_receive,
+            cli.socketio_discard_percent,
+            io,
+        ));
+    }
+
     // spawn the database handler
     task_tracker.spawn(
-        db_handler::DbHandler::new(mqtt_receive, pool.clone(), cli.batch_upsert_time * 1000)
-            .handling_loop(db_send.clone(), token.clone()),
+        db_handler::DbHandler::new(
+            mqtt_send.subscribe(),
+            pool.clone(),
+            cli.batch_upsert_time * 1000,
+        )
+        .handling_loop(db_send.clone(), token.clone()),
     );
     // spawn the database inserter, if we have it enabled
     if !cli.disable_data_upload {
@@ -222,14 +251,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     info!("Running processor in MQTT (production) mode");
     let (recv, opts) = MqttProcessor::new(
         mqtt_send,
-        io,
         token.clone(),
         MqttProcessorOptions {
             mqtt_path: cli.siren_host_url,
             initial_run: curr_run.runId,
             static_rate_limit_time: cli.static_rate_limit_value,
             rate_limit_mode: cli.rate_limit_mode,
-            upload_ratio: cli.socketio_discard_percent,
         },
     );
     let (client, eventloop) = AsyncClient::new(opts, 600);
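One ordering detail the new wiring relies on: a broadcast subscriber only sees messages sent after its subscription exists, which is why `mqtt_send.subscribe()` for the DB handler happens here, before the MQTT processor starts producing. A small sketch of that behavior (payload type is illustrative):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut first) = broadcast::channel::<u32>(10_000);

    // subscribe BEFORE any data flows, as main() does for the DB handler
    let mut second = tx.subscribe();

    tx.send(1).unwrap();
    assert_eq!(first.recv().await.unwrap(), 1);
    assert_eq!(second.recv().await.unwrap(), 1);

    // a receiver subscribed after a send never sees the earlier message
    let mut late = tx.subscribe();
    tx.send(2).unwrap();
    assert_eq!(late.recv().await.unwrap(), 2);
}
```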
diff --git a/scylla-server/src/metadata_structs.rs b/scylla-server/src/metadata_structs.rs
new file mode 100644
index 00000000..05c69e12
--- /dev/null
+++ b/scylla-server/src/metadata_structs.rs
@@ -0,0 +1,65 @@
+use ::serde::Serialize;
+use chrono::{serde::ts_milliseconds, DateTime, TimeDelta, Utc};
+
+pub const DATA_SOCKET_KEY: &str = "data";
+
+#[derive(Serialize)]
+pub struct TimerData {
+    /// the topic being timed
+    pub topic: &'static str,
+    /// the last time the value changed
+    #[serde(with = "ts_milliseconds")]
+    pub last_change: DateTime<Utc>,
+    /// the value at the above time
+    pub last_value: f32,
+}
+pub const TIMER_SOCKET_KEY: &str = "timers";
+pub const TIMERS_TOPICS: &[&str] = &[
+    "BMS/Status/Balancing",
+    "BMS/Status/State",
+    "BMS/Charging/Control",
+];
+
+#[derive(Serialize, PartialEq, Clone, Debug)]
+pub enum Node {
+    Bms,
+    Dti,
+    Mpu,
+    Charger,
+}
+
+#[derive(Serialize, Clone)]
+pub struct FaultData {
+    /// the node the fault came from
+    pub node: Node,
+    /// the word describing the fault
+    pub name: String,
+    /// when the fault occurred
+    #[serde(with = "ts_milliseconds")]
+    pub occured_at: DateTime<Utc>,
+    /// when the fault was last seen
+    #[serde(with = "ts_milliseconds")]
+    pub last_seen: DateTime<Utc>,
+    /// whether another fault of the same node and name has occurred after this fault
+    pub expired: bool,
+}
+pub const FAULT_SOCKET_KEY: &str = "faults";
+pub const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8);
+
+pub const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"];
+pub const fn map_dti_flt(index: usize) -> Option<&'static str> {
+    match index {
+        0 => None,
+        1 => Some("Overvoltage"),
+        2 => None,
+        3 => Some("DRV"),
+        4 => Some("ABS_Overcurrent"),
+        5 => Some("CTLR_Overtemp"),
+        6 => Some("Motor_Overtemp"),
+        7 => Some("Sensor_wire"),
+        8 => Some("Sensor_general"),
+        9 => Some("CAN_command"),
+        0x0A => Some("Analog_input"),
+        _ => None,
+    }
+}
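For illustration, here is how the DTI fault decoding above plays out. This sketch assumes `map_dti_flt` from the hunk above is in scope, and the `as usize` cast mirrors the call site in `socket_handler.rs` further down:

```rust
// assumes map_dti_flt from metadata_structs.rs is in scope
fn main() {
    // the DTI fault code arrives as the first f32 value of the datapoint
    let raw: f32 = 3.0;
    match map_dti_flt(raw as usize) {
        // codes 1 and 3..=0x0A name a fault; this arm prints "DTI fault: DRV"
        Some(name) => println!("DTI fault: {name}"),
        // codes 0 and 2 (and anything above 0x0A) are not reportable faults
        None => println!("no fault to report"),
    }
}
```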
diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs
index d68fdf3e..9aea662d 100644
--- a/scylla-server/src/mqtt_processor.rs
+++ b/scylla-server/src/mqtt_processor.rs
@@ -11,13 +11,12 @@ use rumqttc::v5::{
     AsyncClient, Event, EventLoop, MqttOptions,
 };
 use rustc_hash::FxHashMap;
-use socketioxide::SocketIo;
-use tokio::{sync::mpsc::Sender, time::Instant};
+use tokio::{sync::broadcast, time::Instant};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, instrument, trace, warn, Level};
 
 use crate::{
-    controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode,
+    controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, proto::serverdata, RateLimitMode,
 };
 
 use super::ClientData;
@@ -26,16 +25,12 @@
 /// - mqtt state
 /// - reception via mqtt and subsequent parsing
 /// - labeling of data with runs
-/// - sending data over socket
-/// - sending data over the channel to a db handler
+/// - sending data over the channel to a db handler and socket
 ///
 /// It also is the main form of rate limiting
 pub struct MqttProcessor {
-    channel: Sender<ClientData>,
-    io: SocketIo,
+    channel: broadcast::Sender<ClientData>,
     cancel_token: CancellationToken,
-    /// Upload ratio, below is not socket sent above is socket sent
-    upload_ratio: u8,
     /// static rate limiter
     rate_limiter: FxHashMap<String, Instant>,
     /// time to rate limit
@@ -54,20 +49,16 @@ pub struct MqttProcessorOptions {
     pub static_rate_limit_time: u64,
     /// the rate limit mode
     pub rate_limit_mode: RateLimitMode,
-    /// the upload ratio for the socketio
-    pub upload_ratio: u8,
 }
 
 impl MqttProcessor {
     /// Creates a new mqtt receiver and socketio and db sender
     /// * `channel` - The mpsc channel to send the database data to
-    /// * `io` - The socketio layer to send the data to
     /// * `cancel_token` - The token which indicates cancellation of the task
     /// * `opts` - The mqtt processor options to use
     /// Returns the instance and options to create a client, which is then used in the process_mqtt loop
     pub fn new(
-        channel: Sender<ClientData>,
-        io: SocketIo,
+        channel: broadcast::Sender<ClientData>,
         cancel_token: CancellationToken,
         opts: MqttProcessorOptions,
     ) -> (MqttProcessor, MqttOptions) {
@@ -98,9 +89,7 @@ impl MqttProcessor {
         (
             MqttProcessor {
                 channel,
-                io,
                 cancel_token,
-                upload_ratio: opts.upload_ratio,
                 rate_limiter: FxHashMap::default(),
                 rate_limit_time: Duration::from_millis(opts.static_rate_limit_time),
                 rate_limit_mode: opts.rate_limit_mode,
@@ -113,13 +102,9 @@ impl MqttProcessor {
     /// * `eventloop` - The eventloop returned by ::new to connect to. The loop isn't Sync so this is the best that can be done
     /// * `client` - The async mqtt v5 client to use for subscriptions
     pub async fn process_mqtt(mut self, client: Arc<AsyncClient>, mut eventloop: EventLoop) {
-        let mut view_interval = tokio::time::interval(Duration::from_secs(3));
-
-        let mut latency_interval = tokio::time::interval(Duration::from_millis(250));
+        // let mut latency_interval = tokio::time::interval(Duration::from_millis(250));
         let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::<chrono::Duration>::new(20);
 
-        let mut upload_counter: u8 = 0;
-
         debug!("Subscribing to siren");
         client
             .subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
@@ -142,49 +127,30 @@ impl MqttProcessor {
                         None => continue
                     };
                     latency_ringbuffer.push(chrono::offset::Utc::now() - msg.timestamp);
-                    self.send_socket_msg(&msg, &mut upload_counter);
-                    self.send_db_msg(msg).await;
+                    self.send_db_msg(msg.clone()).await;
                 },
                 Err(msg) => trace!("Received mqtt error: {:?}", msg),
                 _ => trace!("Received misc mqtt: {:?}", msg),
             },
-            _ = view_interval.tick() => {
-                trace!("Updating viewership data!");
-                let sockets = self.io.sockets();
-                let sockets_cnt = match sockets {
-                    Ok(s) => s.len() as f32,
-                    Err(_) => -1f32,
-                };
-                let client_data = ClientData {
-                    name: "Viewers".to_string(),
-                    node: "Internal".to_string(),
-                    unit: "".to_string(),
-                    run_id: crate::RUN_ID.load(Ordering::Relaxed),
-                    timestamp: chrono::offset::Utc::now(),
-                    values: vec![sockets_cnt]
-                };
-                self.send_socket_msg(&client_data, &mut upload_counter);
-            }
-            _ = latency_interval.tick() => {
-                // set latency to 0 if no messages are in buffer
-                let avg_latency = if latency_ringbuffer.is_empty() {
-                    0
-                } else {
-                    latency_ringbuffer.iter().sum::<chrono::Duration>().num_milliseconds() / latency_ringbuffer.len() as i64
-                };
+            // _ = latency_interval.tick() => {
+            //     // set latency to 0 if no messages are in buffer
+            //     let avg_latency = if latency_ringbuffer.is_empty() {
+            //         0
+            //     } else {
+            //         latency_ringbuffer.iter().sum::<chrono::Duration>().num_milliseconds() / latency_ringbuffer.len() as i64
+            //     };
 
-                let client_data = ClientData {
-                    name: "Latency".to_string(),
-                    node: "Internal".to_string(),
-                    unit: "ms".to_string(),
-                    run_id: crate::RUN_ID.load(Ordering::Relaxed),
-                    timestamp: chrono::offset::Utc::now(),
-                    values: vec![avg_latency as f32]
-                };
-                trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32));
-                self.send_socket_msg(&client_data, &mut upload_counter);
+            //     let client_data = ClientData {
+            //         name: "Latency".to_string(),
+            //         node: "Internal".to_string(),
+            //         unit: "ms".to_string(),
+            //         run_id: crate::RUN_ID.load(Ordering::Relaxed),
+            //         timestamp: chrono::offset::Utc::now(),
+            //         values: vec![avg_latency as f32]
+            //     };
+            //     trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32));
+            //     self.send_socket_msg(client_data, &mut upload_counter);
+            // }
-            }
         }
     }
 }
@@ -308,35 +274,8 @@ impl MqttProcessor {
     /// Send a message to the channel, printing and IGNORING any error that may occur
     /// * `client_data` - The client data to send over the broadcast
     async fn send_db_msg(&self, client_data: ClientData) {
-        if let Err(err) = self.channel.send(client_data).await {
+        if let Err(err) = self.channel.send(client_data) {
             warn!("Error sending through channel: {:?}", err);
         }
     }
-
-    /// Sends a message to the socket, printing and IGNORING any error that may occur
-    /// * `client_data` - The client data to send over the broadcast
-    fn send_socket_msg(&self, client_data: &ClientData, upload_counter: &mut u8) {
-        *upload_counter = upload_counter.wrapping_add(1);
-        if *upload_counter >= self.upload_ratio {
-            match self.io.emit(
-                "message",
-                &serde_json::to_string(&client_data).expect("Could not serialize ClientData"),
-            ) {
-                Ok(_) => (),
-                Err(err) => match err {
-                    socketioxide::BroadcastError::Socket(e) => {
-                        trace!("Socket: Transmit error: {:?}", e);
-                    }
-                    socketioxide::BroadcastError::Serialize(_) => {
-                        warn!("Socket: Serialize error: {}", err)
-                    }
-                    socketioxide::BroadcastError::Adapter(_) => {
-                        warn!("Socket: Adapter error: {}", err)
-                    }
-                },
-            }
-        } else {
-            trace!("Discarding message with topic {}", client_data.name);
-        }
-    }
 }
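A behavioral note on the `send_db_msg` change: `broadcast::Sender::send` is synchronous (hence the dropped `.await`) and fails only when no receivers are alive, which is the case the `warn!` now logs and ignores. A minimal sketch of those semantics:

```rust
use tokio::sync::broadcast;

fn main() {
    let (tx, rx) = broadcast::channel::<u32>(8);

    // with a live receiver, send succeeds and returns the receiver count
    assert_eq!(tx.send(7).unwrap(), 1);

    // once every receiver is dropped, send returns SendError(value),
    // which is what send_db_msg logs and ignores
    drop(rx);
    assert!(tx.send(8).is_err());
}
```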
diff --git a/scylla-server/src/proto/mod.rs b/scylla-server/src/proto/mod.rs
new file mode 100644
index 00000000..dbc1df8b
--- /dev/null
+++ b/scylla-server/src/proto/mod.rs
@@ -0,0 +1,5 @@
+// @generated
+
+pub mod command_data;
+pub mod playback_data;
+pub mod serverdata;
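Both handlers in the new `socket_handler.rs` below share the same shutdown skeleton: a `tokio::select!` loop racing a `CancellationToken` against the broadcast receiver. A minimal, self-contained sketch of that pattern — the `String` payload stands in for `ClientData`:

```rust
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

// the skeleton both handlers use: race shutdown against the data stream
async fn handler(cancel_token: CancellationToken, mut rx: broadcast::Receiver<String>) {
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => break,
            // an Err from recv (closed or lagged) fails the `Ok` pattern and
            // disables this branch for that poll of select!, instead of panicking
            Ok(msg) = rx.recv() => {
                println!("got: {msg}");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<String>(16);
    let token = CancellationToken::new();
    let task = tokio::spawn(handler(token.clone(), rx));

    tx.send("one datapoint".to_string()).unwrap();
    token.cancel();
    task.await.unwrap();
}
```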
diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs
new file mode 100644
index 00000000..bfa89b9b
--- /dev/null
+++ b/scylla-server/src/socket_handler.rs
@@ -0,0 +1,237 @@
+use chrono::{DateTime, Utc};
+use regex::Regex;
+use ringbuffer::{AllocRingBuffer, RingBuffer};
+use serde::Serialize;
+use socketioxide::SocketIo;
+use std::{collections::HashMap, sync::atomic::Ordering, time::Duration};
+use tokio::sync::broadcast;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, trace, warn};
+
+use crate::metadata_structs::{
+    map_dti_flt, FaultData, Node, TimerData, DATA_SOCKET_KEY, FAULT_BINS, FAULT_MIN_REG_GAP,
+    FAULT_SOCKET_KEY, TIMERS_TOPICS, TIMER_SOCKET_KEY,
+};
+use crate::ClientData;
+
+pub async fn socket_handler(
+    cancel_token: CancellationToken,
+    mut data_channel: broadcast::Receiver<ClientData>,
+    upload_ratio: u8,
+    io: SocketIo,
+) {
+    let mut upload_counter = 0u8;
+    loop {
+        tokio::select! {
+            _ = cancel_token.cancelled() => {
+                debug!("Shutting down socket handler!");
+                break;
+            },
+            Ok(data) = data_channel.recv() => {
+                send_socket_msg(&data, &mut upload_counter, upload_ratio, &io, DATA_SOCKET_KEY);
+            }
+        }
+    }
+}
+
+pub async fn socket_handler_with_metadata(
+    cancel_token: CancellationToken,
+    mut data_channel: broadcast::Receiver<ClientData>,
+    upload_ratio: u8,
+    io: SocketIo,
+) {
+    let mut upload_counter = 0u8;
+
+    // INTERVAL TIMERS for periodic things to be sent
+    let mut view_interval = tokio::time::interval(Duration::from_secs(3));
+    let mut timers_interval = tokio::time::interval(Duration::from_secs(1));
+    let mut recent_faults_interval = tokio::time::interval(Duration::from_secs(1));
+
+    // init timers
+    let mut timer_map: HashMap<String, TimerData> = HashMap::new();
+    for item in TIMERS_TOPICS {
+        timer_map.insert(
+            item.to_string(),
+            TimerData {
+                topic: item,
+                last_change: DateTime::UNIX_EPOCH,
+                last_value: 0.0f32,
+            },
+        );
+    }
+
+    // init faults
+    let fault_regex_bms: Regex =
+        Regex::new(r"BMS\/Status\/F\/(.*)").expect("Could not compile regex!");
+    let fault_regex_charger: Regex =
+        Regex::new(r"Charger\/Box\/F_(.*)").expect("Could not compile regex!");
+    let fault_regex_mpu: Regex =
+        Regex::new(r"MPU\/Fault\/F_(.*)").expect("Could not compile regex!");
+    let mut fault_ringbuffer = AllocRingBuffer::<FaultData>::new(25);
+
+    loop {
+        tokio::select! {
+            _ = cancel_token.cancelled() => {
+                debug!("Shutting down socket handler!");
+                break;
+            },
+            Ok(data) = data_channel.recv() => {
+                send_socket_msg(
+                    &data,
+                    &mut upload_counter,
+                    upload_ratio,
+                    &io,
+                    DATA_SOCKET_KEY,
+                );
+                handle_socket_msg(data, &fault_regex_mpu, &fault_regex_bms, &fault_regex_charger, &mut timer_map, &mut fault_ringbuffer);
+            }
+            _ = recent_faults_interval.tick() => {
+                send_socket_msg(
+                    &fault_ringbuffer.to_vec(),
+                    &mut upload_counter,
+                    upload_ratio,
+                    &io,
+                    FAULT_SOCKET_KEY,
+                )
+            },
+            _ = timers_interval.tick() => {
+                trace!("Sending Timers Intervals!");
+                for item in timer_map.values() {
+                    send_socket_msg(item, &mut upload_counter, upload_ratio, &io, TIMER_SOCKET_KEY);
+                }
+            },
+            _ = view_interval.tick() => {
+                trace!("Updating viewership data!");
+                let sockets = io.sockets();
+                let sockets_cnt = match sockets {
+                    Ok(s) => s.len() as f32,
+                    Err(_) => -1f32,
+                };
+                let item = ClientData {
+                    name: "Argos/Viewers".to_string(),
+                    node: "Internal".to_string(),
+                    unit: "".to_string(),
+                    run_id: crate::RUN_ID.load(Ordering::Relaxed),
+                    timestamp: chrono::offset::Utc::now(),
+                    values: vec![sockets_cnt]
+                };
+                send_socket_msg(
+                    &item,
+                    &mut upload_counter,
+                    upload_ratio,
+                    &io,
+                    DATA_SOCKET_KEY,
+                );
+            }
+        }
+    }
+}
+
+/// Handles parsing and creating metadata for a newly received socket message.
+fn handle_socket_msg(
+    data: ClientData,
+    fault_regex_mpu: &Regex,
+    fault_regex_bms: &Regex,
+    fault_regex_charger: &Regex,
+    timer_map: &mut HashMap<String, TimerData>,
+    fault_ringbuffer: &mut AllocRingBuffer<FaultData>,
+) {
+    // check to see if we fit a timer case, and then act upon it
+    // IMPORTANT: assumes a timer is never also a fault
+    if let Some(time) = timer_map.get_mut(&data.name) {
+        trace!("Triggering timer: {}", data.name);
+        let new_val = *data.values.first().unwrap_or(&-1f32);
+        if time.last_value != new_val {
+            time.last_value = new_val;
+            time.last_change = Utc::now();
+        }
+        return;
+    }
+
+    // check to see if this is a fault, and extract the fault name and node
+    // each branch is the logic to get a node; note the difference for DTI
+    let (flt_txt, node) = if let Some(mtch) = fault_regex_bms.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Bms)
+    } else if let Some(mtch) = fault_regex_charger.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Charger)
+    } else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Mpu)
+    } else if FAULT_BINS[0] == data.name {
+        let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else {
+            return;
+        };
+        (flt, Node::Dti)
+    } else {
+        return;
+    };
+
+    trace!("Matched on {}, {:?}", flt_txt, node);
+
+    // default to sending a new fault
+    let mut should_push = true;
+    // iterate through current faults
+    for item in fault_ringbuffer.iter_mut() {
+        // if a fault of the same type is in the queue, and not expired
+        if item.name == flt_txt && node.clone() == item.node && !item.expired {
+            // if the time since this fault was last seen exceeds [FAULT_MIN_REG_GAP], mark it as expired
+            if (data.timestamp - item.last_seen) > FAULT_MIN_REG_GAP {
+                item.expired = true;
+            } else {
+                // otherwise refresh the last seen metric and ensure we don't create a duplicate fault
+                item.last_seen = data.timestamp;
+                should_push = false;
+            }
+        }
+    }
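+    // (note: the ring buffer keeps only the 25 most recent faults, so pushing
+    // onto a full buffer evicts the oldest entry and stale faults age out)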
+    // send a new fault if no matching, unexpired fault was found
+    if should_push {
+        fault_ringbuffer.push(FaultData {
+            node,
+            name: flt_txt.to_string(),
+            occured_at: data.timestamp,
+            last_seen: data.timestamp,
+            expired: false,
+        });
+    }
+}
+
+/// Sends a message to the socket, printing and IGNORING any error that may occur
+/// * `client_data` - The client data to send over the broadcast
+/// * `upload_counter` - The counter of data that has been uploaded, for basic rate limiting
+/// * `upload_ratio` - The rate limit ratio
+/// * `io` - The socket to upload to
+/// * `socket_key` - The socket key to send to
+fn send_socket_msg<T>(
+    client_data: &T,
+    upload_counter: &mut u8,
+    upload_ratio: u8,
+    io: &SocketIo,
+    socket_key: &'static str,
+) where
+    T: Serialize,
+{
+    *upload_counter = upload_counter.wrapping_add(1);
+    if *upload_counter >= upload_ratio {
+        match io.emit(
+            socket_key,
+            &serde_json::to_string(client_data).expect("Could not serialize ClientData"),
+        ) {
+            Ok(_) => (),
+            Err(err) => match err {
+                socketioxide::BroadcastError::Socket(e) => {
+                    trace!("Socket: Transmit error: {:?}", e);
+                }
+                socketioxide::BroadcastError::Serialize(_) => {
+                    warn!("Socket: Serialize error: {}", err)
+                }
+                socketioxide::BroadcastError::Adapter(_) => {
+                    warn!("Socket: Adapter error: {}", err)
+                }
+            },
+        }
+    } else {
+        trace!("Discarding message!");
+    }
+}
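Finally, the `upload_counter`/`upload_ratio` check above is what implements the `socketio_discard_percent` knob: because the counter is a wrapping `u8` that cycles through each value once per 256 calls, a ratio of N discards exactly N out of every 256 messages. A sketch that mirrors the counter logic and can be run in isolation to sanity-check that:

```rust
// counts how many of `total` consecutive messages the wrapping-u8 counter
// check in send_socket_msg would emit, for a given upload_ratio
fn count_sent(upload_ratio: u8, total: u32) -> u32 {
    let mut upload_counter: u8 = 0;
    let mut sent = 0;
    for _ in 0..total {
        upload_counter = upload_counter.wrapping_add(1);
        if upload_counter >= upload_ratio {
            sent += 1;
        }
    }
    sent
}

fn main() {
    // with a ratio of 10, exactly 10 of every 256 messages are discarded
    assert_eq!(count_sent(10, 256), 246);
    // a ratio of 0 sends everything
    assert_eq!(count_sent(0, 256), 256);
}
```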