Skip to content

Commit

Permalink
Merge pull request #222 from Northeastern-Electric-Racing/200-switch-…
Browse files Browse the repository at this point in the history
…new-run-to-static-atomic

#200 switch new run to static atomic
  • Loading branch information
bracyw authored Oct 25, 2024
2 parents eaa9482 + dc25d46 commit 71d79e1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 41 deletions.
20 changes: 9 additions & 11 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::atomic::Ordering;

use axum::{
extract::{Path, State},
Extension, Json,
Json,
};
use tokio::sync::mpsc;
use tracing::warn;

use crate::{
error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, Database,
Expand Down Expand Up @@ -38,16 +38,14 @@ pub async fn get_run_by_id(

/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
pub async fn new_run(
State(db): State<Database>,
Extension(channel): Extension<mpsc::Sender<run_service::public_run::Data>>,
) -> Result<Json<PublicRun>, ScyllaError> {
pub async fn new_run(State(db): State<Database>) -> Result<Json<PublicRun>, ScyllaError> {
let run_data = run_service::create_run(&db, chrono::offset::Utc::now()).await?;

// notify the mqtt receiver a new run has been created
if let Err(err) = channel.send(run_data.clone()).await {
warn!("Could not notify system about an updated run: {}", err);
}
crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
tracing::info!(
"Starting new run with ID: {}",
crate::RUN_ID.load(Ordering::Relaxed)
);

Ok(Json::from(PublicRun::from(&run_data)))
}
5 changes: 5 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicI32;

pub mod controllers;
pub mod error;
pub mod processors;
Expand All @@ -23,3 +25,6 @@ pub enum RateLimitMode {
#[default]
None,
}

// Atomic to keep track the current run id across EVERYTHING (very scary)
pub static RUN_ID: AtomicI32 = AtomicI32::new(-1);
18 changes: 8 additions & 10 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::{atomic::Ordering, Arc},
time::Duration,
};

use axum::{
http::Method,
Expand All @@ -7,6 +10,7 @@ use axum::{
};
use clap::Parser;
use rumqttc::v5::AsyncClient;
use scylla_server::RUN_ID;
use scylla_server::{
controllers::{
self,
Expand All @@ -21,7 +25,7 @@ use scylla_server::{
mqtt_processor::{MqttProcessor, MqttProcessorOptions},
ClientData,
},
services::run_service::{self, public_run},
services::run_service::{self},
Database, RateLimitMode,
};
use socketioxide::{extract::SocketRef, SocketIo};
Expand Down Expand Up @@ -160,9 +164,6 @@ async fn main() {
// TODO tune buffer size
let (db_send, db_receive) = mpsc::channel::<Vec<ClientData>>(1000);

// channel to update the run to a new value
let (new_run_send, new_run_receive) = mpsc::channel::<public_run::Data>(5);

// the below two threads need to cancel cleanly to ensure all queued messages are sent. therefore they are part of the a task tracker group.
// create a task tracker and cancellation token
let task_tracker = TaskTracker::new();
Expand Down Expand Up @@ -200,12 +201,12 @@ async fn main() {
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);

RUN_ID.store(curr_run.id, Ordering::Relaxed);
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
io,
token.clone(),
MqttProcessorOptions {
Expand Down Expand Up @@ -239,10 +240,7 @@ async fn main() {
// RUNS
.route("/runs", get(run_controller::get_all_runs))
.route("/runs/:id", get(run_controller::get_run_by_id))
.route(
"/runs/new",
post(run_controller::new_run).layer(Extension(new_run_send)),
)
.route("/runs/new", post(run_controller::new_run))
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
Expand Down
26 changes: 6 additions & 20 deletions scylla-server/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
sync::Arc,
sync::{atomic::Ordering, Arc},
time::{Duration, SystemTime},
};

Expand All @@ -13,16 +13,12 @@ use rumqttc::v5::{
AsyncClient, Event, EventLoop, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::{
sync::mpsc::{Receiver, Sender},
time::Instant,
};
use tokio::{sync::mpsc::Sender, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, trace, warn, Level};

use crate::{
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata,
services::run_service, RateLimitMode,
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode,
};

use super::ClientData;
Expand All @@ -37,8 +33,6 @@ use super::ClientData;
/// It also is the main form of rate limiting
pub struct MqttProcessor {
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
curr_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
/// Upload ratio, below is not socket sent above is socket sent
Expand Down Expand Up @@ -68,14 +62,12 @@ pub struct MqttProcessorOptions {
impl MqttProcessor {
/// Creates a new mqtt receiver and socketio and db sender
/// * `channel` - The mpsc channel to send the database data to
/// * `new_run_channel` - The channel for new run notifications
/// * `io` - The socketio layer to send the data to
/// * `cancel_token` - The token which indicates cancellation of the task
/// * `opts` - The mqtt processor options to use
/// Returns the instance and options to create a client, which is then used in the process_mqtt loop
pub fn new(
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
io: SocketIo,
cancel_token: CancellationToken,
opts: MqttProcessorOptions,
Expand Down Expand Up @@ -109,8 +101,6 @@ impl MqttProcessor {
(
MqttProcessor {
channel,
new_run_channel,
curr_run: opts.initial_run,
io,
cancel_token,
upload_ratio: opts.upload_ratio,
Expand Down Expand Up @@ -168,7 +158,7 @@ impl MqttProcessor {
name: "Viewers".to_string(),
node: "Internal".to_string(),
unit: "".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now(),
values: vec![sockets.len() as f32]
};
Expand All @@ -189,17 +179,13 @@ impl MqttProcessor {
name: "Latency".to_string(),
node: "Internal".to_string(),
unit: "ms".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now(),
values: vec![avg_latency as f32]
};
trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32));
self.send_socket_msg(client_data, &mut upload_counter);
}
Some(new_run) = self.new_run_channel.recv() => {
trace!("New run: {:?}", new_run);
self.curr_run = new_run.id;
}
}
}
}
Expand Down Expand Up @@ -314,7 +300,7 @@ impl MqttProcessor {
};

Some(ClientData {
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
name: data_type,
unit: data.unit,
values: data.values,
Expand Down

0 comments on commit 71d79e1

Please sign in to comment.