diff --git a/scylla-server/src/controllers/run_controller.rs b/scylla-server/src/controllers/run_controller.rs index e4df1e95..ce6d11dd 100644 --- a/scylla-server/src/controllers/run_controller.rs +++ b/scylla-server/src/controllers/run_controller.rs @@ -1,9 +1,9 @@ +use std::sync::atomic::Ordering; + use axum::{ extract::{Path, State}, - Extension, Json, + Json, }; -use tokio::sync::mpsc; -use tracing::warn; use crate::{ error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, Database, @@ -38,16 +38,14 @@ pub async fn get_run_by_id( /// create a new run with an auto-incremented ID /// note the new run must be updated so the channel passed in notifies the data processor to use the new run -pub async fn new_run( - State(db): State, - Extension(channel): Extension>, -) -> Result, ScyllaError> { +pub async fn new_run(State(db): State) -> Result, ScyllaError> { let run_data = run_service::create_run(&db, chrono::offset::Utc::now()).await?; - // notify the mqtt receiver a new run has been created - if let Err(err) = channel.send(run_data.clone()).await { - warn!("Could not notify system about an updated run: {}", err); - } + crate::RUN_ID.store(run_data.id, Ordering::Relaxed); + tracing::info!( + "Starting new run with ID: {}", + crate::RUN_ID.load(Ordering::Relaxed) + ); Ok(Json::from(PublicRun::from(&run_data))) } diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index ef595323..70794d70 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicI32; + pub mod controllers; pub mod error; pub mod processors; @@ -23,3 +25,6 @@ pub enum RateLimitMode { #[default] None, } + +// Atomic to keep track the current run id across EVERYTHING (very scary) +pub static RUN_ID: AtomicI32 = AtomicI32::new(-1); diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 96326ed8..002841b6 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{atomic::Ordering, Arc}, + time::Duration, +}; use axum::{ http::Method, @@ -7,6 +10,7 @@ use axum::{ }; use clap::Parser; use rumqttc::v5::AsyncClient; +use scylla_server::RUN_ID; use scylla_server::{ controllers::{ self, @@ -21,7 +25,7 @@ use scylla_server::{ mqtt_processor::{MqttProcessor, MqttProcessorOptions}, ClientData, }, - services::run_service::{self, public_run}, + services::run_service::{self}, Database, RateLimitMode, }; use socketioxide::{extract::SocketRef, SocketIo}; @@ -160,9 +164,6 @@ async fn main() { // TODO tune buffer size let (db_send, db_receive) = mpsc::channel::>(1000); - // channel to update the run to a new value - let (new_run_send, new_run_receive) = mpsc::channel::(5); - // the below two threads need to cancel cleanly to ensure all queued messages are sent. therefore they are part of the a task tracker group. // create a task tracker and cancellation token let task_tracker = TaskTracker::new(); @@ -200,12 +201,12 @@ async fn main() { .expect("Could not create initial run!"); debug!("Configuring current run: {:?}", curr_run); + RUN_ID.store(curr_run.id, Ordering::Relaxed); // run prod if this isnt present // create and spawn the mqtt processor info!("Running processor in MQTT (production) mode"); let (recv, opts) = MqttProcessor::new( mqtt_send, - new_run_receive, io, token.clone(), MqttProcessorOptions { @@ -239,10 +240,7 @@ async fn main() { // RUNS .route("/runs", get(run_controller::get_all_runs)) .route("/runs/:id", get(run_controller::get_run_by_id)) - .route( - "/runs/new", - post(run_controller::new_run).layer(Extension(new_run_send)), - ) + .route("/runs/new", post(run_controller::new_run)) // SYSTEMS .route("/systems", get(system_controller::get_all_systems)) // CONFIG diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 0f226646..604e876f 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::Arc, + sync::{atomic::Ordering, Arc}, time::{Duration, SystemTime}, }; @@ -13,16 +13,12 @@ use rumqttc::v5::{ AsyncClient, Event, EventLoop, MqttOptions, }; use socketioxide::SocketIo; -use tokio::{ - sync::mpsc::{Receiver, Sender}, - time::Instant, -}; +use tokio::{sync::mpsc::Sender, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn, Level}; use crate::{ - controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, - services::run_service, RateLimitMode, + controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode, }; use super::ClientData; @@ -37,8 +33,6 @@ use super::ClientData; /// It also is the main form of rate limiting pub struct MqttProcessor { channel: Sender, - new_run_channel: Receiver, - curr_run: i32, io: SocketIo, cancel_token: CancellationToken, /// Upload ratio, below is not socket sent above is socket sent @@ -68,14 +62,12 @@ pub struct MqttProcessorOptions { impl MqttProcessor { /// Creates a new mqtt receiver and socketio and db sender /// * `channel` - The mpsc channel to send the database data to - /// * `new_run_channel` - The channel for new run notifications /// * `io` - The socketio layer to send the data to /// * `cancel_token` - The token which indicates cancellation of the task /// * `opts` - The mqtt processor options to use /// Returns the instance and options to create a client, which is then used in the process_mqtt loop pub fn new( channel: Sender, - new_run_channel: Receiver, io: SocketIo, cancel_token: CancellationToken, opts: MqttProcessorOptions, @@ -109,8 +101,6 @@ impl MqttProcessor { ( MqttProcessor { channel, - new_run_channel, - curr_run: opts.initial_run, io, cancel_token, upload_ratio: opts.upload_ratio, @@ -168,7 +158,7 @@ impl MqttProcessor { name: "Viewers".to_string(), node: "Internal".to_string(), unit: "".to_string(), - run_id: self.curr_run, + run_id: crate::RUN_ID.load(Ordering::Relaxed), timestamp: chrono::offset::Utc::now(), values: vec![sockets.len() as f32] }; @@ -189,17 +179,13 @@ impl MqttProcessor { name: "Latency".to_string(), node: "Internal".to_string(), unit: "ms".to_string(), - run_id: self.curr_run, + run_id: crate::RUN_ID.load(Ordering::Relaxed), timestamp: chrono::offset::Utc::now(), values: vec![avg_latency as f32] }; trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32)); self.send_socket_msg(client_data, &mut upload_counter); } - Some(new_run) = self.new_run_channel.recv() => { - trace!("New run: {:?}", new_run); - self.curr_run = new_run.id; - } } } } @@ -314,7 +300,7 @@ impl MqttProcessor { }; Some(ClientData { - run_id: self.curr_run, + run_id: crate::RUN_ID.load(Ordering::Relaxed), name: data_type, unit: data.unit, values: data.values,