Skip to content

Commit

Permalink
#200 - switch to global AtomicI32
Browse files Browse the repository at this point in the history
  • Loading branch information
bracyw committed Oct 6, 2024
1 parent 24d5a86 commit 5de9cc1
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 27 deletions.
13 changes: 4 additions & 9 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::atomic::Ordering;

use axum::{
extract::{Path, State},
Extension, Json,
Json,
};
use prisma_client_rust::chrono;
use tokio::sync::mpsc;
use tracing::warn;

use crate::{
error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, Database,
Expand Down Expand Up @@ -41,14 +39,11 @@ pub async fn get_run_by_id(

/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
pub async fn new_run(
State(db): State<Database>,
Extension(run_id): Extension<AtomicI32>,
) -> Result<Json<PublicRun>, ScyllaError> {
pub async fn new_run(State(db): State<Database>) -> Result<Json<PublicRun>, ScyllaError> {
let run_data =
run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()).await?;

run_id.store(run_data.id, Ordering::Relaxed);
crate::RUN_ID.store(run_data.id, Ordering::Relaxed);

Ok(Json::from(PublicRun::from(&run_data)))
}
5 changes: 5 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicI32;

pub mod controllers;
pub mod error;
pub mod processors;
Expand All @@ -23,3 +25,6 @@ pub enum RateLimitMode {
#[default]
None,
}

// Atomic to keep track the current run id across EVERYTHING (very scary)
pub static RUN_ID: AtomicI32 = AtomicI32::new(-1);
10 changes: 3 additions & 7 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::{
sync::{
atomic::{AtomicI32, Ordering},
Arc,
},
sync::{atomic::Ordering, Arc},
time::Duration,
};

Expand All @@ -14,6 +11,7 @@ use axum::{
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server::RUN_ID;
use scylla_server::{
controllers::{
self,
Expand All @@ -28,7 +26,7 @@ use scylla_server::{
mqtt_processor::{MqttProcessor, MqttProcessorOptions},
ClientData,
},
services::run_service::{self, public_run},
services::run_service::{self},
Database, RateLimitMode,
};
use socketioxide::{extract::SocketRef, SocketIo};
Expand Down Expand Up @@ -108,8 +106,6 @@ struct ScyllaArgs {
socketio_discard_percent: u8,
}

static RUN_ID: AtomicI32 = AtomicI32::new(-1);

#[tokio::main]
async fn main() {
let cli = ScyllaArgs::parse();
Expand Down
17 changes: 6 additions & 11 deletions scylla-server/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
sync::{atomic::AtomicI32, Arc},
sync::{atomic::Ordering, Arc},
time::{Duration, SystemTime},
};

Expand All @@ -12,16 +12,12 @@ use rumqttc::v5::{
AsyncClient, Event, EventLoop, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::{
sync::mpsc::{Receiver, Sender},
time::Instant,
};
use tokio::{sync::mpsc::Sender, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, trace, warn, Level};

use crate::{
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata,
services::run_service, RateLimitMode,
controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, RateLimitMode,
};

use super::ClientData;
Expand Down Expand Up @@ -106,7 +102,6 @@ impl MqttProcessor {
(
MqttProcessor {
channel,
curr_run: opts.initial_run,
io,
cancel_token,
upload_ratio: opts.upload_ratio,
Expand Down Expand Up @@ -164,7 +159,7 @@ impl MqttProcessor {
name: "Viewers".to_string(),
node: "Internal".to_string(),
unit: "".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![sockets.len().to_string()]
};
Expand All @@ -185,7 +180,7 @@ impl MqttProcessor {
name: "Latency".to_string(),
node: "Internal".to_string(),
unit: "ms".to_string(),
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![avg_latency.to_string()]
};
Expand Down Expand Up @@ -284,7 +279,7 @@ impl MqttProcessor {
};

Some(ClientData {
run_id: self.curr_run,
run_id: crate::RUN_ID.load(Ordering::Relaxed),
name: data_type,
unit: data.unit,
values: data.value,
Expand Down

0 comments on commit 5de9cc1

Please sign in to comment.