From 8ff348655b1b091ee7fb8d18556fead4fdf9b9a8 Mon Sep 17 00:00:00 2001
From: Jack Rubacha
Date: Thu, 28 Nov 2024 21:18:20 -0500
Subject: [PATCH 1/9] initial support

---
 scylla-server/src/db_handler.rs     |  24 +++----
 scylla-server/src/lib.rs            |   1 +
 scylla-server/src/main.rs           |  41 +++++++++--
 scylla-server/src/mqtt_processor.rs | 105 ++++++---------------
 scylla-server/src/socket_handler.rs | 106 ++++++++++++++++++++++
 5 files changed, 174 insertions(+), 103 deletions(-)
 create mode 100644 scylla-server/src/socket_handler.rs

diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs
index 35cba368..1fa682e7 100644
--- a/scylla-server/src/db_handler.rs
+++ b/scylla-server/src/db_handler.rs
@@ -1,6 +1,6 @@
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::{broadcast, mpsc};
 
-use tokio::{sync::mpsc::Sender, time::Duration};
+use tokio::time::Duration;
 
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, instrument, trace, warn, Level};
@@ -14,7 +14,7 @@ pub struct DbHandler {
     /// The list of data types seen by this instance, used for when to upsert
     datatype_list: Vec<String>,
     /// The broadcast channel which provides serial datapoints for processing
-    receiver: Receiver<ClientData>,
+    receiver: broadcast::Receiver<ClientData>,
     /// The database pool handle
     pool: PoolHandle,
     /// the queue of data
@@ -29,7 +29,7 @@ impl DbHandler {
     /// Make a new db handler
    /// * `recv` - the broadcast receiver over which clientdata will be sent
     pub fn new(
-        receiver: Receiver<ClientData>,
+        receiver: broadcast::Receiver<ClientData>,
         pool: PoolHandle,
         upload_interval: u64,
     ) -> DbHandler {
@@ -47,7 +47,7 @@ impl DbHandler {
    /// It uses the queue from data queue to insert to the database specified
    /// On cancellation, will await one final queue message to cleanup anything remaining in the channel
     pub async fn batching_loop(
-        mut batch_queue: Receiver<Vec<ClientData>>,
+        mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
         pool: PoolHandle,
         cancel_token: CancellationToken,
     ) {
@@ -94,7 +94,7 @@ impl DbHandler {
 
     /// A batching loop that consumes messages but does not upload anything
     pub async fn fake_batching_loop(
-        mut batch_queue: Receiver<Vec<ClientData>>,
+        mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
         cancel_token: CancellationToken,
     ) {
         loop {
@@ -129,7 +129,7 @@ impl DbHandler {
    /// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation
     pub async fn handling_loop(
         mut self,
-        data_channel: Sender<Vec<ClientData>>,
+        data_channel: mpsc::Sender<Vec<ClientData>>,
         cancel_token: CancellationToken,
     ) {
         loop {
@@ -140,7 +140,7 @@ impl DbHandler {
                     self.data_queue.clear();
                     break;
                 },
-                Some(msg) = self.receiver.recv() => {
+                Ok(msg) = self.receiver.recv() => {
                     self.handle_msg(msg, &data_channel).await;
                 }
             }
@@ -148,12 +148,8 @@ impl DbHandler {
     }
 
     #[instrument(skip(self), level = Level::TRACE)]
-    async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
-        trace!(
-            "Mqtt dispatcher: {} of {}",
-            self.receiver.len(),
-            self.receiver.max_capacity()
-        );
+    async fn handle_msg(&mut self, msg: ClientData, data_channel: &mpsc::Sender<Vec<ClientData>>) {
+        trace!("Mqtt dispatcher: {}", self.receiver.len(),);
 
         // If the time is greater than upload interval, push to batch upload thread and clear queue
         if tokio::time::Instant::now().duration_since(self.last_time)
diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs
index 5ea5f7db..8c0dca3f 100644
--- a/scylla-server/src/lib.rs
+++ b/scylla-server/src/lib.rs
@@ -6,6 +6,7 @@ pub mod services;
 
 pub mod db_handler;
 pub mod mqtt_processor;
+pub mod socket_handler;
 
 #[allow(non_snake_case)]
 pub mod
models; diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 7241b6b9..50d6b8b7 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -23,6 +23,7 @@ use scylla_server::{ data_type_controller, run_controller, }, services::run_service::{self}, + socket_handler::{socket_handler, socket_handler_with_metadata}, PoolHandle, RateLimitMode, }; use scylla_server::{ @@ -31,7 +32,10 @@ use scylla_server::{ ClientData, RUN_ID, }; use socketioxide::{extract::SocketRef, SocketIo}; -use tokio::{signal, sync::mpsc}; +use tokio::{ + signal, + sync::{broadcast, mpsc}, +}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tower::ServiceBuilder; use tower_http::{ @@ -97,6 +101,10 @@ struct ScyllaArgs { default_value = "0" )] socketio_discard_percent: u8, + + /// Whether to disable sending of metadata over the socket to the client + #[arg(long, env = "SCYLLA_SOCKET_DISABLE_METADATA")] + no_metadata: bool, } const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); @@ -162,9 +170,9 @@ async fn main() { // channel to pass the mqtt data // TODO tune buffer size - let (mqtt_send, mqtt_receive) = mpsc::channel::(10000); + let (mqtt_send, mqtt_receive) = broadcast::channel::(10000); - // channel to pass the processed data to the db thread + // channel to pass the processed data to the batch uploading thread // TODO tune buffer size let (db_send, db_receive) = mpsc::channel::>(1000); @@ -172,10 +180,31 @@ async fn main() { // create a task tracker and cancellation token let task_tracker = TaskTracker::new(); let token = CancellationToken::new(); + + if cli.no_metadata { + task_tracker.spawn(socket_handler( + token.clone(), + mqtt_receive, + cli.socketio_discard_percent, + io, + )); + } else { + task_tracker.spawn(socket_handler_with_metadata( + token.clone(), + mqtt_receive, + cli.socketio_discard_percent, + io, + )); + } + // spawn the database handler task_tracker.spawn( - db_handler::DbHandler::new(mqtt_receive, db.clone(), cli.batch_upsert_time * 1000) - .handling_loop(db_send, token.clone()), + db_handler::DbHandler::new( + mqtt_send.subscribe(), + db.clone(), + cli.batch_upsert_time * 1000, + ) + .handling_loop(db_send, token.clone()), ); // spawn the database inserter, if we have it enabled if !cli.disable_data_upload { @@ -203,14 +232,12 @@ async fn main() { info!("Running processor in MQTT (production) mode"); let (recv, opts) = MqttProcessor::new( mqtt_send, - io, token.clone(), MqttProcessorOptions { mqtt_path: cli.siren_host_url, initial_run: curr_run.id, static_rate_limit_time: cli.static_rate_limit_value, rate_limit_mode: cli.rate_limit_mode, - upload_ratio: cli.socketio_discard_percent, }, ); let (client, eventloop) = AsyncClient::new(opts, 600); diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index a48de461..8ac47649 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -11,8 +11,7 @@ use rumqttc::v5::{ mqttbytes::v5::{Packet, Publish}, AsyncClient, Event, EventLoop, MqttOptions, }; -use socketioxide::SocketIo; -use tokio::{sync::mpsc::Sender, time::Instant}; +use tokio::{sync::broadcast, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn, Level}; @@ -26,16 +25,12 @@ use super::ClientData; /// - mqtt state /// - reception via mqtt and subsequent parsing /// - labeling of data with runs -/// - sending data over socket /// - sending data over the channel to a db handler /// /// It also is the main form of rate limiting 
pub struct MqttProcessor { - channel: Sender, - io: SocketIo, + channel: broadcast::Sender, cancel_token: CancellationToken, - /// Upload ratio, below is not socket sent above is socket sent - upload_ratio: u8, /// static rate limiter rate_limiter: HashMap, /// time to rate limit in ms @@ -54,20 +49,16 @@ pub struct MqttProcessorOptions { pub static_rate_limit_time: u64, /// the rate limit mode pub rate_limit_mode: RateLimitMode, - /// the upload ratio for the socketio - pub upload_ratio: u8, } impl MqttProcessor { /// Creates a new mqtt receiver and socketio and db sender /// * `channel` - The mpsc channel to send the database data to - /// * `io` - The socketio layer to send the data to /// * `cancel_token` - The token which indicates cancellation of the task /// * `opts` - The mqtt processor options to use /// Returns the instance and options to create a client, which is then used in the process_mqtt loop pub fn new( - channel: Sender, - io: SocketIo, + channel: broadcast::Sender, cancel_token: CancellationToken, opts: MqttProcessorOptions, ) -> (MqttProcessor, MqttOptions) { @@ -100,9 +91,7 @@ impl MqttProcessor { ( MqttProcessor { channel, - io, cancel_token, - upload_ratio: opts.upload_ratio, rate_limiter: rate_map, rate_limit_time: opts.static_rate_limit_time, rate_limit_mode: opts.rate_limit_mode, @@ -115,12 +104,10 @@ impl MqttProcessor { /// * `eventloop` - The eventloop returned by ::new to connect to. The loop isnt sync so this is the best that can be done /// * `client` - The async mqttt v5 client to use for subscriptions pub async fn process_mqtt(mut self, client: Arc, mut eventloop: EventLoop) { - let mut view_interval = tokio::time::interval(Duration::from_secs(3)); - - let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); + // let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); - let mut upload_counter: u8 = 0; + //let mut upload_counter: u8 = 0; debug!("Subscribing to siren"); client @@ -145,48 +132,29 @@ impl MqttProcessor { }; latency_ringbuffer.push(chrono::offset::Utc::now() - msg.timestamp); self.send_db_msg(msg.clone()).await; - self.send_socket_msg(msg, &mut upload_counter); }, Err(msg) => trace!("Received mqtt error: {:?}", msg), _ => trace!("Received misc mqtt: {:?}", msg), }, - _ = view_interval.tick() => { - trace!("Updating viewership data!"); - let sockets = self.io.sockets(); - let sockets_cnt = match sockets { - Ok(s) => s.len() as f32, - Err(_) => -1f32, - }; - let client_data = ClientData { - name: "Viewers".to_string(), - node: "Internal".to_string(), - unit: "".to_string(), - run_id: crate::RUN_ID.load(Ordering::Relaxed), - timestamp: chrono::offset::Utc::now(), - values: vec![sockets_cnt] - }; - self.send_socket_msg(client_data, &mut upload_counter); - } - _ = latency_interval.tick() => { - // set latency to 0 if no messages are in buffer - let avg_latency = if latency_ringbuffer.is_empty() { - 0 - } else { - latency_ringbuffer.iter().sum::().num_milliseconds() / latency_ringbuffer.len() as i64 - }; + // _ = latency_interval.tick() => { + // // set latency to 0 if no messages are in buffer + // let avg_latency = if latency_ringbuffer.is_empty() { + // 0 + // } else { + // latency_ringbuffer.iter().sum::().num_milliseconds() / latency_ringbuffer.len() as i64 + // }; - let client_data = ClientData { - name: "Latency".to_string(), - node: "Internal".to_string(), - unit: "ms".to_string(), - run_id: crate::RUN_ID.load(Ordering::Relaxed), - 
timestamp: chrono::offset::Utc::now(), - values: vec![avg_latency as f32] - }; - trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32)); - self.send_socket_msg(client_data, &mut upload_counter); + // let client_data = ClientData { + // name: "Latency".to_string(), + // node: "Internal".to_string(), + // unit: "ms".to_string(), + // run_id: crate::RUN_ID.load(Ordering::Relaxed), + // timestamp: chrono::offset::Utc::now(), + // values: vec![avg_latency as f32] + // }; + // trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32)); + // self.send_socket_msg(client_data, &mut upload_counter); } - } } } @@ -312,35 +280,8 @@ impl MqttProcessor { /// Send a message to the channel, printing and IGNORING any error that may occur /// * `client_data` - The client data to send over the broadcast async fn send_db_msg(&self, client_data: ClientData) { - if let Err(err) = self.channel.send(client_data.clone()).await { + if let Err(err) = self.channel.send(client_data.clone()) { warn!("Error sending through channel: {:?}", err); } } - - /// Sends a message to the socket, printing and IGNORING any error that may occur - /// * `client_data` - The client data to send over the broadcast - fn send_socket_msg(&self, client_data: ClientData, upload_counter: &mut u8) { - *upload_counter = upload_counter.wrapping_add(1); - if *upload_counter >= self.upload_ratio { - match self.io.emit( - "message", - serde_json::to_string(&client_data).expect("Could not serialize ClientData"), - ) { - Ok(_) => (), - Err(err) => match err { - socketioxide::BroadcastError::Socket(e) => { - trace!("Socket: Transmit error: {:?}", e); - } - socketioxide::BroadcastError::Serialize(_) => { - warn!("Socket: Serialize error: {}", err) - } - socketioxide::BroadcastError::Adapter(_) => { - warn!("Socket: Adapter error: {}", err) - } - }, - } - } else { - trace!("Discarding message with topic {}", client_data.name); - } - } } diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs new file mode 100644 index 00000000..f165f38f --- /dev/null +++ b/scylla-server/src/socket_handler.rs @@ -0,0 +1,106 @@ +use std::{sync::atomic::Ordering, time::Duration}; + +use serde::Serialize; +use socketioxide::SocketIo; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; +use tracing::{debug, trace, warn}; + +use crate::ClientData; + +pub async fn socket_handler( + cancel_token: CancellationToken, + mut data_channel: broadcast::Receiver, + upload_ratio: u8, + io: SocketIo, +) { + let mut upload_counter = 0u8; + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + debug!("Shutting down socket handler!"); + break; + }, + Ok(data) = data_channel.recv() => { + send_socket_msg(data, &mut upload_counter, upload_ratio, &io); + } + } + } +} + +pub async fn socket_handler_with_metadata( + cancel_token: CancellationToken, + mut data_channel: broadcast::Receiver, + upload_ratio: u8, + io: SocketIo, +) { + let mut upload_counter = 0u8; + + // INTERVAL TIMERS for periodic things to be sent + let mut view_interval = tokio::time::interval(Duration::from_secs(3)); + + loop { + // each function in this select returns one or more client_data objects + let send_items: Vec = tokio::select! 
{ + _ = cancel_token.cancelled() => { + debug!("Shutting down socket handler!"); + break; + }, + Ok(data) = data_channel.recv() => { + vec![data] + } + _ = view_interval.tick() => { + trace!("Updating viewership data!"); + let sockets = io.sockets(); + let sockets_cnt = match sockets { + Ok(s) => s.len() as f32, + Err(_) => -1f32, + }; + vec![ClientData { + name: "Viewers".to_string(), + node: "Internal".to_string(), + unit: "".to_string(), + run_id: crate::RUN_ID.load(Ordering::Relaxed), + timestamp: chrono::offset::Utc::now(), + values: vec![sockets_cnt] + }] + } + }; + for item in send_items { + send_socket_msg(item, &mut upload_counter, upload_ratio, &io); + } + } +} + +/// Sends a message to the socket, printing and IGNORING any error that may occur +/// * `client_data` - The client data to send over the broadcast +/// * `upload_counter` - The counter of data that has been uploaded, for basic rate limiting +/// * `upload-ratio` - The rate limit ratio +/// * `io` - The socket to upload to +fn send_socket_msg(client_data: T, upload_counter: &mut u8, upload_ratio: u8, io: &SocketIo) +where + T: Serialize, +{ + *upload_counter = upload_counter.wrapping_add(1); + if *upload_counter >= upload_ratio { + match io.emit( + "message", + serde_json::to_string(&client_data).expect("Could not serialize ClientData"), + ) { + Ok(_) => (), + Err(err) => match err { + socketioxide::BroadcastError::Socket(e) => { + trace!("Socket: Transmit error: {:?}", e); + } + socketioxide::BroadcastError::Serialize(_) => { + warn!("Socket: Serialize error: {}", err) + } + socketioxide::BroadcastError::Adapter(_) => { + warn!("Socket: Adapter error: {}", err) + } + }, + } + } else { + trace!("Discarding message!"); + } +} From 906866c6e9489f44343d412901d1c2de9bd6216b Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 28 Nov 2024 22:04:35 -0500 Subject: [PATCH 2/9] fixups --- scylla-server/src/mqtt_processor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index 8ac47649..5f555a1f 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -25,7 +25,7 @@ use super::ClientData; /// - mqtt state /// - reception via mqtt and subsequent parsing /// - labeling of data with runs -/// - sending data over the channel to a db handler +/// - sending data over the channel to a db handler and socket /// /// It also is the main form of rate limiting pub struct MqttProcessor { @@ -107,7 +107,6 @@ impl MqttProcessor { // let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); - //let mut upload_counter: u8 = 0; debug!("Subscribing to siren"); client From 87b0986ce1f751b4bac207d7b5cb3e2f3fb7e7a8 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sat, 14 Dec 2024 19:24:38 -0500 Subject: [PATCH 3/9] addition of timer logic --- scylla-server/src/mqtt_processor.rs | 1 - scylla-server/src/socket_handler.rs | 111 ++++++++++++++++++++++++---- 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index 5f555a1f..1d8b564f 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -107,7 +107,6 @@ impl MqttProcessor { // let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); - debug!("Subscribing to siren"); client 
.subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce) diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index f165f38f..a82c3fa0 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,5 +1,6 @@ -use std::{sync::atomic::Ordering, time::Duration}; +use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; +use chrono::{DateTime, Utc}; use serde::Serialize; use socketioxide::SocketIo; use tokio::sync::broadcast; @@ -8,6 +9,8 @@ use tracing::{debug, trace, warn}; use crate::ClientData; +const DATA_SOCKET_KEY: &str = "data"; + pub async fn socket_handler( cancel_token: CancellationToken, mut data_channel: broadcast::Receiver, @@ -22,11 +25,31 @@ pub async fn socket_handler( break; }, Ok(data) = data_channel.recv() => { - send_socket_msg(data, &mut upload_counter, upload_ratio, &io); + send_socket_msg(&data, &mut upload_counter, upload_ratio, &io, DATA_SOCKET_KEY); } } } } +#[derive(Serialize)] +struct TimerData { + pub topic: &'static str, + pub last_change: DateTime, + pub last_value: f32, +} +const TIMER_SOCKET_KEY: &str = "timers"; +const TIMERS_TOPICS: &[&str] = &[ + "BMS/Status/Balancing", + "BMS/Status/State", + "BMS/Charging/Control", +]; + +// #[derive(Serialize)] +// struct FaultData { +// pub topic: &'static str, +// pub name: String, +// pub occured_at: DateTime, +// } +// const FAULT_SOCKET_KEY: &str = "faults"; pub async fn socket_handler_with_metadata( cancel_token: CancellationToken, @@ -38,17 +61,65 @@ pub async fn socket_handler_with_metadata( // INTERVAL TIMERS for periodic things to be sent let mut view_interval = tokio::time::interval(Duration::from_secs(3)); + let mut timers_interval = tokio::time::interval(Duration::from_secs(3)); + + // init timers + let mut timer_map: HashMap = HashMap::new(); + for item in TIMERS_TOPICS { + timer_map.insert( + item.to_string(), + TimerData { + topic: item, + last_change: DateTime::UNIX_EPOCH, + last_value: 0.0f32, + }, + ); + } + + // init faults + // let fault_regex: Regex = Regex::new(r"(BMS/Status/F/*|Charger/Box/F_*|MPU/Fault/F_*") + // .expect("Could not compile regex!"); + // const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; + // let mut fault_ringbuffer = AllocRingBuffer::::new(25); loop { - // each function in this select returns one or more client_data objects - let send_items: Vec = tokio::select! { + tokio::select! 
{ _ = cancel_token.cancelled() => { debug!("Shutting down socket handler!"); break; }, Ok(data) = data_channel.recv() => { - vec![data] + send_socket_msg( + &data, + &mut upload_counter, + upload_ratio, + &io, + DATA_SOCKET_KEY, + ); + + // check to see if we fit a timer case, and then act upon it + if let Some(time) = timer_map.get_mut(&data.name) { + let new_val = *data.values.first().unwrap_or(&-1f32); + if time.last_value != new_val { + time.last_value = new_val; + time.last_change = Utc::now(); + } + continue; + } + // if fault_regex.is_match(&data.name) { + // fault_ringbuffer.push() + // } else { + + // } + } + _ = timers_interval.tick() => { + trace!("Sending Timers Intervals!"); + for item in timer_map.values() { + send_socket_msg(item, &mut upload_counter, upload_ratio, &io, TIMER_SOCKET_KEY); + } + + }, _ = view_interval.tick() => { trace!("Updating viewership data!"); let sockets = io.sockets(); @@ -56,18 +127,22 @@ pub async fn socket_handler_with_metadata( Ok(s) => s.len() as f32, Err(_) => -1f32, }; - vec![ClientData { - name: "Viewers".to_string(), + let item = ClientData { + name: "Argos/Viewers".to_string(), node: "Internal".to_string(), unit: "".to_string(), run_id: crate::RUN_ID.load(Ordering::Relaxed), timestamp: chrono::offset::Utc::now(), values: vec![sockets_cnt] - }] + }; + send_socket_msg( + &item, + &mut upload_counter, + upload_ratio, + &io, + DATA_SOCKET_KEY, + ); } - }; - for item in send_items { - send_socket_msg(item, &mut upload_counter, upload_ratio, &io); } } } @@ -77,15 +152,21 @@ pub async fn socket_handler_with_metadata( /// * `upload_counter` - The counter of data that has been uploaded, for basic rate limiting /// * `upload-ratio` - The rate limit ratio /// * `io` - The socket to upload to -fn send_socket_msg(client_data: T, upload_counter: &mut u8, upload_ratio: u8, io: &SocketIo) -where +/// * `socket_key` - The socket key to send to +fn send_socket_msg( + client_data: &T, + upload_counter: &mut u8, + upload_ratio: u8, + io: &SocketIo, + socket_key: &'static str, +) where T: Serialize, { *upload_counter = upload_counter.wrapping_add(1); if *upload_counter >= upload_ratio { match io.emit( - "message", - serde_json::to_string(&client_data).expect("Could not serialize ClientData"), + socket_key, + serde_json::to_string(client_data).expect("Could not serialize ClientData"), ) { Ok(_) => (), Err(err) => match err { From 128b451d8342d300766d876b7677038034221196 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sun, 15 Dec 2024 23:09:06 -0500 Subject: [PATCH 4/9] in progress faults --- scylla-server/Cargo.lock | 19 +++++++++-------- scylla-server/Cargo.toml | 1 + scylla-server/src/socket_handler.rs | 33 ++++++++++++++++------------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index 874a978e..74d3b7cf 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -1547,14 +1547,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.7", - "regex-syntax 0.8.4", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", ] [[package]] @@ -1568,13 +1568,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.7" +version = "0.4.9" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.4", + "regex-syntax 0.8.5", ] [[package]] @@ -1585,9 +1585,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "ring" @@ -1755,6 +1755,7 @@ dependencies = [ "protobuf", "protobuf-codegen", "rand", + "regex", "ringbuffer", "rumqttc", "serde", diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index 47229e1a..1e7aaff8 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -28,6 +28,7 @@ axum-extra = { version = "0.9.3", features = ["query"] } chrono = { version = "0.4.38", features = ["serde"] } serde_json = "1.0.128" diesel_migrations = { version = "2.2.0", features = ["postgres"] } +regex = "1.11.1" [features] top = ["dep:console-subscriber"] diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index a82c3fa0..0baf3540 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; use chrono::{DateTime, Utc}; +use regex::Regex; +use ringbuffer::AllocRingBuffer; use serde::Serialize; use socketioxide::SocketIo; use tokio::sync::broadcast; @@ -43,13 +45,13 @@ const TIMERS_TOPICS: &[&str] = &[ "BMS/Charging/Control", ]; -// #[derive(Serialize)] -// struct FaultData { -// pub topic: &'static str, -// pub name: String, -// pub occured_at: DateTime, -// } -// const FAULT_SOCKET_KEY: &str = "faults"; +#[derive(Serialize)] +struct FaultData { + pub topic: &'static str, + pub name: String, + pub occured_at: DateTime, +} +const FAULT_SOCKET_KEY: &str = "faults"; pub async fn socket_handler_with_metadata( cancel_token: CancellationToken, @@ -77,10 +79,10 @@ pub async fn socket_handler_with_metadata( } // init faults - // let fault_regex: Regex = Regex::new(r"(BMS/Status/F/*|Charger/Box/F_*|MPU/Fault/F_*") - // .expect("Could not compile regex!"); - // const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; - // let mut fault_ringbuffer = AllocRingBuffer::::new(25); + let fault_regex: Regex = Regex::new(r"(BMS/Status/F/*|Charger/Box/F_*|MPU/Fault/F_*") + .expect("Could not compile regex!"); + const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; + let mut fault_ringbuffer = AllocRingBuffer::::new(25); loop { tokio::select! 
{ @@ -106,11 +108,12 @@ pub async fn socket_handler_with_metadata( } continue; } - // if fault_regex.is_match(&data.name) { - // fault_ringbuffer.push() - // } else { - // } + if fault_regex.is_match(&data.name) { + //fault_ringbuffer.push() + } else { + + } } _ = timers_interval.tick() => { From 93b238b1c795f0f444b5a6b882176dd51cbcdd1b Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 17 Jan 2025 20:39:19 -0500 Subject: [PATCH 5/9] merge and more features --- scylla-server/Cargo.lock | 2 +- scylla-server/build.rs | 2 +- .../src/controllers/car_command_controller.rs | 4 +- .../controllers/file_insertion_controller.rs | 4 +- scylla-server/src/db_handler.rs | 3 +- scylla-server/src/lib.rs | 4 +- scylla-server/src/main.rs | 5 +- scylla-server/src/mqtt_processor.rs | 8 +- scylla-server/src/proto/mod.rs | 5 + scylla-server/src/socket_handler.rs | 94 +++++++++++++++++-- 10 files changed, 104 insertions(+), 27 deletions(-) create mode 100644 scylla-server/src/proto/mod.rs diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index 04a13a05..6cfb3c9a 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -1971,8 +1971,8 @@ dependencies = [ "protobuf", "protobuf-codegen", "rand", - "regex", "rangemap", + "regex", "ringbuffer", "rumqttc", "rustc-hash 2.1.0", diff --git a/scylla-server/build.rs b/scylla-server/build.rs index 3c0bfc5c..128d1e6e 100644 --- a/scylla-server/build.rs +++ b/scylla-server/build.rs @@ -11,6 +11,6 @@ fn main() { .input("src/proto/command_data.proto") .input("src/proto/playback_data.proto") // Specify output directory relative to Cargo output directory. - .out_dir("src") + .out_dir("src/proto") .run_from_script(); } diff --git a/scylla-server/src/controllers/car_command_controller.rs b/scylla-server/src/controllers/car_command_controller.rs index 7c6cece7..dfb83039 100644 --- a/scylla-server/src/controllers/car_command_controller.rs +++ b/scylla-server/src/controllers/car_command_controller.rs @@ -7,7 +7,7 @@ use rumqttc::v5::AsyncClient; use serde::Deserialize; use tracing::{info, warn}; -use crate::{command_data::CommandData, error::ScyllaError}; +use crate::{error::ScyllaError, proto::command_data}; /// the prefix for the calypso topic, so topic of cmd is this plus the key appended on pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/"; @@ -34,7 +34,7 @@ pub async fn send_config_command( ); // the protobuf calypso converts into CAN - let mut payload = CommandData::new(); + let mut payload = command_data::CommandData::new(); // empty "data" in the protobuf tells calypso to use the default value if let Some(data) = data_query.data { payload.data = data; diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs index 36846e29..227dd2a8 100644 --- a/scylla-server/src/controllers/file_insertion_controller.rs +++ b/scylla-server/src/controllers/file_insertion_controller.rs @@ -9,7 +9,9 @@ use rangemap::RangeInclusiveMap; use tokio::sync::mpsc; use tracing::{debug, info, trace, warn}; -use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle}; +use crate::{ + error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle, +}; /// Inserts a file using http multipart /// This file is parsed and clientdata values are extracted, the run ID of each variable is inferred, and then data is batch uploaded diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs index 23b21b5c..eb518ed8 
100644 --- a/scylla-server/src/db_handler.rs +++ b/scylla-server/src/db_handler.rs @@ -1,6 +1,5 @@ -use tokio::sync::{broadcast, mpsc}; use rustc_hash::FxHashSet; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{broadcast, mpsc}; use tokio::time::Duration; diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index de368cc7..5ecc9d07 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -13,9 +13,7 @@ pub mod models; #[allow(non_snake_case)] pub mod schema; -pub mod command_data; -pub mod playback_data; -pub mod serverdata; +pub mod proto; pub mod transformers; diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 8e3868d5..e72cee23 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -26,7 +26,6 @@ use scylla_server::{ }, services::run_service::{self}, socket_handler::{socket_handler, socket_handler_with_metadata}, - PoolHandle, RateLimitMode, RateLimitMode, }; use scylla_server::{ @@ -220,10 +219,10 @@ async fn main() -> Result<(), Box> { task_tracker.spawn( db_handler::DbHandler::new( mqtt_send.subscribe(), - db.clone(), + pool.clone(), cli.batch_upsert_time * 1000, ) - .handling_loop(db_send, token.clone()), + .handling_loop(db_send.clone(), token.clone()), ); // spawn the database inserter, if we have it enabled if !cli.disable_data_upload { diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index 9b82f31d..9aea662d 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -10,15 +10,13 @@ use rumqttc::v5::{ mqttbytes::v5::{Packet, Publish}, AsyncClient, Event, EventLoop, MqttOptions, }; -use tokio::{sync::broadcast, time::Instant}; use rustc_hash::FxHashMap; -use socketioxide::SocketIo; -use tokio::{sync::mpsc::Sender, time::Instant}; +use tokio::{sync::broadcast, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn, Level}; use crate::{ - controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode, + controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, proto::serverdata, RateLimitMode, }; use super::ClientData; @@ -276,7 +274,7 @@ impl MqttProcessor { /// Send a message to the channel, printing and IGNORING any error that may occur /// * `client_data` - The client data to send over the broadcast async fn send_db_msg(&self, client_data: ClientData) { - if let Err(err) = self.channel.send(client_data).await { + if let Err(err) = self.channel.send(client_data) { warn!("Error sending through channel: {:?}", err); } } diff --git a/scylla-server/src/proto/mod.rs b/scylla-server/src/proto/mod.rs new file mode 100644 index 00000000..dbc1df8b --- /dev/null +++ b/scylla-server/src/proto/mod.rs @@ -0,0 +1,5 @@ +// @generated + +pub mod command_data; +pub mod playback_data; +pub mod serverdata; diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 0baf3540..737f6de2 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; use chrono::{DateTime, Utc}; use regex::Regex; -use ringbuffer::AllocRingBuffer; +use ringbuffer::{AllocRingBuffer, RingBuffer}; use serde::Serialize; use socketioxide::SocketIo; use tokio::sync::broadcast; @@ -45,14 +45,51 @@ const TIMERS_TOPICS: &[&str] = &[ "BMS/Charging/Control", ]; -#[derive(Serialize)] +#[derive(Serialize, PartialEq, Clone, Debug)] +enum Node { + BMS, + DTI, + MPU, + 
Charger, +} +// impl Node { +// fn convert_from(value: &str) -> Option { +// match value { +// "DTI" => Some(Self::DTI), +// "BMS" => Some(Self::BMS), +// "MPU" => Some(Self::MPU), +// "Charger" => Some(Self::Charger), +// _ => None, +// } +// } +// } + +#[derive(Serialize, Clone)] struct FaultData { - pub topic: &'static str, + pub node: Node, pub name: String, pub occured_at: DateTime, } const FAULT_SOCKET_KEY: &str = "faults"; +const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; +const fn map_dti_flt(index: usize) -> Option<&'static str> { + match index { + 0 => None, + 1 => Some("Overvoltage"), + 2 => None, + 3 => Some("DRV"), + 4 => Some("ABS_Overcurrent"), + 5 => Some("CTLR_Overtemp"), + 6 => Some("Motor_Overtemp"), + 7 => Some("Sensor_wire"), + 8 => Some("Sensor_general"), + 9 => Some("CAN_command"), + 0x0A => Some("Analog_input"), + _ => None, + } +} + pub async fn socket_handler_with_metadata( cancel_token: CancellationToken, mut data_channel: broadcast::Receiver, @@ -64,6 +101,7 @@ pub async fn socket_handler_with_metadata( // INTERVAL TIMERS for periodic things to be sent let mut view_interval = tokio::time::interval(Duration::from_secs(3)); let mut timers_interval = tokio::time::interval(Duration::from_secs(3)); + let mut recent_faults_interval = tokio::time::interval(Duration::from_secs(1)); // init timers let mut timer_map: HashMap = HashMap::new(); @@ -79,9 +117,12 @@ pub async fn socket_handler_with_metadata( } // init faults - let fault_regex: Regex = Regex::new(r"(BMS/Status/F/*|Charger/Box/F_*|MPU/Fault/F_*") - .expect("Could not compile regex!"); - const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; + let fault_regex_bms: Regex = + Regex::new(r"BMS\/Status\/F\/(.*)").expect("Could not compile regex!"); + let fault_regex_charger: Regex = + Regex::new(r"Charger\/Box\/F_(.*)").expect("Could not compile regex!"); + let fault_regex_mpu: Regex = + Regex::new(r"MPU\/Fault\/F_(.*)").expect("Could not compile regex!"); let mut fault_ringbuffer = AllocRingBuffer::::new(25); loop { @@ -99,8 +140,12 @@ pub async fn socket_handler_with_metadata( DATA_SOCKET_KEY, ); + warn!("hmm: {}", data.name); + // check to see if we fit a timer case, and then act upon it + // assumes a timer is never also a fault if let Some(time) = timer_map.get_mut(&data.name) { + warn!("Triggering timer: {}", data.name); let new_val = *data.values.first().unwrap_or(&-1f32); if time.last_value != new_val { time.last_value = new_val; @@ -109,13 +154,44 @@ pub async fn socket_handler_with_metadata( continue; } - if fault_regex.is_match(&data.name) { - //fault_ringbuffer.push() + // check to see if this is a fault, and return the fault name and node + let (flt_txt, node) = if let Some(mtch) = fault_regex_bms.captures_iter(&data.name).next() { + (mtch.get(1).map_or("", |m| m.as_str()), Node::BMS) + } else if let Some(mtch) = fault_regex_charger.captures_iter(&data.name).next() { + (mtch.get(1).map_or("", |m| m.as_str()), Node::Charger) + } + else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() { + (mtch.get(1).map_or("", |m| m.as_str()), Node::MPU) + } + else if FAULT_BINS[0] == data.name { + let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else { + continue; + }; + (flt, Node::DTI) } else { + continue; + }; + + warn!("Matched on {}, {:?}", flt_txt, node); + + for item in fault_ringbuffer.iter() { + if item.name == flt_txt && node == item.node { + continue; + } } + fault_ringbuffer.push(FaultData { node: node, name: flt_txt.to_string(), occured_at: data.timestamp 
}); } + _ = recent_faults_interval.tick() => { + send_socket_msg( + &fault_ringbuffer.to_vec(), + &mut upload_counter, + upload_ratio, + &io, + FAULT_SOCKET_KEY, + ) + }, _ = timers_interval.tick() => { trace!("Sending Timers Intervals!"); for item in timer_map.values() { @@ -169,7 +245,7 @@ fn send_socket_msg( if *upload_counter >= upload_ratio { match io.emit( socket_key, - serde_json::to_string(client_data).expect("Could not serialize ClientData"), + &serde_json::to_string(client_data).expect("Could not serialize ClientData"), ) { Ok(_) => (), Err(err) => match err { From 405fb7d4944f10f0bfcb5e51f60b7659259a6564 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 17 Jan 2025 21:39:08 -0500 Subject: [PATCH 6/9] improve faulting logic --- scylla-server/src/socket_handler.rs | 81 +++++++++++++++++------------ 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 737f6de2..6c66f708 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeDelta, Utc}; use regex::Regex; use ringbuffer::{AllocRingBuffer, RingBuffer}; use serde::Serialize; @@ -47,30 +47,27 @@ const TIMERS_TOPICS: &[&str] = &[ #[derive(Serialize, PartialEq, Clone, Debug)] enum Node { - BMS, - DTI, - MPU, + Bms, + Dti, + Mpu, Charger, } -// impl Node { -// fn convert_from(value: &str) -> Option { -// match value { -// "DTI" => Some(Self::DTI), -// "BMS" => Some(Self::BMS), -// "MPU" => Some(Self::MPU), -// "Charger" => Some(Self::Charger), -// _ => None, -// } -// } -// } #[derive(Serialize, Clone)] struct FaultData { + /// the node the fault came from pub node: Node, + /// the word describing the fault pub name: String, + /// when the fault occured pub occured_at: DateTime, + /// when the fault was last seen + pub last_seen: DateTime, + /// whether another fault of the same node and name as occured after this fault + pub expired: bool, } const FAULT_SOCKET_KEY: &str = "faults"; +const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8); const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; const fn map_dti_flt(index: usize) -> Option<&'static str> { @@ -140,12 +137,10 @@ pub async fn socket_handler_with_metadata( DATA_SOCKET_KEY, ); - warn!("hmm: {}", data.name); - // check to see if we fit a timer case, and then act upon it - // assumes a timer is never also a fault + // IMPORTANT: assumes a timer is never also a fault if let Some(time) = timer_map.get_mut(&data.name) { - warn!("Triggering timer: {}", data.name); + trace!("Triggering timer: {}", data.name); let new_val = *data.values.first().unwrap_or(&-1f32); if time.last_value != new_val { time.last_value = new_val; @@ -155,33 +150,51 @@ pub async fn socket_handler_with_metadata( } // check to see if this is a fault, and return the fault name and node + // each bring is the logic to get a node, note the difference in DTI let (flt_txt, node) = if let Some(mtch) = fault_regex_bms.captures_iter(&data.name).next() { - (mtch.get(1).map_or("", |m| m.as_str()), Node::BMS) + (mtch.get(1).map_or("", |m| m.as_str()), Node::Bms) } else if let Some(mtch) = fault_regex_charger.captures_iter(&data.name).next() { (mtch.get(1).map_or("", |m| m.as_str()), Node::Charger) - } - else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() { - (mtch.get(1).map_or("", |m| m.as_str()), Node::MPU) - } - else if 
FAULT_BINS[0] == data.name { - let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else { + } else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() { + (mtch.get(1).map_or("", |m| m.as_str()), Node::Mpu) + } else if FAULT_BINS[0] == data.name { + let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else { continue; }; - (flt, Node::DTI) + (flt, Node::Dti) } else { continue; }; - warn!("Matched on {}, {:?}", flt_txt, node); - - for item in fault_ringbuffer.iter() { + trace!("Matched on {}, {:?}", flt_txt, node); - if item.name == flt_txt && node == item.node { - continue; + // default to sending a new fault + let mut should_push = true; + // iterate through current faults + for item in fault_ringbuffer.iter_mut() { + // if a fault of the same type is in the queue, and not expired + if item.name == flt_txt && node.clone() == item.node && !item.expired { + // update the last seen metric + (*item).last_seen = data.timestamp; + // if the time since the last fault is greater than [FAULT_MIN_REG_GAP], mark this fault as expired + if (data.timestamp - item.last_seen) > FAULT_MIN_REG_GAP { + (*item).expired = true; + } else { + // otherwise, if the fault isnt expired, ensure we dont create a duplicate fault + should_push = false; + } } } - fault_ringbuffer.push(FaultData { node: node, name: flt_txt.to_string(), occured_at: data.timestamp }); - + // send a new fault if no message matches and is not expired + if should_push { + fault_ringbuffer.push(FaultData { + node: node, + name: flt_txt.to_string(), + occured_at: data.timestamp, + last_seen: data.timestamp, + expired: false, + }); + } } _ = recent_faults_interval.tick() => { send_socket_msg( From eb2dbea07db5a8c3dcdbdad3de7d7790b4bd7b12 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 17 Jan 2025 21:52:05 -0500 Subject: [PATCH 7/9] fixups, ts ms --- scylla-server/src/socket_handler.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 6c66f708..004cc1d9 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,10 +1,10 @@ -use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; - +use chrono::serde::ts_milliseconds; use chrono::{DateTime, TimeDelta, Utc}; use regex::Regex; use ringbuffer::{AllocRingBuffer, RingBuffer}; use serde::Serialize; use socketioxide::SocketIo; +use std::{collections::HashMap, sync::atomic::Ordering, time::Duration}; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -34,8 +34,12 @@ pub async fn socket_handler( } #[derive(Serialize)] struct TimerData { + /// the topic being timed pub topic: &'static str, + /// the last time the value changed + #[serde(with = "ts_milliseconds")] pub last_change: DateTime, + /// the value at the above time pub last_value: f32, } const TIMER_SOCKET_KEY: &str = "timers"; @@ -60,8 +64,10 @@ struct FaultData { /// the word describing the fault pub name: String, /// when the fault occured + #[serde(with = "ts_milliseconds")] pub occured_at: DateTime, /// when the fault was last seen + #[serde(with = "ts_milliseconds")] pub last_seen: DateTime, /// whether another fault of the same node and name as occured after this fault pub expired: bool, @@ -97,7 +103,7 @@ pub async fn socket_handler_with_metadata( // INTERVAL TIMERS for periodic things to be sent let mut view_interval = 
tokio::time::interval(Duration::from_secs(3));
-    let mut timers_interval = tokio::time::interval(Duration::from_secs(3));
+    let mut timers_interval = tokio::time::interval(Duration::from_secs(1));
     let mut recent_faults_interval = tokio::time::interval(Duration::from_secs(1));
 
     // init timers
@@ -175,10 +181,10 @@ pub async fn socket_handler_with_metadata(
                     // if a fault of the same type is in the queue, and not expired
                     if item.name == flt_txt && node.clone() == item.node && !item.expired {
                         // update the last seen metric
-                        (*item).last_seen = data.timestamp;
+                        item.last_seen = data.timestamp;
                         // if the time since the last fault is greater than [FAULT_MIN_REG_GAP], mark this fault as expired
                         if (data.timestamp - item.last_seen) > FAULT_MIN_REG_GAP {
-                            (*item).expired = true;
+                            item.expired = true;
                         } else {
                             // otherwise, if the fault isnt expired, ensure we dont create a duplicate fault
                             should_push = false;
@@ -188,7 +194,7 @@ pub async fn socket_handler_with_metadata(
                 // send a new fault if no message matches and is not expired
                 if should_push {
                     fault_ringbuffer.push(FaultData {
-                        node: node,
+                        node,
                         name: flt_txt.to_string(),
                         occured_at: data.timestamp,
                         last_seen: data.timestamp,

From 40ceb2a0a2deea45fbab6c702fdbdd2678421a54 Mon Sep 17 00:00:00 2001
From: Jack Rubacha
Date: Tue, 28 Jan 2025 19:00:00 -0500
Subject: [PATCH 8/9] move structs to a metadata_structs file

---
 scylla-server/src/lib.rs              |  2 +
 scylla-server/src/metadata_structs.rs | 65 +++++++++++++++++++++++++
 scylla-server/src/socket_handler.rs   | 69 ++-------------------------
 3 files changed, 72 insertions(+), 64 deletions(-)
 create mode 100644 scylla-server/src/metadata_structs.rs

diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs
index 5ecc9d07..431c9811 100644
--- a/scylla-server/src/lib.rs
+++ b/scylla-server/src/lib.rs
@@ -6,6 +6,8 @@ pub mod services;
 pub mod db_handler;
 pub mod mqtt_processor;
+
+pub mod metadata_structs;
 pub mod socket_handler;
 
 #[allow(non_snake_case)]
 pub mod models;
diff --git a/scylla-server/src/metadata_structs.rs b/scylla-server/src/metadata_structs.rs
new file mode 100644
index 00000000..05c69e12
--- /dev/null
+++ b/scylla-server/src/metadata_structs.rs
@@ -0,0 +1,65 @@
+use ::serde::Serialize;
+use chrono::{serde::ts_milliseconds, DateTime, TimeDelta, Utc};
+
+pub const DATA_SOCKET_KEY: &str = "data";
+
+#[derive(Serialize)]
+pub struct TimerData {
+    /// the topic being timed
+    pub topic: &'static str,
+    /// the last time the value changed
+    #[serde(with = "ts_milliseconds")]
+    pub last_change: DateTime<Utc>,
+    /// the value at the above time
+    pub last_value: f32,
+}
+pub const TIMER_SOCKET_KEY: &str = "timers";
+pub const TIMERS_TOPICS: &[&str] = &[
+    "BMS/Status/Balancing",
+    "BMS/Status/State",
+    "BMS/Charging/Control",
+];
+
+#[derive(Serialize, PartialEq, Clone, Debug)]
+pub enum Node {
+    Bms,
+    Dti,
+    Mpu,
+    Charger,
+}
+
+#[derive(Serialize, Clone)]
+pub struct FaultData {
+    /// the node the fault came from
+    pub node: Node,
+    /// the word describing the fault
+    pub name: String,
+    /// when the fault occurred
+    #[serde(with = "ts_milliseconds")]
+    pub occured_at: DateTime<Utc>,
+    /// when the fault was last seen
+    #[serde(with = "ts_milliseconds")]
+    pub last_seen: DateTime<Utc>,
+    /// whether another fault of the same node and name has occurred after this fault
+    pub expired: bool,
+}
+pub const FAULT_SOCKET_KEY: &str = "faults";
+pub const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8);
+
+pub const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"];
+pub const fn map_dti_flt(index: usize) -> Option<&'static
str> { + match index { + 0 => None, + 1 => Some("Overvoltage"), + 2 => None, + 3 => Some("DRV"), + 4 => Some("ABS_Overcurrent"), + 5 => Some("CTLR_Overtemp"), + 6 => Some("Motor_Overtemp"), + 7 => Some("Sensor_wire"), + 8 => Some("Sensor_general"), + 9 => Some("CAN_command"), + 0x0A => Some("Analog_input"), + _ => None, + } +} diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 004cc1d9..1c00f21a 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,5 +1,4 @@ -use chrono::serde::ts_milliseconds; -use chrono::{DateTime, TimeDelta, Utc}; +use chrono::{DateTime, Utc}; use regex::Regex; use ringbuffer::{AllocRingBuffer, RingBuffer}; use serde::Serialize; @@ -9,10 +8,12 @@ use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; +use crate::metadata_structs::{ + map_dti_flt, FaultData, Node, TimerData, DATA_SOCKET_KEY, FAULT_BINS, FAULT_MIN_REG_GAP, + FAULT_SOCKET_KEY, TIMERS_TOPICS, TIMER_SOCKET_KEY, +}; use crate::ClientData; -const DATA_SOCKET_KEY: &str = "data"; - pub async fn socket_handler( cancel_token: CancellationToken, mut data_channel: broadcast::Receiver, @@ -32,66 +33,6 @@ pub async fn socket_handler( } } } -#[derive(Serialize)] -struct TimerData { - /// the topic being timed - pub topic: &'static str, - /// the last time the value changed - #[serde(with = "ts_milliseconds")] - pub last_change: DateTime, - /// the value at the above time - pub last_value: f32, -} -const TIMER_SOCKET_KEY: &str = "timers"; -const TIMERS_TOPICS: &[&str] = &[ - "BMS/Status/Balancing", - "BMS/Status/State", - "BMS/Charging/Control", -]; - -#[derive(Serialize, PartialEq, Clone, Debug)] -enum Node { - Bms, - Dti, - Mpu, - Charger, -} - -#[derive(Serialize, Clone)] -struct FaultData { - /// the node the fault came from - pub node: Node, - /// the word describing the fault - pub name: String, - /// when the fault occured - #[serde(with = "ts_milliseconds")] - pub occured_at: DateTime, - /// when the fault was last seen - #[serde(with = "ts_milliseconds")] - pub last_seen: DateTime, - /// whether another fault of the same node and name as occured after this fault - pub expired: bool, -} -const FAULT_SOCKET_KEY: &str = "faults"; -const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8); - -const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; -const fn map_dti_flt(index: usize) -> Option<&'static str> { - match index { - 0 => None, - 1 => Some("Overvoltage"), - 2 => None, - 3 => Some("DRV"), - 4 => Some("ABS_Overcurrent"), - 5 => Some("CTLR_Overtemp"), - 6 => Some("Motor_Overtemp"), - 7 => Some("Sensor_wire"), - 8 => Some("Sensor_general"), - 9 => Some("CAN_command"), - 0x0A => Some("Analog_input"), - _ => None, - } -} pub async fn socket_handler_with_metadata( cancel_token: CancellationToken, From 7f03e9db0a042e08cbf6a11025abce7a7f9e29fa Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Tue, 28 Jan 2025 19:05:55 -0500 Subject: [PATCH 9/9] move handle branch --- scylla-server/src/socket_handler.rs | 129 +++++++++++++++------------- 1 file changed, 70 insertions(+), 59 deletions(-) diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 1c00f21a..bfa89b9b 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -83,65 +83,7 @@ pub async fn socket_handler_with_metadata( &io, DATA_SOCKET_KEY, ); - - // check to see if we fit a timer case, and then act upon it - // IMPORTANT: assumes a timer is never also a 
fault - if let Some(time) = timer_map.get_mut(&data.name) { - trace!("Triggering timer: {}", data.name); - let new_val = *data.values.first().unwrap_or(&-1f32); - if time.last_value != new_val { - time.last_value = new_val; - time.last_change = Utc::now(); - } - continue; - } - - // check to see if this is a fault, and return the fault name and node - // each bring is the logic to get a node, note the difference in DTI - let (flt_txt, node) = if let Some(mtch) = fault_regex_bms.captures_iter(&data.name).next() { - (mtch.get(1).map_or("", |m| m.as_str()), Node::Bms) - } else if let Some(mtch) = fault_regex_charger.captures_iter(&data.name).next() { - (mtch.get(1).map_or("", |m| m.as_str()), Node::Charger) - } else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() { - (mtch.get(1).map_or("", |m| m.as_str()), Node::Mpu) - } else if FAULT_BINS[0] == data.name { - let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else { - continue; - }; - (flt, Node::Dti) - } else { - continue; - }; - - trace!("Matched on {}, {:?}", flt_txt, node); - - // default to sending a new fault - let mut should_push = true; - // iterate through current faults - for item in fault_ringbuffer.iter_mut() { - // if a fault of the same type is in the queue, and not expired - if item.name == flt_txt && node.clone() == item.node && !item.expired { - // update the last seen metric - item.last_seen = data.timestamp; - // if the time since the last fault is greater than [FAULT_MIN_REG_GAP], mark this fault as expired - if (data.timestamp - item.last_seen) > FAULT_MIN_REG_GAP { - item.expired = true; - } else { - // otherwise, if the fault isnt expired, ensure we dont create a duplicate fault - should_push = false; - } - } - } - // send a new fault if no message matches and is not expired - if should_push { - fault_ringbuffer.push(FaultData { - node, - name: flt_txt.to_string(), - occured_at: data.timestamp, - last_seen: data.timestamp, - expired: false, - }); - } + handle_socket_msg(data, &fault_regex_mpu, &fault_regex_bms, &fault_regex_charger, &mut timer_map, &mut fault_ringbuffer); } _ = recent_faults_interval.tick() => { send_socket_msg( @@ -186,6 +128,75 @@ pub async fn socket_handler_with_metadata( } } +/// Handles parsing and creating metadata for a newly received socket message. 
+fn handle_socket_msg(
+    data: ClientData,
+    fault_regex_mpu: &Regex,
+    fault_regex_bms: &Regex,
+    fault_regex_charger: &Regex,
+    timer_map: &mut HashMap<String, TimerData>,
+    fault_ringbuffer: &mut AllocRingBuffer<FaultData>,
+) {
+    // check to see if we fit a timer case, and then act upon it
+    // IMPORTANT: assumes a timer is never also a fault
+    if let Some(time) = timer_map.get_mut(&data.name) {
+        trace!("Triggering timer: {}", data.name);
+        let new_val = *data.values.first().unwrap_or(&-1f32);
+        if time.last_value != new_val {
+            time.last_value = new_val;
+            time.last_change = Utc::now();
+        }
+        return;
+    }
+
+    // check to see if this is a fault, and return the fault name and node
+    // each branch is the logic to get a node, note the difference in DTI
+    let (flt_txt, node) = if let Some(mtch) = fault_regex_bms.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Bms)
+    } else if let Some(mtch) = fault_regex_charger.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Charger)
+    } else if let Some(mtch) = fault_regex_mpu.captures_iter(&data.name).next() {
+        (mtch.get(1).map_or("", |m| m.as_str()), Node::Mpu)
+    } else if FAULT_BINS[0] == data.name {
+        let Some(flt) = map_dti_flt(*data.values.first().unwrap_or(&0f32) as usize) else {
+            return;
+        };
+        (flt, Node::Dti)
+    } else {
+        return;
+    };
+
+    trace!("Matched on {}, {:?}", flt_txt, node);
+
+    // default to sending a new fault
+    let mut should_push = true;
+    // iterate through current faults
+    for item in fault_ringbuffer.iter_mut() {
+        // if a fault of the same type is in the queue, and not expired
+        if item.name == flt_txt && node.clone() == item.node && !item.expired {
+            // update the last seen metric
+            item.last_seen = data.timestamp;
+            // if the time since the last fault is greater than [FAULT_MIN_REG_GAP], mark this fault as expired
+            if (data.timestamp - item.last_seen) > FAULT_MIN_REG_GAP {
+                item.expired = true;
+            } else {
+                // otherwise, if the fault isn't expired, ensure we don't create a duplicate fault
+                should_push = false;
+            }
+        }
+    }
+    // send a new fault if no message matches and is not expired
+    if should_push {
+        fault_ringbuffer.push(FaultData {
+            node,
+            name: flt_txt.to_string(),
+            occured_at: data.timestamp,
+            last_seen: data.timestamp,
+            expired: false,
+        });
+    }
+}
+
 /// Sends a message to the socket, printing and IGNORING any error that may occur
 /// * `client_data` - The client data to send over the broadcast
 /// * `upload_counter` - The counter of data that has been uploaded, for basic rate limiting