From 82c46e06527389a2d15507c905f454b719fc739c Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 20 Sep 2024 00:51:22 -0400 Subject: [PATCH 1/5] initial rate limit --- compose.yml | 7 ++ scylla-server/src/lib.rs | 10 +++ scylla-server/src/main.rs | 23 +++++- .../src/processors/mqtt_processor.rs | 80 +++++++++++++------ 4 files changed, 93 insertions(+), 27 deletions(-) diff --git a/compose.yml b/compose.yml index 473a7ed2..6187267a 100644 --- a/compose.yml +++ b/compose.yml @@ -28,6 +28,13 @@ services: - SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/postgres # - PROD_SIREN_HOST_URL=siren:1883 - SCYLLA_PROD=true + #- SCYLLA_SATURATE_BATCH=false + #-SCYLLA_DATA_UPLOAD_DISABLE=false + #-SCYLLA_SIREN_HOST_URL=localhost:1883 + #-SCYLLA_BATCH_UPSERT_TIME=10 + #-SCYLLA_RATE_LIMIT_MODE=none + #-SCYLLA_STATIC_RATE_LIMIT_VALUE=100 + #-SCYLLA_SOCKET_DISCARD_PERCENT=0 - RUST_LOG=warn,scylla_server=debug cpu_shares: 1024 stop_grace_period: 2m diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index f3bd395c..ef595323 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -13,3 +13,13 @@ pub mod serverdata; /// The type descriptor of the database passed to the middlelayer through axum state pub type Database = std::sync::Arc; + +#[derive(clap::ValueEnum, Debug, PartialEq, Copy, Clone, Default)] +#[clap(rename_all = "kebab_case")] +pub enum RateLimitMode { + /// static rate limiting based on a set value + Static, + /// no rate limiting + #[default] + None, +} diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 818aaf0c..49b6edbb 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -20,7 +20,7 @@ use scylla_server::{ db_handler, mock_processor::MockProcessor, mqtt_processor::MqttProcessor, ClientData, }, services::run_service::{self, public_run}, - Database, + Database, RateLimitMode, }; use socketioxide::{extract::SocketRef, SocketIo}; use tokio::{signal, sync::mpsc}; @@ -70,6 +70,25 @@ struct ScyllaArgs { )] batch_upsert_time: u64, + /// The rate limit mode to use + #[arg( + short = 'm', + long, + env = "SCYLLA_RATE_LIMIT_MODE", + default_value_t = RateLimitMode::None, + value_enum, + )] + rate_limit_mode: RateLimitMode, + + /// The static rate limit number to use in ms + #[arg( + short = 'v', + long, + env = "SCYLLA_STATIC_RATE_LIMIT_VALUE", + default_value = "100" + )] + static_rate_limit_value: u64, + /// The percent of messages discarded when sent from the socket #[arg( short = 'd', @@ -189,6 +208,8 @@ async fn main() { curr_run.id, io, token.clone(), + cli.static_rate_limit_value, + cli.rate_limit_mode, ((cli.socketio_discard_percent as f32 / 100.0) * 255.0) as u8, ); let (client, eventloop) = AsyncClient::new(opts, 600); diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 08bc4ca4..c02d44bc 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -1,5 +1,4 @@ -use core::fmt; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json}; use protobuf::Message; @@ -9,13 +8,16 @@ use rumqttc::v5::{ AsyncClient, Event, EventLoop, MqttOptions, }; use socketioxide::SocketIo; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::{ + sync::mpsc::{Receiver, Sender}, + time::Instant, +}; use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn, Level}; use crate::{ controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, - services::run_service, + services::run_service, RateLimitMode, }; use super::ClientData; @@ -27,8 +29,14 @@ pub struct MqttProcessor { curr_run: i32, io: SocketIo, cancel_token: CancellationToken, - /// Upload ratio, below is not uploaded above is uploaded + /// Upload ratio, below is not socket sent above is socket sent upload_ratio: u8, + /// static rate limiter + rate_limiter: HashMap, + /// time to rate limit in ms + rate_limit_time: u64, + /// rate limit mode + rate_limit_mode: RateLimitMode, } impl MqttProcessor { @@ -46,11 +54,13 @@ impl MqttProcessor { initial_run: i32, io: SocketIo, cancel_token: CancellationToken, + static_rate_limit_time: u64, + rate_limit_mode: RateLimitMode, upload_ratio: u8, ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it let mut mqtt_opts = MqttOptions::new( - "ScyllaServer", + format!("ScyllaServer-{:?}", Instant::now()), mqtt_path.split_once(':').expect("Invalid Siren URL").0, mqtt_path .split_once(':') @@ -66,6 +76,8 @@ impl MqttProcessor { .set_session_expiry_interval(Some(u32::MAX)) .set_topic_alias_max(Some(600)); + let rate_map: HashMap = HashMap::new(); + // TODO mess with incoming message cap if db, etc. cannot keep up ( @@ -76,6 +88,9 @@ impl MqttProcessor { io, cancel_token, upload_ratio, + rate_limiter: rate_map, + rate_limit_time: static_rate_limit_time, + rate_limit_mode, }, mqtt_opts, ) @@ -110,11 +125,8 @@ impl MqttProcessor { trace!("Received mqtt message: {:?}", msg); // parse the message into the data and the node name it falls under let msg = match self.parse_msg(msg) { - Ok(msg) => msg, - Err(err) => { - warn!("Message parse error: {:?}", err); - continue; - } + Some(msg) => msg, + None => continue }; latency_ringbuffer.push(chrono::offset::Utc::now().timestamp_millis() - msg.timestamp); self.send_db_msg(msg.clone()).await; @@ -170,26 +182,41 @@ impl MqttProcessor { /// * `msg` - The mqtt message to parse /// returns the ClientData, or the Err of something that can be debug printed #[instrument(skip(self), level = Level::TRACE)] - fn parse_msg(&self, msg: Publish) -> Result { - let topic = std::str::from_utf8(&msg.topic) - .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?; + fn parse_msg(&mut self, msg: Publish) -> Option { + let Ok(topic) = std::str::from_utf8(&msg.topic) else { + warn!("Could not parse topic, topic: {:?}", msg.topic); + return None; + }; // ignore command messages, less confusing in logs than just failing to decode protobuf if topic.starts_with(CALYPSO_BIDIR_CMD_PREFIX) { - return Err(format!("Skipping command message: {}", topic)); + debug!("Skipping command message: {}", topic); + return None; } - let split = topic - .split_once('/') - .ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?; + if self.rate_limit_mode == RateLimitMode::Static { + if let Some(old) = self.rate_limiter.get(topic) { + if old.elapsed() < Duration::from_millis(self.rate_limit_time) { + trace!("Static rate limit skipping message with topic {}", topic); + return None; + } else { + self.rate_limiter.insert(topic.to_string(), Instant::now()); + } + } else { + self.rate_limiter.insert(topic.to_string(), Instant::now()); + } + } + + let Some(split) = topic.split_once('/') else { + warn!("Could not parse nesting: {:?}", msg.topic); + return None; + }; // look at data after topic as if we dont have a topic the protobuf is useless anyways - let data = serverdata::ServerData::parse_from_bytes(&msg.payload).map_err(|f| { - format!( - "Could not parse message payload:{:?} error: {}", - msg.topic, f - ) - })?; + let Ok(data) = serverdata::ServerData::parse_from_bytes(&msg.payload) else { + warn!("Could not parse message payload:{:?}", msg.topic); + return None; + }; // get the node and datatype from the topic extracted at the beginning let node = split.0; @@ -225,14 +252,15 @@ impl MqttProcessor { debug!("Timestamp before year 2000: {}", unix_time); let sys_time = chrono::offset::Utc::now().timestamp_millis(); if sys_time < 963014966000 { - return Err("System has no good time, discarding message!".to_string()); + warn!("System has no good time, discarding message!"); + return None; } sys_time } else { unix_time }; - Ok(ClientData { + Some(ClientData { run_id: self.curr_run, name: data_type, unit: data.unit, From 3b6264e986660116ef3d7ffb7c366bde48d055be Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 20 Sep 2024 14:11:21 -0400 Subject: [PATCH 2/5] add compose options, refactor inputs to mqtt processor --- compose.brick.yml | 1 + compose.router.yml | 4 +- compose.yml | 4 +- scylla-server/src/main.rs | 17 ++++++--- .../src/processors/mqtt_processor.rs | 37 ++++++++++++------- 5 files changed, 41 insertions(+), 22 deletions(-) diff --git a/compose.brick.yml b/compose.brick.yml index 40a015ef..c37d95ad 100644 --- a/compose.brick.yml +++ b/compose.brick.yml @@ -2,4 +2,5 @@ services: scylla-server: environment: - SCYLLA_SIREN_HOST_URL=192.168.100.1:1883 + - SCYLLA_RATE_LIMIT_MODE=none diff --git a/compose.router.yml b/compose.router.yml index d65fa5df..7838aa7e 100644 --- a/compose.router.yml +++ b/compose.router.yml @@ -1,9 +1,11 @@ -services: +comservices: scylla-server: depends_on: - siren environment: - SCYLLA_SIREN_HOST_URL=siren:1883 + - SCYLLA_RATE_LIMIT_MODE=static + - SCYLLA_STATIC_RATE_LIMIT_VALUE=100 client: extends: diff --git a/compose.yml b/compose.yml index 6187267a..ec68754f 100644 --- a/compose.yml +++ b/compose.yml @@ -32,8 +32,8 @@ services: #-SCYLLA_DATA_UPLOAD_DISABLE=false #-SCYLLA_SIREN_HOST_URL=localhost:1883 #-SCYLLA_BATCH_UPSERT_TIME=10 - #-SCYLLA_RATE_LIMIT_MODE=none - #-SCYLLA_STATIC_RATE_LIMIT_VALUE=100 + - SCYLLA_RATE_LIMIT_MODE=static + - SCYLLA_STATIC_RATE_LIMIT_VALUE=50 #-SCYLLA_SOCKET_DISCARD_PERCENT=0 - RUST_LOG=warn,scylla_server=debug cpu_shares: 1024 diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 49b6edbb..5d9c8f87 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -17,7 +17,10 @@ use scylla_server::{ }, prisma::PrismaClient, processors::{ - db_handler, mock_processor::MockProcessor, mqtt_processor::MqttProcessor, ClientData, + db_handler, + mock_processor::MockProcessor, + mqtt_processor::{MqttProcessor, MqttProcessorOptions}, + ClientData, }, services::run_service::{self, public_run}, Database, RateLimitMode, @@ -204,13 +207,15 @@ async fn main() { let (recv, opts) = MqttProcessor::new( mqtt_send, new_run_receive, - cli.siren_host_url, - curr_run.id, io, token.clone(), - cli.static_rate_limit_value, - cli.rate_limit_mode, - ((cli.socketio_discard_percent as f32 / 100.0) * 255.0) as u8, + MqttProcessorOptions { + mqtt_path: cli.siren_host_url, + initial_run: curr_run.id, + static_rate_limit_time: cli.static_rate_limit_value, + rate_limit_mode: cli.rate_limit_mode, + upload_ratio: cli.socketio_discard_percent, + }, ); let (client, eventloop) = AsyncClient::new(opts, 600); let client_sharable: Arc = Arc::new(client); diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index c02d44bc..83edf0fe 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -23,6 +23,7 @@ use crate::{ use super::ClientData; use std::borrow::Cow; +/// a mqtt processor pub struct MqttProcessor { channel: Sender, new_run_channel: Receiver, @@ -39,30 +40,40 @@ pub struct MqttProcessor { rate_limit_mode: RateLimitMode, } +/// processor options +pub struct MqttProcessorOptions { + /// URI of the mqtt server + pub mqtt_path: String, + /// the initial run id + pub initial_run: i32, + /// the static rate limit time interval in ms + pub static_rate_limit_time: u64, + /// the rate limit mode + pub rate_limit_mode: RateLimitMode, + /// the upload ratio for the socketio + pub upload_ratio: u8, +} + impl MqttProcessor { /// Creates a new mqtt receiver and socketio and db sender /// * `channel` - The mpsc channel to send the database data to - /// * `mqtt_path` - The mqtt URI, including port, (without the mqtt://) to subscribe to - /// * `db` - The database to store the data in + /// * `new_run_channel` - The channel for new run notifications /// * `io` - The socketio layer to send the data to /// * `cancel_token` - The token which indicates cancellation of the task + /// * `opts` - The mqtt processor options to use /// Returns the instance and options to create a client, which is then used in the process_mqtt loop pub fn new( channel: Sender, new_run_channel: Receiver, - mqtt_path: String, - initial_run: i32, io: SocketIo, cancel_token: CancellationToken, - static_rate_limit_time: u64, - rate_limit_mode: RateLimitMode, - upload_ratio: u8, + opts: MqttProcessorOptions, ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it let mut mqtt_opts = MqttOptions::new( format!("ScyllaServer-{:?}", Instant::now()), - mqtt_path.split_once(':').expect("Invalid Siren URL").0, - mqtt_path + opts.mqtt_path.split_once(':').expect("Invalid Siren URL").0, + opts.mqtt_path .split_once(':') .unwrap() .1 @@ -84,13 +95,13 @@ impl MqttProcessor { MqttProcessor { channel, new_run_channel, - curr_run: initial_run, + curr_run: opts.initial_run, io, cancel_token, - upload_ratio, + upload_ratio: opts.upload_ratio, rate_limiter: rate_map, - rate_limit_time: static_rate_limit_time, - rate_limit_mode, + rate_limit_time: opts.static_rate_limit_time, + rate_limit_mode: opts.rate_limit_mode, }, mqtt_opts, ) From b1622b9d5ded50ec43115f86e5c64b95c7c6cab6 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 20 Sep 2024 14:31:37 -0400 Subject: [PATCH 3/5] fix naming --- compose.router.yml | 2 +- scylla-server/src/processors/mqtt_processor.rs | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/compose.router.yml b/compose.router.yml index 7838aa7e..bc12e99d 100644 --- a/compose.router.yml +++ b/compose.router.yml @@ -1,4 +1,4 @@ -comservices: +services: scylla-server: depends_on: - siren diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 83edf0fe..44c0b6e2 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, SystemTime}, +}; use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json}; use protobuf::Message; @@ -71,7 +75,13 @@ impl MqttProcessor { ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it let mut mqtt_opts = MqttOptions::new( - format!("ScyllaServer-{:?}", Instant::now()), + format!( + "ScyllaServer-{:?}", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() + ), opts.mqtt_path.split_once(':').expect("Invalid Siren URL").0, opts.mqtt_path .split_once(':') From 3e62501418c475404e1a7acecf01afd1878728e8 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 20 Sep 2024 14:45:26 -0400 Subject: [PATCH 4/5] add docs --- compose.tpu.yml | 2 ++ scylla-server/src/processors/mqtt_processor.rs | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/compose.tpu.yml b/compose.tpu.yml index a68135b5..88ea3216 100644 --- a/compose.tpu.yml +++ b/compose.tpu.yml @@ -2,6 +2,8 @@ services: scylla-server: environment: - SCYLLA_SIREN_HOST_URL=host.docker.internal:1883 + - SCYLLA_RATE_LIMIT_MODE=static + - SCYLLA_STATIC_RATE_LIMIT_VALUE=100 extra_hosts: - "host.docker.internal:host-gateway" # for external siren init: false # not supported on buildroot for some reason, further investigation needed diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 44c0b6e2..056b2207 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -27,7 +27,14 @@ use crate::{ use super::ClientData; use std::borrow::Cow; -/// a mqtt processor +/// The chief processor of incoming mqtt data, this handles +/// - mqtt state +/// - reception via mqtt and subsequent parsing +/// - labeling of data with runs +/// - sending data over socket +/// - sending data over the channel to a db handler +/// +/// It also is the main form of rate limiting pub struct MqttProcessor { channel: Sender, new_run_channel: Receiver, @@ -44,7 +51,7 @@ pub struct MqttProcessor { rate_limit_mode: RateLimitMode, } -/// processor options +/// processor options, these are static immutable settings pub struct MqttProcessorOptions { /// URI of the mqtt server pub mqtt_path: String, @@ -215,15 +222,20 @@ impl MqttProcessor { return None; } + // handle static rate limiting mode if self.rate_limit_mode == RateLimitMode::Static { + // check if we have a previous time for a message based on its topic if let Some(old) = self.rate_limiter.get(topic) { + // if the message is less than the rate limit, skip it and do not update the map if old.elapsed() < Duration::from_millis(self.rate_limit_time) { trace!("Static rate limit skipping message with topic {}", topic); return None; } else { + // if the message is past the rate limit, continue with the parsing of it and mark the new time last received self.rate_limiter.insert(topic.to_string(), Instant::now()); } } else { + // here is the first insertion of the topic (the first time we receive the topic in scylla's lifetime) self.rate_limiter.insert(topic.to_string(), Instant::now()); } } From 7bde294e5653c73d5ae009e08f6dae14ac4e0977 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 20 Sep 2024 15:10:17 -0400 Subject: [PATCH 5/5] remove todo --- scylla-server/src/processors/mqtt_processor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 056b2207..78153d98 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -106,8 +106,6 @@ impl MqttProcessor { let rate_map: HashMap = HashMap::new(); - // TODO mess with incoming message cap if db, etc. cannot keep up - ( MqttProcessor { channel,