Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diesel Performance Fixes, Batching Improvements, New Allocator #262

Merged
merged 16 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,510 changes: 913 additions & 597 deletions scylla-server/Cargo.lock

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,35 @@ edition = "2021"
default-run = "scylla-server"

[dependencies]
diesel = { version = "2.2.4", features = ["postgres", "r2d2", "chrono"] }
diesel = { version = "2.2.4", features = ["postgres", "chrono"] }
pq-sys = { version = "0.6.3", features = ["bundled_without_openssl"] }
dotenvy = "0.15"
serde = "1.0.203"
protobuf-codegen = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
tokio = { version = "1.38.0", features = ["full", "tracing"] }
axum = { version = "0.7.5", features = ["multipart"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
axum = { version = "0.8.1", features = ["multipart"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
socketioxide = { version = "0.15.1", features = ["tracing"] }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"}
tokio-util = { version= "0.7.11", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
console-subscriber = { version = "0.4.1", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }
axum-extra = { version = "0.10.0", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde_json = "1.0.128"
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
rangemap = "1.5.1"
axum-macros = "0.4.2"
axum-macros = "0.5.0"
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper", "sync-connection-wrapper", "tokio"] }
rustc-hash = "2.1.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

[features]
top = ["dep:console-subscriber"]
Expand All @@ -43,6 +47,12 @@ codegen-units = 1
panic = "abort"
strip = true # Automatically strip symbols from the binary.

[profile.profiling]
inherits = "release"
debug = true
strip = false

[[bin]]
name = "scylla-server"
path = "src/main.rs"

44 changes: 33 additions & 11 deletions scylla-server/integration_test.sh
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#!/bin/sh
echo "Starting db"
cd ../compose
docker compose up -d odyssey-timescale

cd ../scylla-server
echo "Migrating DB"
diesel migration run
# Navigate to the compose directory
echo "Navigating to compose directory..."
cd ../compose || { echo "Compose directory not found"; exit 1; }

echo "Running tests"
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres cargo test -- --test-threads=1
# Remove any existing odyssey-timescale container
echo "Stopping and removing any existing odyssey-timescale container..."
docker rm -f odyssey-timescale 2>/dev/null || echo "No existing container to remove."

echo "Exiting"
cd ../compose
docker compose down
# Start a new odyssey-timescale container
echo "Starting a new odyssey-timescale container..."
docker compose up -d odyssey-timescale || { echo "Failed to start odyssey-timescale"; exit 1; }

# Wait for the database to initialize
echo "Waiting for the database to initialize..."
sleep 3

# Navigate to the scylla-server directory
cd ../scylla-server || { echo "scylla-server directory not found"; exit 1; }

# Run database migrations
echo "Running database migrations..."
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres diesel migration run || { echo "Migration failed"; exit 1; }

# Run tests
echo "Running tests..."
DATABASE_URL=postgresql://postgres:[email protected]:5432/postgres cargo test -- --test-threads=1 || { echo "Tests failed"; exit 1; }

# Navigate back to the compose directory
cd ../compose || { echo "Compose directory not found"; exit 1; }

# Stop and clean up containers
echo "Stopping and cleaning up containers..."
docker compose down || { echo "Failed to clean up containers"; exit 1; }

echo "Script completed successfully!"
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE TABLE "run" (

-- CreateTable
CREATE TABLE "data" (
"values" DOUBLE PRECISION[],
"values" REAL [] NOT NULL check ("values" <> '{}' AND array_position("values", NULL) IS NULL),
"dataTypeName" TEXT NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"runId" INTEGER NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub async fn get_data(
State(pool): State<PoolHandle>,
Path((data_type_name, run_id)): Path<(String, i32)>,
) -> Result<Json<Vec<PublicData>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data = data_service::get_data(&mut db, data_type_name, run_id).await?;

// map data to frontend data types according to the From func of the client struct
Expand Down
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/data_type_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
pub async fn get_all_data_types(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicDataType>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let data_types = data_type_service::get_all_data_types(&mut db).await?;

let transformed_data_types: Vec<PublicDataType> =
Expand Down
9 changes: 6 additions & 3 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn insert_file(
mut multipart: Multipart,
) -> Result<String, ScyllaError> {
// create a run ID cache
let mut db = pool.get()?;
let mut db = pool.get().await?;
debug!("Warming up run ID map!");
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
Expand All @@ -45,9 +45,12 @@ pub async fn insert_file(

// iterate through all files
debug!("Converting file data to insertable data!");
while let Some(field) = multipart.next_field().await.unwrap() {
while let Ok(Some(field)) = multipart.next_field().await {
// round up all of the protobuf segments as a giant list
let data = field.bytes().await.unwrap();
let Ok(data) = field.bytes().await else {
warn!("Could not decode file insert, perhaps it was interrupted!");
continue;
};
let mut count_bad_run = 0usize;
let mut insertable_data: Vec<ClientData> = Vec::new();
{
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
pub async fn get_all_runs(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicRun>>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_all_runs(&mut db).await?;

let transformed_run_data: Vec<PublicRun> = run_data.into_iter().map(PublicRun::from).collect();
Expand All @@ -26,7 +26,7 @@ pub async fn get_run_by_id(
State(pool): State<PoolHandle>,
Path(run_id): Path<i32>,
) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::get_run_by_id(&mut db, run_id).await?;

if run_data.is_none() {
Expand All @@ -43,7 +43,7 @@ pub async fn get_run_by_id(
/// Create a new run with an auto-incremented ID.
/// Note: the new run's ID is then stored in the crate-level `RUN_ID`, replacing the previous value.
pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get()?;
let mut db = pool.get().await?;
let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?;

crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
Expand Down
Loading
Loading