Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#200 switch new run to static atomic #222

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::atomic::Ordering;

use axum::{
extract::{Path, State},
Extension, Json,
Json,
};
use prisma_client_rust::chrono;
use tokio::sync::mpsc;
use tracing::warn;

use crate::{
error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, Database,
Expand Down Expand Up @@ -39,17 +39,11 @@ pub async fn get_run_by_id(

/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
pub async fn new_run(
State(db): State<Database>,
Extension(channel): Extension<mpsc::Sender<run_service::public_run::Data>>,
) -> Result<Json<PublicRun>, ScyllaError> {
pub async fn new_run(State(db): State<Database>) -> Result<Json<PublicRun>, ScyllaError> {
let run_data =
run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()).await?;

// notify the mqtt receiver a new run has been created
if let Err(err) = channel.send(run_data.clone()).await {
warn!("Could not notify system about an updated run: {}", err);
}
crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
bracyw marked this conversation as resolved.
Show resolved Hide resolved

Ok(Json::from(PublicRun::from(&run_data)))
}
5 changes: 5 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicI32;

pub mod controllers;
pub mod error;
pub mod processors;
Expand All @@ -23,3 +25,6 @@ pub enum RateLimitMode {
#[default]
None,
}

// Atomic to keep track the current run id across EVERYTHING (very scary)
pub static RUN_ID: AtomicI32 = AtomicI32::new(-1);
18 changes: 8 additions & 10 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::{atomic::Ordering, Arc},
time::Duration,
};

use axum::{
http::Method,
Expand All @@ -8,6 +11,7 @@ use axum::{
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server::RUN_ID;
use scylla_server::{
controllers::{
self,
Expand All @@ -22,7 +26,7 @@ use scylla_server::{
mqtt_processor::{MqttProcessor, MqttProcessorOptions},
ClientData,
},
services::run_service::{self, public_run},
services::run_service::{self},
Database, RateLimitMode,
};
use socketioxide::{extract::SocketRef, SocketIo};
Expand Down Expand Up @@ -161,9 +165,6 @@ async fn main() {
// TODO tune buffer size
let (db_send, db_receive) = mpsc::channel::<Vec<ClientData>>(1000);

// channel to update the run to a new value
let (new_run_send, new_run_receive) = mpsc::channel::<public_run::Data>(5);

// the below two threads need to cancel cleanly to ensure all queued messages are sent. therefore they are part of the a task tracker group.
// create a task tracker and cancellation token
let task_tracker = TaskTracker::new();
Expand Down Expand Up @@ -201,12 +202,12 @@ async fn main() {
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);

RUN_ID.store(curr_run.id, Ordering::Relaxed);
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
io,
token.clone(),
MqttProcessorOptions {
Expand Down Expand Up @@ -240,10 +241,7 @@ async fn main() {
// RUNS
.route("/runs", get(run_controller::get_all_runs))
.route("/runs/:id", get(run_controller::get_run_by_id))
.route(
"/runs/new",
post(run_controller::new_run).layer(Extension(new_run_send)),
)
.route("/runs/new", post(run_controller::new_run))
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
Expand Down
27 changes: 7 additions & 20 deletions scylla-server/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
sync::Arc,
sync::{atomic::Ordering, Arc},
time::{Duration, SystemTime},
};

Expand All @@ -12,16 +12,12 @@ use rumqttc::v5::{
AsyncClient, Event, EventLoop, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::{
sync::mpsc::{Receiver, Sender},
time::Instant,
};
use tokio::{sync::mpsc::Sender, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, trace, warn, Level};

use crate::{
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata,
services::run_service, RateLimitMode,
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode,
};

use super::ClientData;
Expand All @@ -37,8 +33,6 @@ use std::borrow::Cow;
/// It also is the main form of rate limiting
pub struct MqttProcessor {
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
curr_run: i32,
io: SocketIo,
cancel_token: CancellationToken,
/// Upload ratio, below is not socket sent above is socket sent
Expand Down Expand Up @@ -68,14 +62,13 @@ pub struct MqttProcessorOptions {
impl MqttProcessor {
/// Creates a new mqtt receiver and socketio and db sender
/// * `channel` - The mpsc channel to send the database data to
/// * `new_run_channel` - The channel for new run notifications
/// * `new_run_id` - The changed atomic run id.
bracyw marked this conversation as resolved.
Show resolved Hide resolved
/// * `io` - The socketio layer to send the data to
/// * `cancel_token` - The token which indicates cancellation of the task
/// * `opts` - The mqtt processor options to use
/// Returns the instance and options to create a client, which is then used in the process_mqtt loop
pub fn new(
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
io: SocketIo,
cancel_token: CancellationToken,
opts: MqttProcessorOptions,
Expand Down Expand Up @@ -109,8 +102,6 @@ impl MqttProcessor {
(
MqttProcessor {
channel,
new_run_channel,
curr_run: opts.initial_run,
io,
cancel_token,
upload_ratio: opts.upload_ratio,
Expand Down Expand Up @@ -168,7 +159,7 @@ impl MqttProcessor {
name: "Viewers".to_string(),
node: "Internal".to_string(),
unit: "".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![sockets.len().to_string()]
};
Expand All @@ -189,17 +180,13 @@ impl MqttProcessor {
name: "Latency".to_string(),
node: "Internal".to_string(),
unit: "ms".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![avg_latency.to_string()]
};
trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&"n/a".to_string()));
self.send_socket_msg(client_data, &mut upload_counter);
}
Some(new_run) = self.new_run_channel.recv() => {
trace!("New run: {:?}", new_run);
self.curr_run = new_run.id;
}
}
}
}
Expand Down Expand Up @@ -292,7 +279,7 @@ impl MqttProcessor {
};

Some(ClientData {
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
name: data_type,
unit: data.unit,
values: data.value,
Expand Down
Loading