diff --git a/scylla-server/.gitignore b/scylla-server/.gitignore index a9a01cca..02186a82 100644 --- a/scylla-server/.gitignore +++ b/scylla-server/.gitignore @@ -23,3 +23,4 @@ prisma.rs # protobuf serverdata.rs command_data.rs +playback_data.rs \ No newline at end of file diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index 874a978e..6f82b66c 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -200,6 +200,7 @@ dependencies = [ "matchit 0.7.3", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -276,6 +277,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.67", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -640,6 +652,15 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "engineioxide" version = "0.14.0" @@ -1223,6 +1244,23 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -1437,10 +1475,11 @@ dependencies = [ [[package]] name = "protobuf" -version = "3.5.1" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bcc343da15609eaecd65f8aa76df8dc4209d325131d8219358c0aaaebab0bf6" +checksum = "a3a7c64d9bf75b1b8d981124c14c179074e8caa7dfe7b6a12e6222ddcd0c8f72" dependencies = [ + "bytes", "once_cell", "protobuf-support", "thiserror", @@ -1448,9 +1487,9 @@ dependencies = [ [[package]] name = "protobuf-codegen" -version = "3.5.1" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4d0cde5642ea4df842b13eb9f59ea6fafa26dcb43e3e1ee49120e9757556189" +checksum = "e26b833f144769a30e04b1db0146b2aaa53fd2fd83acf10a6b5f996606c18144" dependencies = [ "anyhow", "once_cell", @@ -1463,9 +1502,9 @@ dependencies = [ [[package]] name = "protobuf-parse" -version = "3.5.1" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b0e9b447d099ae2c4993c0cbb03c7a9d6c937b17f2d56cfc0b1550e6fcfdb76" +checksum = "322330e133eab455718444b4e033ebfac7c6528972c784fcde28d2cc783c6257" dependencies = [ "anyhow", "indexmap 2.2.6", @@ -1479,9 +1518,9 @@ dependencies = [ [[package]] name = "protobuf-support" -version = "3.5.1" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0766e3675a627c327e4b3964582594b0e8741305d628a98a5de75a1d15f99b9" +checksum = "b088fd20b938a875ea00843b6faf48579462630015c3788d397ad6a786663252" dependencies = [ "thiserror", ] @@ -1536,6 +1575,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rangemap" +version = "1.5.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684" + [[package]] name = "redox_syscall" version = "0.5.2" @@ -1745,6 +1790,7 @@ version = "0.0.1" dependencies = [ "axum 0.7.5", "axum-extra", + "axum-macros", "chrono", "clap", "console-subscriber", @@ -1755,6 +1801,7 @@ dependencies = [ "protobuf", "protobuf-codegen", "rand", + "rangemap", "ringbuffer", "rumqttc", "serde", diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index 47229e1a..3e795258 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -9,10 +9,10 @@ diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] } pq-sys = { version = "0.6", features = ["bundled"] } dotenvy = "0.15" serde = "1.0.203" -protobuf-codegen = "3.5.1" -protobuf = "3.5.1" +protobuf-codegen = "3.7.1" +protobuf = { version = "3.7.1", features = ["with-bytes"] } tokio = { version = "1.38.0", features = ["full", "tracing"] } -axum = "0.7.5" +axum = { version = "0.7.5", features = ["multipart"] } tower = { version = "0.4.13", features = ["timeout"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } socketioxide = { version = "0.14.0", features = ["tracing"] } @@ -28,12 +28,14 @@ axum-extra = { version = "0.9.3", features = ["query"] } chrono = { version = "0.4.38", features = ["serde"] } serde_json = "1.0.128" diesel_migrations = { version = "2.2.0", features = ["postgres"] } +rangemap = "1.5.1" +axum-macros = "0.4.2" [features] top = ["dep:console-subscriber"] [build-dependencies] -protobuf-codegen = "3.5.1" +protobuf-codegen = "3.7.1" [profile.release] lto = true diff --git a/scylla-server/build.rs b/scylla-server/build.rs index 44b9825e..3c0bfc5c 100644 --- a/scylla-server/build.rs +++ b/scylla-server/build.rs @@ -9,6 +9,7 @@ fn main() { // Inputs must reside in some of include paths. .input("src/proto/serverdata.proto") .input("src/proto/command_data.proto") + .input("src/proto/playback_data.proto") // Specify output directory relative to Cargo output directory. 
.out_dir("src") .run_from_script(); diff --git a/scylla-server/migrations/2024-11-10-031516_create_all/up.sql b/scylla-server/migrations/2024-11-10-031516_create_all/up.sql index 71d46db7..beb38f90 100644 --- a/scylla-server/migrations/2024-11-10-031516_create_all/up.sql +++ b/scylla-server/migrations/2024-11-10-031516_create_all/up.sql @@ -1,3 +1,5 @@ +CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; + -- CreateTable CREATE TABLE "run" ( "id" SERIAL NOT NULL, @@ -5,7 +7,7 @@ CREATE TABLE "run" ( "latitude" DOUBLE PRECISION, "longitude" DOUBLE PRECISION, "driverName" TEXT, - "notes" TEXT DEFAULT '', + "notes" TEXT NOT NULL DEFAULT '', "time" TIMESTAMPTZ NOT NULL, CONSTRAINT "run_pkey" PRIMARY KEY ("id") @@ -13,14 +15,16 @@ CREATE TABLE "run" ( -- CreateTable CREATE TABLE "data" ( - "id" SERIAL NOT NULL, "values" DOUBLE PRECISION[], "dataTypeName" TEXT NOT NULL, "time" TIMESTAMPTZ NOT NULL, "runId" INTEGER NOT NULL, - CONSTRAINT "data_pkey" PRIMARY KEY ("id") + PRIMARY KEY("time", "dataTypeName") ); +-- SELECT * FROM create_hypertable("data", by_range("time")); +-- SELECT * FROM add_dimension("data", by_hash("dataTypeName", 4)); + -- CreateTable CREATE TABLE "dataType" ( @@ -34,9 +38,6 @@ CREATE TABLE "dataType" ( -- CreateIndex CREATE UNIQUE INDEX "run_id_key" ON "run"("id"); --- CreateIndex -CREATE UNIQUE INDEX "data_id_time_key" ON "data"("id", "time"); - -- CreateIndex CREATE UNIQUE INDEX "dataType_name_key" ON "dataType"("name"); diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs new file mode 100644 index 00000000..f067895c --- /dev/null +++ b/scylla-server/src/controllers/file_insertion_controller.rs @@ -0,0 +1,95 @@ +use axum::{ + extract::{Multipart, State}, + Extension, +}; +use axum_macros::debug_handler; +use chrono::DateTime; +use protobuf::CodedInputStream; +use rangemap::RangeInclusiveMap; +use tokio::sync::mpsc; +use tracing::{debug, info, trace, warn}; + +use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle}; + +/// Inserts a file using HTTP multipart. +/// The file is parsed, ClientData values are extracted, the run ID of each datapoint is inferred, and the data is batch uploaded. +// super cool: adding this attribute tells you which argument is misbehaving when an axum handler fails the Send+Sync bounds +#[debug_handler] +pub async fn insert_file( + State(pool): State<PoolHandle>, + Extension(batcher): Extension<mpsc::Sender<Vec<ClientData>>>, + mut multipart: Multipart, +) -> Result<String, ScyllaError> { + // create a run ID cache + let mut db = pool.get()?; + debug!("Warming up run ID map!"); + let mut run_iter = run_service::get_all_runs(&mut db) + .await?
+ .into_iter() + .map(|f| (f.id, f.time.timestamp_micros() as u64)) + .peekable(); + let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new(); + // this actually builds the list, where keys are ranges of times (us) and the values are the run IDs + while let Some(it) = run_iter.next() { + match run_iter.peek() { + Some(next) => { + run_rng.insert(it.1..=next.1, it.0); + } + // if this is the last item in the list + None => { + run_rng.insert(it.1..=u64::MAX, it.0); + continue; + } + } + } + + // iterate through all files + debug!("Converting file data to insertable data!"); + while let Some(field) = multipart.next_field().await.unwrap() { + // round up all of the protobuf segments into a giant list + let data = field.bytes().await.unwrap(); + let mut count_bad_run = 0usize; + let mut insertable_data: Vec<ClientData> = Vec::new(); + { + // this cannot be used across an await, hence scoped + let mut stream = CodedInputStream::from_tokio_bytes(&data); + loop { + match stream.read_message::<playback_data::PlaybackData>() { + Ok(f) => { + trace!("Decoded file msg: {}", f); + let f = match run_rng.get(&f.time_us) { + Some(a) => ClientData { + run_id: *a, + name: f.topic.clone(), + unit: f.unit, + values: f.values, + timestamp: DateTime::from_timestamp_micros(f.time_us as i64) + .unwrap(), + node: f.topic.split_once('/').unwrap_or_default().0.to_owned(), + }, + None => { + count_bad_run += 1; + continue; + } + }; + insertable_data.push(f); + } + Err(e) => { + info!("Exiting from read loop {}", e); + break; + } + } + } + } + info!( + "Inserting {} points. {} points could not be assigned IDs.", + insertable_data.len(), + count_bad_run + ); + if let Err(err) = batcher.send(insertable_data).await { + warn!("Error sending file insert data to batcher! {}", err); + }; + } + info!("Finished file insert request!"); + Ok("Successfully sent all to batcher!".to_string()) +} diff --git a/scylla-server/src/controllers/mod.rs b/scylla-server/src/controllers/mod.rs index acc5fb63..280a0e0e 100644 --- a/scylla-server/src/controllers/mod.rs +++ b/scylla-server/src/controllers/mod.rs @@ -3,3 +3,5 @@ pub mod car_command_controller; pub mod data_controller; pub mod data_type_controller; pub mod run_controller; + +pub mod file_insertion_controller; diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs index 35cba368..0ed9145b 100644 --- a/scylla-server/src/db_handler.rs +++ b/scylla-server/src/db_handler.rs @@ -63,10 +63,14 @@ impl DbHandler { info!("{} batches remaining!", batch_queue.len()+1); // do not spawn new tasks in this mode, see below comment for chunk_size math let chunk_size = final_msgs.len() / ((final_msgs.len() / 16380) + 1); + if chunk_size == 0 { + warn!("Could not insert {} messages, chunk size zero!", final_msgs.len()); + continue; + } for chunk in final_msgs.chunks(chunk_size).collect::<Vec<_>>() { info!( - "A cleanup batch uploaded: {:?}", - data_service::add_many(&mut database, chunk.to_vec()).await + "A cleanup chunk uploaded: {:?}", + data_service::add_many(&mut database, chunk.to_vec()) ); } } @@ -78,9 +82,16 @@ impl DbHandler { // max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety // TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1); + if chunk_size == 0 { + warn!("Could not insert {} messages, chunk size zero!", msgs.len()); + continue; + } debug!("Batch uploading {} chunks in parrallel", msgs.len() / chunk_size); for chunk in
msgs.chunks(chunk_size).collect::<Vec<_>>() { - tokio::spawn(DbHandler::batch_upload(chunk.to_vec(), pool.clone())); + let owned = chunk.to_vec(); + let pool = pool.clone(); + tokio::task::spawn_blocking(move || { + DbHandler::batch_upload(owned, pool)}); } debug!( "DB send: {} of {}", @@ -110,13 +121,13 @@ impl DbHandler { } } - #[instrument(level = Level::DEBUG, skip(msg, pool))] - async fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) { + //#[instrument(level = Level::DEBUG, skip(msg, pool))] + fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) { let Ok(mut database) = pool.get() else { warn!("Could not get connection for batch upload!"); return; }; - match data_service::add_many(&mut database, msg).await { + match data_service::add_many(&mut database, msg) { Ok(count) => info!("Batch uploaded: {:?}", count), Err(err) => warn!("Error in batch upload: {:?}", err), } diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index 5ea5f7db..7043a662 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -13,6 +13,7 @@ pub mod models; pub mod schema; pub mod command_data; +pub mod playback_data; pub mod serverdata; pub mod transformers; diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 7241b6b9..d2cf4b6e 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -4,6 +4,7 @@ use std::{ }; use axum::{ + extract::DefaultBodyLimit, http::Method, routing::{get, post}, Extension, Router, @@ -20,7 +21,7 @@ use scylla_server::{ controllers::{ self, car_command_controller::{self}, - data_type_controller, run_controller, + data_type_controller, file_insertion_controller, run_controller, }, services::run_service::{self}, PoolHandle, RateLimitMode, @@ -175,7 +176,7 @@ async fn main() { // spawn the database handler task_tracker.spawn( db_handler::DbHandler::new(mqtt_receive, db.clone(), cli.batch_upsert_time * 1000) - .handling_loop(db_send, token.clone()), + .handling_loop(db_send.clone(), token.clone()), ); // spawn the database inserter, if we have it enabled if !cli.disable_data_upload { @@ -233,6 +234,10 @@ async fn main() { "/config/set/:configKey", post(car_command_controller::send_config_command).layer(Extension(client_sharable)), ) + // FILE INSERT + .route("/insert/file", post(file_insertion_controller::insert_file)) + .layer(Extension(db_send)) + .layer(DefaultBodyLimit::disable()) // for CORS handling .layer( CorsLayer::new() diff --git a/scylla-server/src/mod.rs b/scylla-server/src/mod.rs index b7017c61..dbc1df8b 100644 --- a/scylla-server/src/mod.rs +++ b/scylla-server/src/mod.rs @@ -1,4 +1,5 @@ // @generated pub mod command_data; +pub mod playback_data; pub mod serverdata; diff --git a/scylla-server/src/models.rs b/scylla-server/src/models.rs index 03990555..88f61a68 100644 --- a/scylla-server/src/models.rs +++ b/scylla-server/src/models.rs @@ -6,8 +6,8 @@ use serde::Serialize; #[diesel(table_name = crate::schema::data)] #[diesel(belongs_to(DataType, foreign_key = dataTypeName))] #[diesel(check_for_backend(diesel::pg::Pg))] +#[diesel(primary_key(dataTypeName, time))] pub struct Data { - pub id: i32, pub values: Option<Vec<Option<f64>>>, pub dataTypeName: String, pub time: DateTime<Utc>, @@ -26,6 +26,7 @@ pub struct DataType { #[derive(Queryable, Debug, Identifiable, Insertable, Selectable, Serialize, AsChangeset)] #[diesel(table_name = crate::schema::run)] +#[diesel(primary_key(id))] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct Run { pub id: i32, diff --git a/scylla-server/src/proto/playback_data.proto b/scylla-server/src/proto/playback_data.proto new
file mode 100644 index 00000000..42609973 --- /dev/null +++ b/scylla-server/src/proto/playback_data.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package playbackdata.v1; + +message PlaybackData { + string topic = 1; + string unit = 2; + // time since unix epoch in MICROSECONDS + uint64 time_us = 3; + repeated float values = 4; + +} \ No newline at end of file diff --git a/scylla-server/src/schema.rs b/scylla-server/src/schema.rs index 52414e46..95aea5dc 100644 --- a/scylla-server/src/schema.rs +++ b/scylla-server/src/schema.rs @@ -1,8 +1,7 @@ // @generated automatically by Diesel CLI. diesel::table! { - data (id) { - id -> Int4, + data (time, dataTypeName) { values -> Nullable<Array<Nullable<Float8>>>, dataTypeName -> Text, time -> Timestamptz, diff --git a/scylla-server/src/services/data_service.rs b/scylla-server/src/services/data_service.rs index 3f15725e..383ff51c 100644 --- a/scylla-server/src/services/data_service.rs +++ b/scylla-server/src/services/data_service.rs @@ -40,11 +40,11 @@ pub async fn add_data( .get_result(db) } -/// Adds many datapoints via a batch insert +/// Adds many datapoints via a batch insert, skipping any data which conflicts with existing data /// * `db` - The database connection to use /// * `client_data` - A list of data to batch insert /// returns: A result containing the number of rows inserted or the QueryError propogated by the db -pub async fn add_many( +pub fn add_many( db: &mut Database, client_data: Vec<ClientData>, ) -> Result<usize, diesel::result::Error> { @@ -66,5 +66,6 @@ pub async fn add_many( }) .collect::<Vec<_>>(), ) + .on_conflict_do_nothing() .execute(db) } diff --git a/scylla-server/src/services/run_service.rs b/scylla-server/src/services/run_service.rs index f12d3205..7897bbb5 100644 --- a/scylla-server/src/services/run_service.rs +++ b/scylla-server/src/services/run_service.rs @@ -6,7 +6,7 @@ use diesel::prelude::*; /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db pub async fn get_all_runs(db: &mut Database) -> Result<Vec<Run>, diesel::result::Error> { - run.load(db) + run.order(id.asc()).get_results(db) } /// Gets a single run by its id
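
Note on the run-ID inference in file_insertion_controller.rs: the controller maps each run's start time (in microseconds) to an inclusive time range ending at the next run's start time, with the newest run owning everything up to u64::MAX, then looks each decoded point's time_us up in that map. Below is a minimal, self-contained sketch of the same idea; the run IDs and timestamps are made up for illustration and are not taken from the database.

use rangemap::RangeInclusiveMap;

fn main() {
    // hypothetical (run_id, start_time_us) pairs, ordered by start time
    let runs = [(1i32, 1_000u64), (2, 5_000), (3, 9_000)];

    // keys are inclusive microsecond ranges, values are run IDs;
    // the newest run owns everything up to u64::MAX
    let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
    let mut iter = runs.iter().peekable();
    while let Some((id, start)) = iter.next() {
        match iter.peek() {
            Some((_, next_start)) => run_rng.insert(*start..=*next_start, *id),
            None => run_rng.insert(*start..=u64::MAX, *id),
        }
    }

    // a point stamped at 6_500 us falls inside run 2's range
    assert_eq!(run_rng.get(&6_500), Some(&2));
    // a point older than every run start gets no run ID (counted via count_bad_run above)
    assert_eq!(run_rng.get(&500), None);
}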
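
Note on the chunk_size math touched in db_handler.rs: per the in-code comment, each data row binds 4 parameters and Postgres allows at most 65535 bind parameters per statement, so a chunk may hold at most 65535 / 4 ≈ 16383 rows, rounded down to 16380 for margin. The expression len / ((len / 16380) + 1) keeps every chunk at or below that bound and evaluates to zero only for an empty batch, which is what the new chunk_size == 0 guard skips (slice::chunks panics on a chunk size of zero). A small sketch checking those properties under that 4-parameters-per-row assumption:

fn chunk_size(len: usize) -> usize {
    len / ((len / 16380) + 1)
}

fn main() {
    for len in [0usize, 1, 16380, 16381, 50_000, 1_000_000] {
        let cs = chunk_size(len);
        if cs == 0 {
            // only the empty batch hits the new guard
            assert_eq!(len, 0);
            continue;
        }
        // every chunk stays within the Postgres bind-parameter limit
        assert!(cs <= 16380);
        assert!(cs * 4 <= 65535);
    }
}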