Insert plaintext protobuf into Scylla (#247)
* operation file insertion code

* fmt and fix

* consolidate batching logic in db_handler

* fix primary keys, do not insert items for which no run can be found.

* simplification and perf
jr1221 authored Dec 21, 2024
1 parent e51902c commit 2517000
Showing 16 changed files with 212 additions and 32 deletions.
1 change: 1 addition & 0 deletions scylla-server/.gitignore
@@ -23,3 +23,4 @@ prisma.rs
# protobuf
serverdata.rs
command_data.rs
playback_data.rs
63 changes: 55 additions & 8 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default.

10 changes: 6 additions & 4 deletions scylla-server/Cargo.toml
@@ -9,10 +9,10 @@ diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] }
pq-sys = { version = "0.6", features = ["bundled"] }
dotenvy = "0.15"
serde = "1.0.203"
protobuf-codegen = "3.5.1"
protobuf = "3.5.1"
protobuf-codegen = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = "0.7.5"
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
@@ -28,12 +28,14 @@ axum-extra = { version = "0.9.3", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"

[features]
top = ["dep:console-subscriber"]

[build-dependencies]
protobuf-codegen = "3.5.1"
protobuf-codegen = "3.7.1"

[profile.release]
lto = true
1 change: 1 addition & 0 deletions scylla-server/build.rs
@@ -9,6 +9,7 @@ fn main() {
// Inputs must reside in one of the include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
.input("src/proto/playback_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
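The .proto file itself is not part of this diff; here is a plausible reconstruction of the generated playback_data::PlaybackData type, inferred from the fields the new controller reads (topic, unit, values, time_us) — field types and layout are assumptions:

// Hypothetical sketch of the rust-protobuf generated struct for
// src/proto/playback_data.proto; the real definition is not shown in this commit.
pub struct PlaybackData {
    pub topic: String,    // "node/dataType"; the controller splits on '/' to recover the node
    pub unit: String,
    pub values: Vec<f32>, // assumption: `repeated float` in the .proto
    pub time_us: u64,     // timestamp in microseconds since the Unix epoch
    // ...plus rust-protobuf bookkeeping fields (special_fields, cached_size)
}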
13 changes: 7 additions & 6 deletions scylla-server/migrations/2024-11-10-031516_create_all/up.sql
@@ -1,26 +1,30 @@
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- CreateTable
CREATE TABLE "run" (
"id" SERIAL NOT NULL,
"locationName" TEXT,
"latitude" DOUBLE PRECISION,
"longitude" DOUBLE PRECISION,
"driverName" TEXT,
"notes" TEXT DEFAULT '',
"notes" TEXT NOT NULL DEFAULT '',
"time" TIMESTAMPTZ NOT NULL,

CONSTRAINT "run_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "data" (
"id" SERIAL NOT NULL,
"values" DOUBLE PRECISION[],
"dataTypeName" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"runId" INTEGER NOT NULL,

CONSTRAINT "data_pkey" PRIMARY KEY ("id")
PRIMARY KEY("time", "dataTypeName")
);
-- SELECT * FROM create_hypertable('data', by_range('time'));
-- SELECT * FROM add_dimension('data', by_hash('dataTypeName', 4));
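-- note: the composite primary key keeps "time" in the table's unique constraint,
-- which TimescaleDB requires before "data" can become a hypertable partitioned by time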


-- CreateTable
CREATE TABLE "dataType" (
@@ -34,9 +34,6 @@ CREATE TABLE "dataType" (
-- CreateIndex
CREATE UNIQUE INDEX "run_id_key" ON "run"("id");

-- CreateIndex
CREATE UNIQUE INDEX "data_id_time_key" ON "data"("id", "time");

-- CreateIndex
CREATE UNIQUE INDEX "dataType_name_key" ON "dataType"("name");

95 changes: 95 additions & 0 deletions scylla-server/src/controllers/file_insertion_controller.rs
@@ -0,0 +1,95 @@
use axum::{
extract::{Multipart, State},
Extension,
};
use axum_macros::debug_handler;
use chrono::DateTime;
use protobuf::CodedInputStream;
use rangemap::RangeInclusiveMap;
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};
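// ClientData (defined elsewhere in this crate, not shown in this diff) carries
// run_id, name, unit, values, timestamp, and node — the same shape the live
// ingest path produces, which is what lets file playback reuse the batcher channel.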

/// Inserts a file using HTTP multipart.
/// The file is parsed into protobuf records, ClientData values are extracted, the run ID of each point is inferred from its timestamp, and the data is batch uploaded.
// super cool: this attribute tells you which handler argument is misbehaving when axum's Send + Sync Handler bounds fail
#[debug_handler]
pub async fn insert_file(
State(pool): State<PoolHandle>,
Extension(batcher): Extension<mpsc::Sender<Vec<ClientData>>>,
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
.into_iter()
.map(|f| (f.id, f.time.timestamp_micros() as u64))
.peekable();
let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
// this actually builds the map: keys are time ranges (us) and values are the run IDs
while let Some(it) = run_iter.next() {
match run_iter.peek() {
Some(next) => {
run_rng.insert(it.1..=next.1, it.0);
}
// if this is the last item in the list
None => {
run_rng.insert(it.1..=u64::MAX, it.0);
continue;
}
}
}
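// worked example of the map built above: runs (id 1, t = 100us) and (id 2, t = 250us)
// produce 100..=250 -> 1 and 250..=u64::MAX -> 2; RangeInclusiveMap lets the later
// insert overwrite the overlapping boundary, so a point stamped exactly 250us
// resolves to run 2, and anything stamped before the first run finds no entry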

// iterate through all files
debug!("Converting file data to insertable data!");
while let Some(field) = multipart.next_field().await.unwrap() {
// round up all of the protobuf segments as a giant list
let data = field.bytes().await.unwrap();
let mut count_bad_run = 0usize;
let mut insertable_data: Vec<ClientData> = Vec::new();
{
// this cannot be used across an await, hence scoped
let mut stream = CodedInputStream::from_tokio_bytes(&data);
loop {
match stream.read_message::<playback_data::PlaybackData>() {
Ok(f) => {
trace!("Decoded file msg: {}", f);
let f = match run_rng.get(&f.time_us) {
Some(a) => ClientData {
run_id: *a,
name: f.topic.clone(),
unit: f.unit,
values: f.values,
timestamp: DateTime::from_timestamp_micros(f.time_us as i64)
.unwrap(),
node: f.topic.split_once('/').unwrap_or_default().0.to_owned(),
},
None => {
count_bad_run += 1;
continue;
}
};
insertable_data.push(f);
}
Err(e) => {
info!("Exiting from read loop {}", e);
break;
}
}
}
}
info!(
"Inserting {} points. {} points could not be assigned IDs.",
insertable_data.len(),
count_bad_run
);
if let Err(err) = batcher.send(insertable_data).await {
warn!("Error sending file insert data to batcher! {}", err);
};
}
info!("Finished file insert request!");
Ok("Successfully sent all to batcher!".to_string())
}
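For reference, a minimal client sketch for exercising this handler. The route mount point, port, and multipart field name are assumptions — only the handler appears in this diff — and it presumes the uploaded file is a sequence of length-delimited PlaybackData messages, matching the read_message loop above. Requires reqwest with its "multipart" feature:

use reqwest::multipart::{Form, Part};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // a file of length-delimited playback_data protobuf messages
    let bytes = std::fs::read("playback.bin")?;
    let form = Form::new().part("file", Part::bytes(bytes).file_name("playback.bin"));
    let resp = reqwest::Client::new()
        .post("http://localhost:8000/insert/file") // hypothetical route and port
        .multipart(form)
        .send()
        .await?;
    println!("server said: {}", resp.text().await?);
    Ok(())
}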
2 changes: 2 additions & 0 deletions scylla-server/src/controllers/mod.rs
@@ -3,3 +3,5 @@ pub mod car_command_controller;
pub mod data_controller;
pub mod data_type_controller;
pub mod run_controller;

pub mod file_insertion_controller;
23 changes: 17 additions & 6 deletions scylla-server/src/db_handler.rs
@@ -63,10 +63,14 @@ impl DbHandler {
info!("{} batches remaining!", batch_queue.len()+1);
// do not spawn new tasks in this mode, see below comment for chunk_size math
let chunk_size = final_msgs.len() / ((final_msgs.len() / 16380) + 1);
if chunk_size == 0 {
warn!("Could not insert {} messages, chunk size zero!", final_msgs.len());
continue;
}
for chunk in final_msgs.chunks(chunk_size).collect::<Vec<_>>() {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&mut database, chunk.to_vec()).await
"A cleanup chunk uploaded: {:?}",
data_service::add_many(&mut database, chunk.to_vec())
);
}
}
@@ -78,9 +82,16 @@
// max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety
// TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely
let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1);
if chunk_size == 0 {
warn!("Could not insert {} messages, chunk size zero!", msgs.len());
continue;
}
debug!("Batch uploading {} chunks in parrallel", msgs.len() / chunk_size);
for chunk in msgs.chunks(chunk_size).collect::<Vec<_>>() {
tokio::spawn(DbHandler::batch_upload(chunk.to_vec(), pool.clone()));
let owned = chunk.to_vec();
let pool = pool.clone();
// diesel is synchronous, so run the upload on tokio's blocking thread pool
tokio::task::spawn_blocking(move || DbHandler::batch_upload(owned, pool));
}
debug!(
"DB send: {} of {}",
@@ -110,13 +121,13 @@
}
}

#[instrument(level = Level::DEBUG, skip(msg, pool))]
async fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
//#[instrument(level = Level::DEBUG, skip(msg, pool))]
fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
let Ok(mut database) = pool.get() else {
warn!("Could not get connection for batch upload!");
return;
};
match data_service::add_many(&mut database, msg).await {
match data_service::add_many(&mut database, msg) {
Ok(count) => info!("Batch uploaded: {:?}", count),
Err(err) => warn!("Error in batch upload: {:?}", err),
}
1 change: 1 addition & 0 deletions scylla-server/src/lib.rs
@@ -13,6 +13,7 @@ pub mod models;
pub mod schema;

pub mod command_data;
pub mod playback_data;
pub mod serverdata;

pub mod transformers;