Skip to content

Commit

Permalink
operation file insertion code
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 23, 2024
1 parent 0532c39 commit 2694ffa
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 5 deletions.
1 change: 1 addition & 0 deletions scylla-server/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ prisma.rs
# protobuf
serverdata.rs
command_data.rs
playback_data.rs
46 changes: 46 additions & 0 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ serde = "1.0.203"
protobuf-codegen = "3.5.1"
protobuf = "3.5.1"
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = "0.7.5"
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
Expand All @@ -28,6 +28,8 @@ axum-extra = { version = "0.9.3", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"

[features]
top = ["dep:console-subscriber"]
Expand Down
1 change: 1 addition & 0 deletions scylla-server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ fn main() {
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
.input("src/proto/playback_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
Expand Down
101 changes: 101 additions & 0 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use axum::extract::{Multipart, State};
use axum_macros::debug_handler;
use chrono::DateTime;
use protobuf::CodedInputStream;
use rangemap::RangeInclusiveMap;
use tokio_util::bytes::Buf;
use tracing::{debug, info, trace};

use crate::{
error::ScyllaError,
playback_data,
services::{data_service, run_service},
ClientData, PoolHandle,
};

// super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails
#[debug_handler]
pub async fn insert_file(
State(pool): State<PoolHandle>,
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
.into_iter()
.map(|f| (f.id, f.time.timestamp_micros() as u64))
.peekable();
let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
// this actual formulates the list, where keys are ranges of times (us) and the values are the run IDs
while let Some(it) = run_iter.next() {
match run_iter.peek() {
Some(next) => {
run_rng.insert(it.1..=next.1, it.0);
}
// if this is the last item in the list
None => {
run_rng.insert(it.1..=std::u64::MAX, it.0);
continue;
}
}
}

// iterate through all files
while let Some(field) = multipart.next_field().await.unwrap() {
// round up all of the protobuf segments as a giant list
let mut data = field.bytes().await.unwrap().reader();
let mut insertable_data: Vec<playback_data::PlaybackData> = vec![];
{
// this cannot be used across an await, hence scoped
let mut stream = CodedInputStream::new(&mut data);
loop {
match stream.read_message::<playback_data::PlaybackData>() {
Ok(a) => {
trace!("Decoded file msg: {}", a);
insertable_data.push(a);
}
Err(e) => {
trace!("Exiting from read loop {}", e);
break;
}
}
}
}

debug!("Mapping data to ClientData type, with inferred run IDs!");
let new_data: Vec<ClientData> = insertable_data
.into_iter()
.map(|f| match run_rng.get(&f.time_us) {
Some(a) => ClientData {
run_id: *a,
name: f.topic.clone(),
unit: f.unit,
values: f.values,
timestamp: DateTime::from_timestamp_micros(f.time_us as i64).unwrap(),
node: f.topic.split_once('/').unwrap_or_default().0.to_owned(),
},
None => ClientData {
run_id: -1,
name: f.topic.clone(),
unit: f.unit,
values: f.values,
timestamp: DateTime::from_timestamp_micros(f.time_us as i64).unwrap(),
node: f.topic.split_once('/').unwrap_or_default().0.to_owned(),
},
})
.collect();
info!(
"Inserting {} points, {} of which have no run ID (-1).",
new_data.len(),
new_data
.iter()
.filter(|x| x.run_id == -1)
.collect::<Vec<_>>()
.len()
);
data_service::add_many(&mut db, new_data).await?;
}
Ok("Successfully inserted all!".to_string())
}
2 changes: 2 additions & 0 deletions scylla-server/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ pub mod car_command_controller;
pub mod data_controller;
pub mod data_type_controller;
pub mod run_controller;

pub mod file_insertion_controller;
1 change: 1 addition & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod schema;

pub mod command_data;
pub mod serverdata;
pub mod playback_data;

pub mod transformers;

Expand Down
6 changes: 5 additions & 1 deletion scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use axum::{
extract::DefaultBodyLimit,
http::Method,
routing::{get, post},
Extension, Router,
Expand All @@ -20,7 +21,7 @@ use scylla_server::{
controllers::{
self,
car_command_controller::{self},
data_type_controller, run_controller,
data_type_controller, file_insertion_controller, run_controller,
},
services::run_service::{self},
PoolHandle, RateLimitMode,
Expand Down Expand Up @@ -233,6 +234,9 @@ async fn main() {
"/config/set/:configKey",
post(car_command_controller::send_config_command).layer(Extension(client_sharable)),
)
// FILE INSERT
.route("/insert/file", post(file_insertion_controller::insert_file))
.layer(DefaultBodyLimit::disable())
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
1 change: 1 addition & 0 deletions scylla-server/src/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @generated

pub mod command_data;
pub mod playback_data;
pub mod serverdata;
2 changes: 1 addition & 1 deletion scylla-server/src/mqtt_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl MqttProcessor {
// get the node and datatype from the topic extracted at the beginning
let node = split.0;

let data_type = String::from(split.1);
let data_type = String::from(topic);

// extract the unix time
// levels of time priority
Expand Down
12 changes: 12 additions & 0 deletions scylla-server/src/proto/playback_data.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";

package playbackdata.v1;

// One recorded datapoint as stored in a playback file.
message PlaybackData {
// full topic of the datapoint; on file insertion the text before the first '/' is used as the node
string topic = 1;
// unit label of the datapoint, stored as-is on file insertion
string unit = 2;
// time since unix epoch in MICROSECONDS
uint64 time_us = 3;
// the value(s) recorded for this datapoint
repeated float values = 4;

}
3 changes: 2 additions & 1 deletion scylla-server/src/services/data_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn add_data(
.get_result(db)
}

/// Adds many datapoints via a batch insert
/// Adds many datapoints via a batch insert, skips any data which conflicts with existing data
/// * `db` - The database connection to use
/// * `client_data` - A list of data to batch insert
/// returns: A result containing the number of rows inserted or the QueryError propagated by the db
Expand All @@ -66,5 +66,6 @@ pub async fn add_many(
})
.collect::<Vec<_>>(),
)
.on_conflict_do_nothing()
.execute(db)
}
2 changes: 1 addition & 1 deletion scylla-server/src/services/run_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use diesel::prelude::*;
/// * `db` - The database connection to use
/// returns: A result containing the data or the QueryError propagated by the db
pub async fn get_all_runs(db: &mut Database) -> Result<Vec<Run>, diesel::result::Error> {
run.load(db)
run.order(id.asc()).get_results(db)
}

/// Gets a single run by its id
Expand Down

0 comments on commit 2694ffa

Please sign in to comment.