Insert plaintext protobuf into Scylla #247

Merged · 7 commits · Dec 21, 2024
1 change: 1 addition & 0 deletions scylla-server/.gitignore
@@ -23,3 +23,4 @@ prisma.rs
# protobuf
serverdata.rs
command_data.rs
+playback_data.rs
63 changes: 55 additions & 8 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default.

10 changes: 6 additions & 4 deletions scylla-server/Cargo.toml
@@ -9,10 +9,10 @@ diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] }
pq-sys = { version = "0.6", features = ["bundled"] }
dotenvy = "0.15"
serde = "1.0.203"
protobuf-codegen = "3.5.1"
protobuf = "3.5.1"
protobuf-codegen = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = "0.7.5"
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
@@ -28,12 +28,14 @@ axum-extra = { version = "0.9.3", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"

[features]
top = ["dep:console-subscriber"]

[build-dependencies]
protobuf-codegen = "3.5.1"
protobuf-codegen = "3.7.1"

[profile.release]
lto = true
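The protobuf bump matters mostly for the new "with-bytes" feature: it adds CodedInputStream::from_tokio_bytes, which the file-insertion controller below uses to decode messages straight out of the bytes::Bytes buffer that axum's multipart extractor yields. A minimal standalone sketch of that decode loop (playback_data stands in for the generated module; this is an illustration, not the PR's exact code):

use protobuf::CodedInputStream;

// Drain every length-delimited PlaybackData message from a Bytes buffer.
// from_tokio_bytes is only available with the "with-bytes" feature enabled.
fn decode_all(data: &bytes::Bytes) -> Vec<playback_data::PlaybackData> {
    let mut stream = CodedInputStream::from_tokio_bytes(data);
    let mut msgs = Vec::new();
    // read_message returns Err at end of input, which ends the loop
    while let Ok(msg) = stream.read_message::<playback_data::PlaybackData>() {
        msgs.push(msg);
    }
    msgs
}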
1 change: 1 addition & 0 deletions scylla-server/build.rs
@@ -9,6 +9,7 @@ fn main() {
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
.input("src/proto/playback_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
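For context, the whole build script stays tiny. A sketch of its likely shape after this change (the include path and the .pure() call are assumptions; the diff only shows the inputs and output directory):

fn main() {
    protobuf_codegen::Codegen::new()
        // generate pure-Rust code at build time, no protoc binary needed
        .pure()
        // Inputs must reside in some of include paths.
        .include("src/proto")
        .input("src/proto/serverdata.proto")
        .input("src/proto/command_data.proto")
        .input("src/proto/playback_data.proto")
        // Specify output directory relative to Cargo output directory.
        .out_dir("src")
        .run_from_script();
}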
13 changes: 7 additions & 6 deletions scylla-server/migrations/2024-11-10-031516_create_all/up.sql
@@ -1,26 +1,30 @@
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- CreateTable
CREATE TABLE "run" (
"id" SERIAL NOT NULL,
"locationName" TEXT,
"latitude" DOUBLE PRECISION,
"longitude" DOUBLE PRECISION,
"driverName" TEXT,
"notes" TEXT DEFAULT '',
"notes" TEXT NOT NULL DEFAULT '',
"time" TIMESTAMPTZ NOT NULL,

CONSTRAINT "run_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "data" (
"id" SERIAL NOT NULL,
"values" DOUBLE PRECISION[],
"dataTypeName" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"runId" INTEGER NOT NULL,

CONSTRAINT "data_pkey" PRIMARY KEY ("id")
PRIMARY KEY("time", "dataTypeName")
);
+-- SELECT * FROM create_hypertable("data", by_range("time"));
+-- SELECT * FROM add_dimension("data", by_hash("dataTypeNmae", 4));


-- CreateTable
CREATE TABLE "dataType" (
@@ -34,9 +38,6 @@ CREATE TABLE "dataType" (
-- CreateIndex
CREATE UNIQUE INDEX "run_id_key" ON "run"("id");

--- CreateIndex
-CREATE UNIQUE INDEX "data_id_time_key" ON "data"("id", "time");

-- CreateIndex
CREATE UNIQUE INDEX "dataType_name_key" ON "dataType"("name");

95 changes: 95 additions & 0 deletions scylla-server/src/controllers/file_insertion_controller.rs
@@ -0,0 +1,95 @@
use axum::{
extract::{Multipart, State},
Extension,
};
use axum_macros::debug_handler;
use chrono::DateTime;
use protobuf::CodedInputStream;
use rangemap::RangeInclusiveMap;
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};

/// Inserts a file using HTTP multipart.
/// The file is parsed, ClientData values are extracted, the run ID of each point is inferred, and the data is batch uploaded.
// note: this attribute reports which handler argument is at fault when axum's Send + Sync bounds fail
#[debug_handler]
pub async fn insert_file(
State(pool): State<PoolHandle>,
Extension(batcher): Extension<mpsc::Sender<Vec<ClientData>>>,
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
.into_iter()
.map(|f| (f.id, f.time.timestamp_micros() as u64))
.peekable();
let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
// this actually builds the map, where keys are ranges of times (us) and the values are the run IDs
while let Some(it) = run_iter.next() {
match run_iter.peek() {
Some(next) => {
run_rng.insert(it.1..=next.1, it.0);
}
// if this is the last item in the list
None => {
run_rng.insert(it.1..=u64::MAX, it.0);
continue;
}
}
}

// iterate through all files
debug!("Converting file data to insertable data!");
while let Some(field) = multipart.next_field().await.unwrap() {
// round up all of the protobuf segments as a giant list
let data = field.bytes().await.unwrap();
let mut count_bad_run = 0usize;
let mut insertable_data: Vec<ClientData> = Vec::new();
{
// this cannot be used across an await, hence scoped
let mut stream = CodedInputStream::from_tokio_bytes(&data);
loop {
match stream.read_message::<playback_data::PlaybackData>() {
Ok(f) => {
trace!("Decoded file msg: {}", f);
let f = match run_rng.get(&f.time_us) {
Some(a) => ClientData {
run_id: *a,
name: f.topic.clone(),
unit: f.unit,
values: f.values,
timestamp: DateTime::from_timestamp_micros(f.time_us as i64)
.unwrap(),
node: f.topic.split_once('/').unwrap_or_default().0.to_owned(),
},
None => {
count_bad_run += 1;
continue;
}
};
insertable_data.push(f);
}
Err(e) => {
info!("Exiting from read loop {}", e);
break;
}
}
}
}
info!(
"Inserting {} points. {} points could not be assigned IDs.",
insertable_data.len(),
count_bad_run
);
if let Err(err) = batcher.send(insertable_data).await {
warn!("Error sending file insert data to batcher! {}", err);
};
}
info!("Finished file insert request!");
Ok("Successfully sent all to batcher!".to_string())
}
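The run-ID inference above hinges entirely on how the RangeInclusiveMap gets built: each run owns the time span from its own start to the next run's start, and the newest run extends to u64::MAX. A compact standalone sketch of that construction (hypothetical names, assuming runs arrive sorted ascending by start time):

use rangemap::RangeInclusiveMap;

// runs: (run_id, start_time_us) pairs, sorted ascending by start time
fn build_run_map(runs: &[(i32, u64)]) -> RangeInclusiveMap<u64, i32> {
    let mut map = RangeInclusiveMap::new();
    let mut iter = runs.iter().peekable();
    while let Some(&(id, start)) = iter.next() {
        // the last run has no successor, so it claims everything onward
        let end = iter.peek().map_or(u64::MAX, |&&(_, next)| next);
        map.insert(start..=end, id);
    }
    map
}

With runs (1, 0) and (2, 1_000_000), a point stamped 500_000 us resolves to run 1, while anything at or past 1_000_000 resolves to run 2, since the later insert wins the shared boundary.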
2 changes: 2 additions & 0 deletions scylla-server/src/controllers/mod.rs
@@ -3,3 +3,5 @@ pub mod car_command_controller;
pub mod data_controller;
pub mod data_type_controller;
pub mod run_controller;

+pub mod file_insertion_controller;
23 changes: 17 additions & 6 deletions scylla-server/src/db_handler.rs
@@ -63,10 +63,14 @@ impl DbHandler {
info!("{} batches remaining!", batch_queue.len()+1);
// do not spawn new tasks in this mode, see below comment for chunk_size math
let chunk_size = final_msgs.len() / ((final_msgs.len() / 16380) + 1);
+if chunk_size == 0 {
+warn!("Could not insert {} messages, chunk size zero!", final_msgs.len());
+continue;
+}
for chunk in final_msgs.chunks(chunk_size).collect::<Vec<_>>() {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&mut database, chunk.to_vec()).await
"A cleanup chunk uploaded: {:?}",
data_service::add_many(&mut database, chunk.to_vec())
);
}
}
@@ -78,9 +82,16 @@
// max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety
// TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely
let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1);
+if chunk_size == 0 {
+warn!("Could not insert {} messages, chunk size zero!", msgs.len());
+continue;
+}
debug!("Batch uploading {} chunks in parrallel", msgs.len() / chunk_size);
for chunk in msgs.chunks(chunk_size).collect::<Vec<_>>() {
-tokio::spawn(DbHandler::batch_upload(chunk.to_vec(), pool.clone()));
+let owned = chunk.to_vec();
+let pool = pool.clone();
+tokio::task::spawn_blocking(move || {
+DbHandler::batch_upload(owned, pool)});
}
debug!(
"DB send: {} of {}",
@@ -110,13 +121,13 @@
}
}

-#[instrument(level = Level::DEBUG, skip(msg, pool))]
-async fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
+//#[instrument(level = Level::DEBUG, skip(msg, pool))]
+fn batch_upload(msg: Vec<ClientData>, pool: PoolHandle) {
let Ok(mut database) = pool.get() else {
warn!("Could not get connection for batch upload!");
return;
};
-match data_service::add_many(&mut database, msg).await {
+match data_service::add_many(&mut database, msg) {
Ok(count) => info!("Batch uploaded: {:?}", count),
Err(err) => warn!("Error in batch upload: {:?}", err),
}
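The zero-chunk guard exists because the arithmetic above divides by chunk_size: Postgres caps a statement at 65535 bind parameters, and at four parameters per row the code targets at most 16380 rows per chunk, splitting a batch evenly. A worked standalone sketch of that math (illustration only, not the repo's code):

// 65535 params / 4 params per row is roughly 16383; 16380 leaves a margin
fn chunk_size(len: usize) -> usize {
    len / ((len / 16380) + 1)
}

fn main() {
    assert_eq!(chunk_size(10_000), 10_000); // small batch: a single chunk
    assert_eq!(chunk_size(40_000), 13_333); // split three ways, under the cap
    assert_eq!(chunk_size(0), 0); // the empty case the new guard catches
}

The move from tokio::spawn to tokio::task::spawn_blocking follows from batch_upload no longer being an async fn: diesel connections block the thread they run on, so the blocking pool keeps that work off the async workers.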
1 change: 1 addition & 0 deletions scylla-server/src/lib.rs
@@ -13,6 +13,7 @@ pub mod models;
pub mod schema;

pub mod command_data;
+pub mod playback_data;
pub mod serverdata;

pub mod transformers;