Skip to content

Commit

Permalink
Fix Backend Build
Browse files Browse the repository at this point in the history
  • Loading branch information
Peyton-McKee committed Jan 14, 2025
1 parent 91d911e commit bd01681
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
-- CreateTable
CREATE TABLE "run" (
"id" TEXT NOT NULL,
"runId" SERIAL NOT NULL,
"driverName" TEXT NOT NULL,
"notes" TEXT NOT NULL,
"driverName" TEXT NOT NULL DEFAULT '',
"locationName" TEXT NOT NULL DEFAULT '',
"notes" TEXT NOT NULL DEFAULT '',
"time" TIMESTAMPTZ NOT NULL,

CONSTRAINT "run_pkey" PRIMARY KEY ("id")
CONSTRAINT "run_pkey" PRIMARY KEY ("runId")
);

-- CreateTable
CREATE TABLE "data" (
"values" DOUBLE PRECISION[],
"values" REAL[] NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"dataTypeName" TEXT NOT NULL,
"runId" TEXT NOT NULL,
"runId" INTEGER NOT NULL,

CONSTRAINT "data_pkey" PRIMARY KEY ("time","dataTypeName")
);
Expand All @@ -32,4 +32,4 @@ CREATE TABLE "data_type" (
ALTER TABLE "data" ADD CONSTRAINT "data_dataTypeName_fkey" FOREIGN KEY ("dataTypeName") REFERENCES "data_type"("name") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "data" ADD CONSTRAINT "data_runId_fkey" FOREIGN KEY ("runId") REFERENCES "run"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
ALTER TABLE "data" ADD CONSTRAINT "data_runId_fkey" FOREIGN KEY ("runId") REFERENCES "run"("runId") ON DELETE RESTRICT ON UPDATE CASCADE;
16 changes: 8 additions & 8 deletions charybdis/local-prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ datasource db {
}

model run {
id String @id @default(uuid())
runId Int @default(autoincrement())
driverName String
notes String
time DateTime @db.Timestamptz()
runId Int @id @default(autoincrement())
driverName String @default("")
locationName String @default("")
notes String @default("")
time DateTime @db.Timestamptz()
data data[]
}

model data {
values Float[]
values Float[] @db.Real
time DateTime @db.Timestamptz()
dataTypeName String
dataType data_type @relation(fields: [dataTypeName], references: [name])
runId String
run run @relation(fields: [runId], references: [id])
runId Int
run run @relation(fields: [runId], references: [runId])
@@id([time, dataTypeName])
}
Expand Down
14 changes: 7 additions & 7 deletions scylla-server/migrations/2024-11-10-031516_create_all/up.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
-- CreateTable
CREATE TABLE "run" (
"id" TEXT NOT NULL,
"runId" SERIAL NOT NULL,
"driverName" TEXT NOT NULL,
"notes" TEXT NOT NULL,
"driverName" TEXT NOT NULL DEFAULT '',
"locationName" TEXT NOT NULL DEFAULT '',
"notes" TEXT NOT NULL DEFAULT '',
"time" TIMESTAMPTZ NOT NULL,

CONSTRAINT "run_pkey" PRIMARY KEY ("id")
CONSTRAINT "run_pkey" PRIMARY KEY ("runId")
);

-- CreateTable
CREATE TABLE "data" (
"values" DOUBLE PRECISION[],
"values" REAL[] NOT NULL,
"time" TIMESTAMPTZ NOT NULL,
"dataTypeName" TEXT NOT NULL,
"runId" TEXT NOT NULL,
"runId" INTEGER NOT NULL,

CONSTRAINT "data_pkey" PRIMARY KEY ("time","dataTypeName")
);
Expand All @@ -32,4 +32,4 @@ CREATE TABLE "data_type" (
ALTER TABLE "data" ADD CONSTRAINT "data_dataTypeName_fkey" FOREIGN KEY ("dataTypeName") REFERENCES "data_type"("name") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "data" ADD CONSTRAINT "data_runId_fkey" FOREIGN KEY ("runId") REFERENCES "run"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
ALTER TABLE "data" ADD CONSTRAINT "data_runId_fkey" FOREIGN KEY ("runId") REFERENCES "run"("runId") ON DELETE RESTRICT ON UPDATE CASCADE;
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn insert_file(
let mut run_iter = run_service::get_all_runs(&mut db)
.await?
.into_iter()
.map(|f| (f.id, f.time.timestamp_micros() as u64))
.map(|f| (f.runId, f.time.timestamp_micros() as u64))
.peekable();
let mut run_rng: RangeInclusiveMap<u64, i32> = RangeInclusiveMap::new();
// this actual formulates the list, where keys are ranges of times (us) and the values are the run IDs
Expand Down
2 changes: 1 addition & 1 deletion scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>,
let mut db = pool.get().await?;
let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?;

crate::RUN_ID.store(run_data.id, Ordering::Relaxed);
crate::RUN_ID.store(run_data.runId, Ordering::Relaxed);
tracing::info!(
"Starting new run with ID: {}",
crate::RUN_ID.load(Ordering::Relaxed)
Expand Down
26 changes: 2 additions & 24 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use tokio::{sync::mpsc::Sender, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};

use crate::services::{data_service, data_type_service, run_service};
use crate::{ClientData, PoolHandle, RUN_ID};
use crate::services::{data_service, data_type_service};
use crate::{ClientData, PoolHandle};

/// A few threads to manage the processing and inserting of special types,
/// upserting of metadata for data, and batch uploading the database
Expand Down Expand Up @@ -227,28 +227,6 @@ impl DbHandler {
self.datatype_list.insert(msg.name.clone());
}

// Check for GPS points, insert them into current run if available
if msg.name == "TPU/GPS/Location" {
debug!("Upserting run with location points!");
let Ok(mut database) = self.pool.get().await else {
warn!("Could not get connection for db points update");
return;
};
// ensure lat AND long present in message, just a sanity check
if msg.values.len() < 2 {
warn!("GPS message found without both lat and long!");
} else if let Err(err) = run_service::update_run_with_coords(
&mut database,
RUN_ID.load(std::sync::atomic::Ordering::Relaxed),
msg.values[0].into(),
msg.values[1].into(),
)
.await
{
warn!("DB error run gps points upsert: {:?}", err);
}
}

// no matter what, batch upload the message
trace!("Pushing msg to queue: {:?}", msg);
self.data_queue.push(msg);
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);

RUN_ID.store(curr_run.id, Ordering::Relaxed);
RUN_ID.store(curr_run.runId, Ordering::Relaxed);
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
Expand All @@ -226,7 +226,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
token.clone(),
MqttProcessorOptions {
mqtt_path: cli.siren_host_url,
initial_run: curr_run.id,
initial_run: curr_run.runId,
static_rate_limit_time: cli.static_rate_limit_value,
rate_limit_mode: cli.rate_limit_mode,
upload_ratio: cli.socketio_discard_percent,
Expand Down
16 changes: 7 additions & 9 deletions scylla-server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use diesel::prelude::*;
use serde::Serialize;

/// Use this struct when querying data
#[derive(Queryable, Debug, Identifiable, Insertable, Selectable, Serialize, AsChangeset)]
#[derive(Debug, Identifiable, Insertable, Selectable, Serialize, AsChangeset, Queryable)]
#[diesel(table_name = crate::schema::data)]
#[diesel(belongs_to(DataType, foreign_key = dataTypeName))]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(primary_key(dataTypeName, time))]
pub struct Data {
pub values: Vec<Option<f32>>,
pub dataTypeName: String,
pub time: DateTime<Utc>,
pub dataTypeName: String,
pub runId: i32,
}

Expand All @@ -33,7 +33,7 @@ pub struct DataInsert {
}

#[derive(Queryable, Debug, Identifiable, Insertable, Selectable, Serialize, AsChangeset)]
#[diesel(table_name = crate::schema::dataType)]
#[diesel(table_name = crate::schema::data_type)]
#[diesel(primary_key(name))]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct DataType {
Expand All @@ -44,14 +44,12 @@ pub struct DataType {

#[derive(Queryable, Debug, Identifiable, Insertable, Selectable, Serialize, AsChangeset)]
#[diesel(table_name = crate::schema::run)]
#[diesel(primary_key(id))]
#[diesel(primary_key(runId))]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Run {
pub id: i32,
pub locationName: Option<String>,
pub latitude: Option<f64>,
pub longitude: Option<f64>,
pub driverName: Option<String>,
pub runId: i32,
pub driverName: String,
pub locationName: String,
pub notes: String,
pub time: DateTime<Utc>,
}
8 changes: 4 additions & 4 deletions scylla-server/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

diesel::table! {
data (time, dataTypeName) {
values -> Nullable<Array<Nullable<Float8>>>,
values -> Array<Nullable<Float4>>,
time -> Timestamptz,
dataTypeName -> Text,
runId -> Text,
runId -> Int4,
}
}

Expand All @@ -18,10 +18,10 @@ diesel::table! {
}

diesel::table! {
run (id) {
id -> Text,
run (runId) {
runId -> Int4,
driverName -> Text,
locationName -> Text,
notes -> Text,
time -> Timestamptz,
}
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/services/data_type_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{models::DataType, schema::dataType::dsl::*, Database};
use crate::{models::DataType, schema::data_type::dsl::*, Database};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

Expand All @@ -8,7 +8,7 @@ use diesel_async::RunQueryDsl;
pub async fn get_all_data_types(
db: &mut Database<'_>,
) -> Result<Vec<DataType>, diesel::result::Error> {
dataType.load(db).await
data_type.load(db).await
}

/// Upserts a datatype, either creating or updating one depending on its existence
Expand All @@ -28,7 +28,7 @@ pub async fn upsert_data_type(
unit: new_unit,
nodeName: node_name,
};
diesel::insert_into(dataType)
diesel::insert_into(data_type)
.values(&val)
.on_conflict(name)
.do_update() // actually allows for the upsert ability
Expand Down
21 changes: 2 additions & 19 deletions scylla-server/src/services/run_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use diesel_async::RunQueryDsl;
/// * `db` - The prisma client to make the call to
/// returns: A result containing the data or the QueryError propogated by the db
pub async fn get_all_runs(db: &mut Database<'_>) -> Result<Vec<Run>, diesel::result::Error> {
run.order(id.asc()).get_results(db).await
run.order(runId.asc()).get_results(db).await
}

/// Gets a single run by its id
Expand Down Expand Up @@ -46,24 +46,7 @@ pub async fn create_run_with_id(
run_id: i32,
) -> Result<Run, diesel::result::Error> {
diesel::insert_into(run)
.values((time.eq(timestamp), id.eq(run_id)))
.get_result(db)
.await
}

/// Updates a run with GPS points
/// * `db` - The prisma client to make the call to
/// * `run_id` - The run id to upsert
/// * `lat` - The latitude
/// * `long` - The longitude
pub async fn update_run_with_coords(
db: &mut Database<'_>,
run_id: i32,
lat: f64,
long: f64,
) -> Result<Run, diesel::result::Error> {
diesel::update(run.filter(id.eq(run_id)))
.set((latitude.eq(lat), longitude.eq(long)))
.values((time.eq(timestamp), runId.eq(run_id)))
.get_result(db)
.await
}
6 changes: 3 additions & 3 deletions scylla-server/src/transformers/run_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub struct PublicRun {
impl From<crate::models::Run> for PublicRun {
fn from(value: crate::models::Run) -> Self {
PublicRun {
id: value.id,
location_name: value.locationName.unwrap_or_default(),
driver_name: value.driverName.unwrap_or_default(),
id: value.runId,
driver_name: value.driverName,
location_name: value.locationName,
time_ms: value.time.timestamp_millis(),
notes: value.notes,
}
Expand Down

0 comments on commit bd01681

Please sign in to comment.