From 2694ffa5e71969bb9f3411c35b84525c669e1b4a Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sat, 23 Nov 2024 00:41:00 -0500 Subject: [PATCH] operation file insertion code --- scylla-server/.gitignore | 1 + scylla-server/Cargo.lock | 46 ++++++++ scylla-server/Cargo.toml | 4 +- scylla-server/build.rs | 1 + .../controllers/file_insertion_controller.rs | 101 ++++++++++++++++++ scylla-server/src/controllers/mod.rs | 2 + scylla-server/src/lib.rs | 1 + scylla-server/src/main.rs | 6 +- scylla-server/src/mod.rs | 1 + scylla-server/src/mqtt_processor.rs | 2 +- scylla-server/src/proto/playback_data.proto | 12 +++ scylla-server/src/services/data_service.rs | 3 +- scylla-server/src/services/run_service.rs | 2 +- 13 files changed, 177 insertions(+), 5 deletions(-) create mode 100644 scylla-server/src/controllers/file_insertion_controller.rs create mode 100644 scylla-server/src/proto/playback_data.proto diff --git a/scylla-server/.gitignore b/scylla-server/.gitignore index a9a01cca..02186a82 100644 --- a/scylla-server/.gitignore +++ b/scylla-server/.gitignore @@ -23,3 +23,4 @@ prisma.rs # protobuf serverdata.rs command_data.rs +playback_data.rs \ No newline at end of file diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index 874a978e..ba5e4c84 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -200,6 +200,7 @@ dependencies = [ "matchit 0.7.3", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -276,6 +277,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.67", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -640,6 +652,15 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "engineioxide" version = "0.14.0" @@ -1223,6 +1244,23 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -1536,6 +1574,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rangemap" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684" + [[package]] name = "redox_syscall" version = "0.5.2" @@ -1745,6 +1789,7 @@ version = "0.0.1" dependencies = [ "axum 0.7.5", "axum-extra", + "axum-macros", "chrono", "clap", "console-subscriber", @@ -1755,6 +1800,7 @@ dependencies = [ "protobuf", "protobuf-codegen", "rand", + "rangemap", "ringbuffer", "rumqttc", "serde", diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index 47229e1a..3c889b5b 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -12,7 +12,7 @@ serde = "1.0.203" protobuf-codegen = "3.5.1" protobuf = "3.5.1" tokio = { version = "1.38.0", features = ["full", "tracing"] } -axum = "0.7.5" +axum = { version = "0.7.5", features = ["multipart"] } tower = { version = "0.4.13", features = ["timeout"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } socketioxide = { version = "0.14.0", features = ["tracing"] } @@ -28,6 +28,8 @@ axum-extra = { version = "0.9.3", features = ["query"] } chrono = { version = "0.4.38", features = ["serde"] } serde_json = "1.0.128" diesel_migrations = { version = "2.2.0", features = ["postgres"] } +rangemap = "1.5.1" +axum-macros = "0.4.2" [features] top = ["dep:console-subscriber"] diff --git a/scylla-server/build.rs b/scylla-server/build.rs index 44b9825e..3c0bfc5c 100644 --- a/scylla-server/build.rs +++ b/scylla-server/build.rs @@ -9,6 +9,7 @@ fn main() { // Inputs must reside in some of include paths. .input("src/proto/serverdata.proto") .input("src/proto/command_data.proto") + .input("src/proto/playback_data.proto") // Specify output directory relative to Cargo output directory. .out_dir("src") .run_from_script(); diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs new file mode 100644 index 00000000..9c4f0af9 --- /dev/null +++ b/scylla-server/src/controllers/file_insertion_controller.rs @@ -0,0 +1,101 @@ +use axum::extract::{Multipart, State}; +use axum_macros::debug_handler; +use chrono::DateTime; +use protobuf::CodedInputStream; +use rangemap::RangeInclusiveMap; +use tokio_util::bytes::Buf; +use tracing::{debug, info, trace}; + +use crate::{ + error::ScyllaError, + playback_data, + services::{data_service, run_service}, + ClientData, PoolHandle, +}; + +// super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails +#[debug_handler] +pub async fn insert_file( + State(pool): State, + mut multipart: Multipart, +) -> Result { + // create a run ID cache + let mut db = pool.get()?; + debug!("Warming up run ID map!"); + let mut run_iter = run_service::get_all_runs(&mut db) + .await? + .into_iter() + .map(|f| (f.id, f.time.timestamp_micros() as u64)) + .peekable(); + let mut run_rng: RangeInclusiveMap = RangeInclusiveMap::new(); + // this actual formulates the list, where keys are ranges of times (us) and the values are the run IDs + while let Some(it) = run_iter.next() { + match run_iter.peek() { + Some(next) => { + run_rng.insert(it.1..=next.1, it.0); + } + // if this is the last item in the list + None => { + run_rng.insert(it.1..=std::u64::MAX, it.0); + continue; + } + } + } + + // iterate through all files + while let Some(field) = multipart.next_field().await.unwrap() { + // round up all of the protobuf segments as a giant list + let mut data = field.bytes().await.unwrap().reader(); + let mut insertable_data: Vec = vec![]; + { + // this cannot be used across an await, hence scoped + let mut stream = CodedInputStream::new(&mut data); + loop { + match stream.read_message::() { + Ok(a) => { + trace!("Decoded file msg: {}", a); + insertable_data.push(a); + } + Err(e) => { + trace!("Exiting from read loop {}", e); + break; + } + } + } + } + + debug!("Mapping data to ClientData type, with inferred run IDs!"); + let new_data: Vec = insertable_data + .into_iter() + .map(|f| match run_rng.get(&f.time_us) { + Some(a) => ClientData { + run_id: *a, + name: f.topic.clone(), + unit: f.unit, + values: f.values, + timestamp: DateTime::from_timestamp_micros(f.time_us as i64).unwrap(), + node: f.topic.split_once('/').unwrap_or_default().0.to_owned(), + }, + None => ClientData { + run_id: -1, + name: f.topic.clone(), + unit: f.unit, + values: f.values, + timestamp: DateTime::from_timestamp_micros(f.time_us as i64).unwrap(), + node: f.topic.split_once('/').unwrap_or_default().0.to_owned(), + }, + }) + .collect(); + info!( + "Inserting {} points, {} of which have no run ID (-1).", + new_data.len(), + new_data + .iter() + .filter(|x| x.run_id == -1) + .collect::>() + .len() + ); + data_service::add_many(&mut db, new_data).await?; + } + Ok("Successfully inserted all!".to_string()) +} diff --git a/scylla-server/src/controllers/mod.rs b/scylla-server/src/controllers/mod.rs index acc5fb63..9c77cd4f 100644 --- a/scylla-server/src/controllers/mod.rs +++ b/scylla-server/src/controllers/mod.rs @@ -3,3 +3,5 @@ pub mod car_command_controller; pub mod data_controller; pub mod data_type_controller; pub mod run_controller; + +pub mod file_insertion_controller; \ No newline at end of file diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index 5ea5f7db..9b8954be 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -14,6 +14,7 @@ pub mod schema; pub mod command_data; pub mod serverdata; +pub mod playback_data; pub mod transformers; diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 7241b6b9..a3667f7b 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -4,6 +4,7 @@ use std::{ }; use axum::{ + extract::DefaultBodyLimit, http::Method, routing::{get, post}, Extension, Router, @@ -20,7 +21,7 @@ use scylla_server::{ controllers::{ self, car_command_controller::{self}, - data_type_controller, run_controller, + data_type_controller, file_insertion_controller, run_controller, }, services::run_service::{self}, PoolHandle, RateLimitMode, @@ -233,6 +234,9 @@ async fn main() { "/config/set/:configKey", post(car_command_controller::send_config_command).layer(Extension(client_sharable)), ) + // FILE INSERT + .route("/insert/file", post(file_insertion_controller::insert_file)) + .layer(DefaultBodyLimit::disable()) // for CORS handling .layer( CorsLayer::new() diff --git a/scylla-server/src/mod.rs b/scylla-server/src/mod.rs index b7017c61..dbc1df8b 100644 --- a/scylla-server/src/mod.rs +++ b/scylla-server/src/mod.rs @@ -1,4 +1,5 @@ // @generated pub mod command_data; +pub mod playback_data; pub mod serverdata; diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index a48de461..2255b6a5 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -238,7 +238,7 @@ impl MqttProcessor { // get the node and datatype from the topic extracted at the beginning let node = split.0; - let data_type = String::from(split.1); + let data_type = String::from(topic); // extract the unix time // levels of time priority diff --git a/scylla-server/src/proto/playback_data.proto b/scylla-server/src/proto/playback_data.proto new file mode 100644 index 00000000..42609973 --- /dev/null +++ b/scylla-server/src/proto/playback_data.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package playbackdata.v1; + +message PlaybackData { + string topic = 1; + string unit = 2; + // time since unix epoch in MICROSECONDS + uint64 time_us = 3; + repeated float values = 4; + +} \ No newline at end of file diff --git a/scylla-server/src/services/data_service.rs b/scylla-server/src/services/data_service.rs index 3f15725e..16d144ac 100644 --- a/scylla-server/src/services/data_service.rs +++ b/scylla-server/src/services/data_service.rs @@ -40,7 +40,7 @@ pub async fn add_data( .get_result(db) } -/// Adds many datapoints via a batch insert +/// Adds many datapoints via a batch insert, skips any data which conflicts with existing data /// * `db` - The database connection to use /// * `client_data` - A list of data to batch insert /// returns: A result containing the number of rows inserted or the QueryError propogated by the db @@ -66,5 +66,6 @@ pub async fn add_many( }) .collect::>(), ) + .on_conflict_do_nothing() .execute(db) } diff --git a/scylla-server/src/services/run_service.rs b/scylla-server/src/services/run_service.rs index 1c2e0d11..fdf937e2 100644 --- a/scylla-server/src/services/run_service.rs +++ b/scylla-server/src/services/run_service.rs @@ -6,7 +6,7 @@ use diesel::prelude::*; /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db pub async fn get_all_runs(db: &mut Database) -> Result, diesel::result::Error> { - run.load(db) + run.order(id.asc()).get_results(db) } /// Gets a single run by its id