Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

223 metadata injector #252

Merged
merged 10 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ axum-extra = { version = "0.10.0", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
regex = "1.11.1"
rangemap = "1.5.1"
axum-macros = "0.5.0"
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] }
rustc-hash = "2.1.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"


[features]
top = ["dep:console-subscriber"]

Expand Down
2 changes: 1 addition & 1 deletion scylla-server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ fn main() {
.input("src/proto/command_data.proto")
.input("src/proto/playback_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.out_dir("src/proto")
.run_from_script();
}
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/car_command_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};
use crate::{error::ScyllaError, proto::command_data};

/// the prefix for the calypso topic, so topic of cmd is this plus the key appended on
pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";
Expand All @@ -34,7 +34,7 @@ pub async fn send_config_command(
);

// the protobuf calypso converts into CAN
let mut payload = CommandData::new();
let mut payload = command_data::CommandData::new();
// empty "data" in the protobuf tells calypso to use the default value
if let Some(data) = data_query.data {
payload.data = data;
Expand Down
4 changes: 3 additions & 1 deletion scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use rangemap::RangeInclusiveMap;
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};
use crate::{
error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle,
};

/// Inserts a file using http multipart
/// This file is parsed and clientdata values are extracted, the run ID of each variable is inferred, and then data is batch uploaded
Expand Down
24 changes: 10 additions & 14 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rustc_hash::FxHashSet;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{broadcast, mpsc};

use tokio::{sync::mpsc::Sender, time::Duration};
use tokio::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};
Expand All @@ -15,7 +15,7 @@ pub struct DbHandler {
/// The list of data types seen by this instance, used for when to upsert
datatype_list: FxHashSet<String>,
/// The broadcast channel which provides serial datapoints for processing
receiver: Receiver<ClientData>,
receiver: broadcast::Receiver<ClientData>,
/// The database pool handle
pool: PoolHandle,
/// the queue of data
Expand Down Expand Up @@ -57,7 +57,7 @@ impl DbHandler {
/// Make a new db handler
/// * `recv` - the broadcast reciver of which clientdata will be sent
pub fn new(
receiver: Receiver<ClientData>,
receiver: broadcast::Receiver<ClientData>,
pool: PoolHandle,
upload_interval: u64,
) -> DbHandler {
Expand All @@ -74,7 +74,7 @@ impl DbHandler {
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut batch_queue: Receiver<Vec<ClientData>>,
mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
pool: PoolHandle,
cancel_token: CancellationToken,
) {
Expand Down Expand Up @@ -133,7 +133,7 @@ impl DbHandler {

/// A batching loop that consumes messages but does not upload anything
pub async fn fake_batching_loop(
mut batch_queue: Receiver<Vec<ClientData>>,
mut batch_queue: mpsc::Receiver<Vec<ClientData>>,
cancel_token: CancellationToken,
) {
loop {
Expand Down Expand Up @@ -168,7 +168,7 @@ impl DbHandler {
/// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation
pub async fn handling_loop(
mut self,
data_channel: Sender<Vec<ClientData>>,
data_channel: mpsc::Sender<Vec<ClientData>>,
cancel_token: CancellationToken,
) {
let mut batch_interval = tokio::time::interval(Duration::from_millis(self.upload_interval));
Expand All @@ -181,7 +181,7 @@ impl DbHandler {
data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown");
break;
},
Some(msg) = self.receiver.recv() => {
Ok(msg) = self.receiver.recv() => {
self.handle_msg(msg, &data_channel).await;
}
_ = batch_interval.tick() => {
Expand All @@ -201,12 +201,8 @@ impl DbHandler {
}

#[instrument(skip(self), level = Level::TRACE)]
async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
trace!(
"Mqtt dispatcher: {} of {}",
self.receiver.len(),
self.receiver.max_capacity()
);
async fn handle_msg(&mut self, msg: ClientData, data_channel: &mpsc::Sender<Vec<ClientData>>) {
trace!("Mqtt dispatcher: {}", self.receiver.len(),);

if !self.datatype_list.contains(&msg.name) {
let Ok(mut database) = self.pool.get().await else {
Expand Down
7 changes: 4 additions & 3 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ pub mod services;
pub mod db_handler;
pub mod mqtt_processor;

pub mod metadata_structs;
pub mod socket_handler;

#[allow(non_snake_case)]
pub mod models;
#[allow(non_snake_case)]
pub mod schema;

pub mod command_data;
pub mod playback_data;
pub mod serverdata;
pub mod proto;

pub mod transformers;

Expand Down
41 changes: 34 additions & 7 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use scylla_server::{
data_type_controller, file_insertion_controller, run_controller,
},
services::run_service::{self},
socket_handler::{socket_handler, socket_handler_with_metadata},
RateLimitMode,
};
use scylla_server::{
Expand All @@ -33,7 +34,10 @@ use scylla_server::{
ClientData, RUN_ID,
};
use socketioxide::{extract::SocketRef, SocketIo};
use tokio::{signal, sync::mpsc};
use tokio::{
signal,
sync::{broadcast, mpsc},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tower::ServiceBuilder;
use tower_http::{
Expand Down Expand Up @@ -106,6 +110,10 @@ struct ScyllaArgs {
default_value = "0"
)]
socketio_discard_percent: u8,

/// Whether to disable sending of metadata over the socket to the client
#[arg(long, env = "SCYLLA_SOCKET_DISABLE_METADATA")]
no_metadata: bool,
}

const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
Expand Down Expand Up @@ -180,20 +188,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// channel to pass the mqtt data
// TODO tune buffer size
let (mqtt_send, mqtt_receive) = mpsc::channel::<ClientData>(10000);
let (mqtt_send, mqtt_receive) = broadcast::channel::<ClientData>(10000);

// channel to pass the processed data to the db thread
// channel to pass the processed data to the batch uploading thread
// TODO tune buffer size
let (db_send, db_receive) = mpsc::channel::<Vec<ClientData>>(1000);

// the below two threads need to cancel cleanly to ensure all queued messages are sent. therefore they are part of the a task tracker group.
// create a task tracker and cancellation token
let task_tracker = TaskTracker::new();
let token = CancellationToken::new();

if cli.no_metadata {
task_tracker.spawn(socket_handler(
token.clone(),
mqtt_receive,
cli.socketio_discard_percent,
io,
));
} else {
task_tracker.spawn(socket_handler_with_metadata(
token.clone(),
mqtt_receive,
cli.socketio_discard_percent,
io,
));
}

// spawn the database handler
task_tracker.spawn(
db_handler::DbHandler::new(mqtt_receive, pool.clone(), cli.batch_upsert_time * 1000)
.handling_loop(db_send.clone(), token.clone()),
db_handler::DbHandler::new(
mqtt_send.subscribe(),
pool.clone(),
cli.batch_upsert_time * 1000,
)
.handling_loop(db_send.clone(), token.clone()),
);
// spawn the database inserter, if we have it enabled
if !cli.disable_data_upload {
Expand Down Expand Up @@ -222,14 +251,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
io,
token.clone(),
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.runId,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
Expand Down
65 changes: 65 additions & 0 deletions scylla-server/src/metadata_structs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use ::serde::Serialize;
use chrono::{serde::ts_milliseconds, DateTime, TimeDelta, Utc};

pub const DATA_SOCKET_KEY: &str = "data";

#[derive(Serialize)]
pub struct TimerData {
/// the topic being timed
pub topic: &'static str,
/// the last time the value changed
#[serde(with = "ts_milliseconds")]
pub last_change: DateTime<Utc>,
/// the value at the above time
pub last_value: f32,
}
pub const TIMER_SOCKET_KEY: &str = "timers";
pub const TIMERS_TOPICS: &[&str] = &[
"BMS/Status/Balancing",
"BMS/Status/State",
"BMS/Charging/Control",
];

#[derive(Serialize, PartialEq, Clone, Debug)]
pub enum Node {
Bms,
Dti,
Mpu,
Charger,
}

#[derive(Serialize, Clone)]
pub struct FaultData {
/// the node the fault came from
pub node: Node,
/// the word describing the fault
pub name: String,
/// when the fault occured
#[serde(with = "ts_milliseconds")]
pub occured_at: DateTime<Utc>,
/// when the fault was last seen
#[serde(with = "ts_milliseconds")]
pub last_seen: DateTime<Utc>,
/// whether another fault of the same node and name as occured after this fault
pub expired: bool,
}
pub const FAULT_SOCKET_KEY: &str = "faults";
pub const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8);

pub const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"];
pub const fn map_dti_flt(index: usize) -> Option<&'static str> {
match index {
0 => None,
1 => Some("Overvoltage"),
2 => None,
3 => Some("DRV"),
4 => Some("ABS_Overcurrent"),
5 => Some("CTLR_Overtemp"),
6 => Some("Motor_Overtemp"),
7 => Some("Sensor_wire"),
8 => Some("Sensor_general"),
9 => Some("CAN_command"),
0x0A => Some("Analog_input"),
_ => None,
}
}
Loading