Merge pull request #252 from Northeastern-Electric-Racing/223-metadata-injector

223 metadata injector
bracyw authored Jan 29, 2025
2 parents 39bd9be + 7f03e9d commit 00fba67
Showing 12 changed files with 389 additions and 114 deletions.
1 change: 1 addition & 0 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions scylla-server/Cargo.toml
@@ -28,13 +28,15 @@ axum-extra = { version = "0.10.0", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
regex = "1.11.1"
rangemap = "1.5.1"
axum-macros = "0.5.0"
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] }
rustc-hash = "2.1.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"


[features]
top = ["dep:console-subscriber"]

2 changes: 1 addition & 1 deletion scylla-server/build.rs
@@ -11,6 +11,6 @@ fn main() {
.input("src/proto/command_data.proto")
.input("src/proto/playback_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.out_dir("src/proto")
.run_from_script();
}
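With the generated files now landing in src/proto, lib.rs (changed below) exposes them through a single proto module. A plausible src/proto/mod.rs — not shown in this diff, so its exact contents are an assumption — would simply re-export the generated modules:

// src/proto/mod.rs (hypothetical; the actual file is not part of this diff)
// Re-export the protobuf-generated modules so callers can write
// `crate::proto::command_data` and friends.
pub mod command_data;
pub mod playback_data;
pub mod serverdata;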
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/car_command_controller.rs
@@ -7,7 +7,7 @@ use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};
use crate::{error::ScyllaError, proto::command_data};

/// the prefix for the calypso command topic; the full topic of a command is this prefix with the key appended
pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";
@@ -34,7 +34,7 @@ pub async fn send_config_command(
);

// the protobuf calypso converts into CAN
let mut payload = CommandData::new();
let mut payload = command_data::CommandData::new();
// empty "data" in the protobuf tells calypso to use the default value
if let Some(data) = data_query.data {
payload.data = data;
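As the doc comment above notes, the outgoing MQTT topic is the prefix plus the command key. A minimal sketch, with a hypothetical key (real keys are defined by Calypso):

// Hypothetical command key, for illustration only.
let key = "charger_enable";
let topic = format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key);
assert_eq!(topic, "Calypso/Bidir/Command/charger_enable");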
4 changes: 3 additions & 1 deletion scylla-server/src/controllers/file_insertion_controller.rs
@@ -9,7 +9,9 @@ use rangemap::RangeInclusiveMap;
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};
use crate::{
error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle,
};

/// Inserts a file using HTTP multipart.
/// The file is parsed and ClientData values are extracted; the run ID of each variable is inferred, and the data is then batch uploaded.
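For orientation, the general shape of an axum multipart handler — a hedged skeleton only; the real controller additionally infers run IDs via the RangeInclusiveMap and hands batches to run_service:

use axum::extract::Multipart; // requires axum's "multipart" feature

// Hypothetical skeleton of a multipart upload handler.
async fn insert_file(mut multipart: Multipart) -> Result<(), String> {
    while let Some(field) = multipart.next_field().await.map_err(|e| e.to_string())? {
        let name = field.name().unwrap_or("unknown").to_owned();
        let bytes = field.bytes().await.map_err(|e| e.to_string())?;
        // ... parse `bytes` into playback_data messages and batch upload ...
        tracing::info!("received part {} ({} bytes)", name, bytes.len());
    }
    Ok(())
}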
24 changes: 10 additions & 14 deletions scylla-server/src/db_handler.rs
@@ -1,7 +1,7 @@
use rustc_hash::FxHashSet;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{broadcast, mpsc};

use tokio::{sync::mpsc::Sender, time::Duration};
use tokio::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};
@@ -15,7 +15,7 @@ pub struct DbHandler {
/// The list of data types seen by this instance, used to decide when to upsert
datatype_list: FxHashSet<String>,
/// The broadcast channel which provides serial datapoints for processing
receiver: Receiver<ClientData>,
receiver: broadcast::Receiver<ClientData>,
/// The database pool handle
pool: PoolHandle,
/// the queue of data
@@ -57,7 +57,7 @@ impl DbHandler {
/// Make a new db handler
/// * `recv` - the broadcast receiver on which clientdata will be sent
pub fn new(
receiver: Receiver<ClientData>,
receiver: broadcast::Receiver<ClientData>,
pool: PoolHandle,
upload_interval: u64,
) -> DbHandler {
@@ -74,7 +74,7 @@
/// It uses the data queue to insert into the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut batch_queue: Receiver<Vec<ClientData>>,
mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
pool: PoolHandle,
cancel_token: CancellationToken,
) {
@@ -133,7 +133,7 @@

/// A batching loop that consumes messages but does not upload anything
pub async fn fake_batching_loop(
mut batch_queue: Receiver<Vec<ClientData>>,
mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
cancel_token: CancellationToken,
) {
loop {
@@ -168,7 +168,7 @@
/// On cancellation, any messages remaining in the queue are sent as one final flush
pub async fn handling_loop(
mut self,
data_channel: Sender<Vec<ClientData>>,
data_channel: mpsc::Sender<Vec<ClientData>>,
cancel_token: CancellationToken,
) {
let mut batch_interval = tokio::time::interval(Duration::from_millis(self.upload_interval));
@@ -181,7 +181,7 @@
data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown");
break;
},
Some(msg) = self.receiver.recv() => {
Ok(msg) = self.receiver.recv() => {
self.handle_msg(msg, &data_channel).await;
}
_ = batch_interval.tick() => {
Expand All @@ -201,12 +201,8 @@ impl DbHandler {
}

#[instrument(skip(self), level = Level::TRACE)]
async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
trace!(
"Mqtt dispatcher: {} of {}",
self.receiver.len(),
self.receiver.max_capacity()
);
async fn handle_msg(&mut self, msg: ClientData, data_channel: &mpsc::Sender<Vec<ClientData>>) {
trace!("Mqtt dispatcher: {}", self.receiver.len(),);

if !self.datatype_list.contains(&msg.name) {
let Ok(mut database) = self.pool.get().await else {
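The core of this change is the swap from an mpsc receiver to a broadcast receiver, which is why the select arm above changes from Some(msg) to Ok(msg). A self-contained sketch of the semantics, using a stand-in type rather than the real ClientData:

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct ClientData; // stand-in for the real struct

#[tokio::main]
async fn main() {
    // Every subscriber of a broadcast channel sees every message,
    // unlike mpsc where each message goes to exactly one receiver.
    let (tx, mut rx1) = broadcast::channel::<ClientData>(16);
    let mut rx2 = tx.subscribe(); // a second, independent consumer

    tx.send(ClientData).unwrap();

    // recv() returns Result, not Option: a lagging receiver gets
    // Err(RecvError::Lagged(n)) once the ring buffer has overwritten
    // messages it never read -- hence `Ok(msg) = self.receiver.recv()`.
    assert!(rx1.recv().await.is_ok());
    assert!(rx2.recv().await.is_ok());
}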
7 changes: 4 additions & 3 deletions scylla-server/src/lib.rs
@@ -7,14 +7,15 @@ pub mod services;
pub mod db_handler;
pub mod mqtt_processor;

pub mod metadata_structs;
pub mod socket_handler;

#[allow(non_snake_case)]
pub mod models;
#[allow(non_snake_case)]
pub mod schema;

pub mod command_data;
pub mod playback_data;
pub mod serverdata;
pub mod proto;

pub mod transformers;

41 changes: 34 additions & 7 deletions scylla-server/src/main.rs
@@ -25,6 +25,7 @@ use scylla_server::{
data_type_controller, file_insertion_controller, run_controller,
},
services::run_service::{self},
socket_handler::{socket_handler, socket_handler_with_metadata},
RateLimitMode,
};
use scylla_server::{
@@ -33,7 +34,10 @@
ClientData, RUN_ID,
};
use socketioxide::{extract::SocketRef, SocketIo};
use tokio::{signal, sync::mpsc};
use tokio::{
signal,
sync::{broadcast, mpsc},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tower::ServiceBuilder;
use tower_http::{
@@ -106,6 +110,10 @@ struct ScyllaArgs {
default_value = "0"
)]
socketio_discard_percent: u8,

/// Whether to disable sending of metadata over the socket to the client
#[arg(long, env = "SCYLLA_SOCKET_DISABLE_METADATA")]
no_metadata: bool,
}

const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
@@ -180,20 +188,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// channel to pass the mqtt data
// TODO tune buffer size
let (mqtt_send, mqtt_receive) = mpsc::channel::<ClientData>(10000);
let (mqtt_send, mqtt_receive) = broadcast::channel::<ClientData>(10000);

// channel to pass the processed data to the db thread
// channel to pass the processed data to the batch uploading thread
// TODO tune buffer size
let (db_send, db_receive) = mpsc::channel::<Vec<ClientData>>(1000);

// the below two threads need to cancel cleanly to ensure all queued messages are sent, therefore they are part of a task tracker group.
// create a task tracker and cancellation token
let task_tracker = TaskTracker::new();
let token = CancellationToken::new();

if cli.no_metadata {
task_tracker.spawn(socket_handler(
token.clone(),
mqtt_receive,
cli.socketio_discard_percent,
io,
));
} else {
task_tracker.spawn(socket_handler_with_metadata(
token.clone(),
mqtt_receive,
cli.socketio_discard_percent,
io,
));
}

// spawn the database handler
task_tracker.spawn(
db_handler::DbHandler::new(mqtt_receive, pool.clone(), cli.batch_upsert_time * 1000)
.handling_loop(db_send.clone(), token.clone()),
db_handler::DbHandler::new(
mqtt_send.subscribe(),
pool.clone(),
cli.batch_upsert_time * 1000,
)
.handling_loop(db_send.clone(), token.clone()),
);
// spawn the database inserter, if we have it enabled
if !cli.disable_data_upload {
@@ -222,14 +251,12 @@
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
io,
token.clone(),
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.runId,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
65 changes: 65 additions & 0 deletions scylla-server/src/metadata_structs.rs
@@ -0,0 +1,65 @@
use ::serde::Serialize;
use chrono::{serde::ts_milliseconds, DateTime, TimeDelta, Utc};

pub const DATA_SOCKET_KEY: &str = "data";

#[derive(Serialize)]
pub struct TimerData {
/// the topic being timed
pub topic: &'static str,
/// the last time the value changed
#[serde(with = "ts_milliseconds")]
pub last_change: DateTime<Utc>,
/// the value at the above time
pub last_value: f32,
}
pub const TIMER_SOCKET_KEY: &str = "timers";
pub const TIMERS_TOPICS: &[&str] = &[
"BMS/Status/Balancing",
"BMS/Status/State",
"BMS/Charging/Control",
];

#[derive(Serialize, PartialEq, Clone, Debug)]
pub enum Node {
Bms,
Dti,
Mpu,
Charger,
}

#[derive(Serialize, Clone)]
pub struct FaultData {
/// the node the fault came from
pub node: Node,
/// the word describing the fault
pub name: String,
/// when the fault occurred
#[serde(with = "ts_milliseconds")]
pub occured_at: DateTime<Utc>,
/// when the fault was last seen
#[serde(with = "ts_milliseconds")]
pub last_seen: DateTime<Utc>,
/// whether another fault of the same node and name has occurred after this fault
pub expired: bool,
}
pub const FAULT_SOCKET_KEY: &str = "faults";
pub const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8);

pub const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"];
pub const fn map_dti_flt(index: usize) -> Option<&'static str> {
match index {
0 => None,
1 => Some("Overvoltage"),
2 => None,
3 => Some("DRV"),
4 => Some("ABS_Overcurrent"),
5 => Some("CTLR_Overtemp"),
6 => Some("Motor_Overtemp"),
7 => Some("Sensor_wire"),
8 => Some("Sensor_general"),
9 => Some("CAN_command"),
0x0A => Some("Analog_input"),
_ => None,
}
}
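A short usage sketch of the fault table above, assuming the items are in scope (e.g. use scylla_server::metadata_structs::{map_dti_flt, FaultData, Node}); serde_json and chrono are already dependencies:

use chrono::Utc;

fn main() {
    // Index 3 of DTI/Fault/FaultCode maps to the "DRV" fault.
    assert_eq!(map_dti_flt(3), Some("DRV"));
    assert_eq!(map_dti_flt(0), None); // index 0 carries no fault

    // FaultData timestamps serialize as epoch milliseconds via ts_milliseconds.
    let fault = FaultData {
        node: Node::Dti,
        name: "DRV".to_owned(),
        occured_at: Utc::now(),
        last_seen: Utc::now(),
        expired: false,
    };
    println!("{}", serde_json::to_string(&fault).unwrap());
}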
