From 53f25592383ca87eba3627115add9c5e97548eb6 Mon Sep 17 00:00:00 2001
From: Jack Rubacha
Date: Sat, 30 Nov 2024 15:53:20 -0500
Subject: [PATCH] consolidate batching logic in db_handler

---
 .../controllers/file_insertion_controller.rs | 24 +++++++++++--------
 scylla-server/src/db_handler.rs              |  4 ++++
 scylla-server/src/main.rs                    |  3 ++-
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs
index b3612b45..30ea74b6 100644
--- a/scylla-server/src/controllers/file_insertion_controller.rs
+++ b/scylla-server/src/controllers/file_insertion_controller.rs
@@ -1,22 +1,24 @@
-use axum::extract::{Multipart, State};
+use axum::{
+    extract::{Multipart, State},
+    Extension,
+};
 use axum_macros::debug_handler;
 use chrono::DateTime;
 use protobuf::CodedInputStream;
 use rangemap::RangeInclusiveMap;
+use tokio::sync::mpsc;
 use tokio_util::bytes::Buf;
-use tracing::{debug, info, trace};
+use tracing::{debug, info, trace, warn};
 
-use crate::{
-    error::ScyllaError,
-    playback_data,
-    services::{data_service, run_service},
-    ClientData, PoolHandle,
-};
+use crate::{error::ScyllaError, playback_data, services::run_service, ClientData, PoolHandle};
 
+/// Inserts a file using HTTP multipart
+/// This file is parsed and ClientData values are extracted, the run ID of each variable is inferred, and then data is batch uploaded
 // super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails
 #[debug_handler]
 pub async fn insert_file(
     State(pool): State<PoolHandle>,
+    Extension(batcher): Extension<mpsc::Sender<Vec<ClientData>>>,
     mut multipart: Multipart,
 ) -> Result<String, ScyllaError> {
     // create a run ID cache
@@ -95,7 +97,9 @@ pub async fn insert_file(
             .collect::<Vec<_>>()
             .len()
     );
-    data_service::add_many(&mut db, new_data).await?;
+    if let Err(err) = batcher.send(new_data).await {
+        warn!("Error sending file insert data to batcher! {}", err);
+    };
 }
-    Ok("Successfully inserted all!".to_string())
+    Ok("Successfully sent all to batcher!".to_string())
 }
diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs
index 35cba368..b47c88ce 100644
--- a/scylla-server/src/db_handler.rs
+++ b/scylla-server/src/db_handler.rs
@@ -78,6 +78,10 @@ impl DbHandler {
                     // max for batch is 65535/4 params per message, hence the below, rounded down with a margin for safety
                     // TODO avoid this code batch uploading the remainder messages as a new batch, combine it with another safely
                     let chunk_size = msgs.len() / ((msgs.len() / 16380) + 1);
+                    if chunk_size == 0 {
+                        warn!("Could not insert {} messages, chunk size zero!", msgs.len());
+                        continue;
+                    }
                     debug!("Batch uploading {} chunks in parrallel", msgs.len() / chunk_size);
                     for chunk in msgs.chunks(chunk_size).collect::<Vec<_>>() {
                         tokio::spawn(DbHandler::batch_upload(chunk.to_vec(), pool.clone()));
diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs
index a3667f7b..d2cf4b6e 100755
--- a/scylla-server/src/main.rs
+++ b/scylla-server/src/main.rs
@@ -176,7 +176,7 @@ async fn main() {
     // spawn the database handler
     task_tracker.spawn(
         db_handler::DbHandler::new(mqtt_receive, db.clone(), cli.batch_upsert_time * 1000)
-            .handling_loop(db_send, token.clone()),
+            .handling_loop(db_send.clone(), token.clone()),
     );
     // spawn the database inserter, if we have it enabled
     if !cli.disable_data_upload {
@@ -236,6 +236,7 @@
         )
         // FILE INSERT
         .route("/insert/file", post(file_insertion_controller::insert_file))
+        .layer(Extension(db_send))
         .layer(DefaultBodyLimit::disable())
         // for CORS handling
         .layer(