diff --git a/.github/workflows/client-build.yml b/.github/workflows/client-build.yml index cc5fa273..b7e6fdac 100644 --- a/.github/workflows/client-build.yml +++ b/.github/workflows/client-build.yml @@ -57,6 +57,6 @@ jobs: provenance: false build-args: | PROD=true - BACKEND_URL=http://192.168.100.1:8000 + BACKEND_URL=http://192.168.100.11:8000 MAP_ACCESS_TOKEN=pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw diff --git a/scylla-server/Cargo.lock b/scylla-server/Cargo.lock index b3704400..62c335f8 100755 --- a/scylla-server/Cargo.lock +++ b/scylla-server/Cargo.lock @@ -3225,6 +3225,7 @@ version = "0.0.1" dependencies = [ "axum 0.7.5", "axum-extra", + "chrono", "clap", "console-subscriber", "prisma-client-rust", @@ -3234,6 +3235,7 @@ dependencies = [ "ringbuffer", "rumqttc", "serde", + "serde_json", "socketioxide", "tokio", "tokio-util", @@ -3311,12 +3313,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "indexmap 2.2.6", "itoa", + "memchr", "ryu", "serde", ] diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index 4980650d..7539bd4c 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -23,6 +23,8 @@ console-subscriber = { version = "0.3.0", optional = true } ringbuffer = "0.15.0" clap = { version = "4.5.11", features = ["derive", "env"] } axum-extra = { version = "0.9.3", features = ["query"] } +chrono = { version = "0.4.38", features = ["serde"] } +serde_json = "1.0.128" [features] top = ["dep:console-subscriber"] diff --git a/scylla-server/prisma/seed.rs b/scylla-server/prisma/seed.rs index 6ed5f8f1..3129c951 100644 --- a/scylla-server/prisma/seed.rs +++ b/scylla-server/prisma/seed.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use prisma_client_rust::{chrono, QueryError}; +use prisma_client_rust::QueryError; use scylla_server::{ prisma::PrismaClient, processors::ClientData, @@ -35,8 +35,7 @@ async fn main() -> Result<(), QueryError> { client.system().delete_many(vec![]).exec().await?; - let created_run = - run_service::create_run(&client, chrono::offset::Utc::now().timestamp_millis()).await?; + let created_run = run_service::create_run(&client, chrono::offset::Utc::now()).await?; system_service::upsert_system(&client, "Data And Controls".to_string(), created_run.id).await?; driver_service::upsert_driver(&client, "Fergus".to_string(), created_run.id).await?; @@ -67,72 +66,72 @@ async fn main() -> Result<(), QueryError> { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["20".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis(), + values: vec![20f32], + timestamp: chrono::offset::Utc::now(), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["21".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 1000, + values: vec![21f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(1000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["22".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 2000, + values: vec![22f32], + timestamp: 
chrono::offset::Utc::now() + Duration::from_millis(2000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["17".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 3000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(3000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["25".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 4000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(4000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["30".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 5000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(5000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["38".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 6000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(6000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["32".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 7000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(7000), node: "BMS".to_string(), }, ClientData { run_id: created_run.id, name: "Pack-Temp".to_string(), unit: "C".to_string(), - values: vec!["26".to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis() + 8000, + values: vec![17f32], + timestamp: chrono::offset::Utc::now() + Duration::from_millis(8000), node: "BMS".to_string(), }, ], @@ -215,8 +214,8 @@ async fn simulate_route(db: Database, curr_run: i32) -> Result<(), QueryError> { run_id: curr_run, name: "Points".to_string(), unit: "Coord".to_string(), - values: vec![inter_lat.to_string(), inter_long.to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis(), + values: vec![inter_lat as f32, inter_long as f32], + timestamp: chrono::offset::Utc::now(), node: "TPU".to_string(), }, ) diff --git a/scylla-server/src/controllers/run_controller.rs b/scylla-server/src/controllers/run_controller.rs index 31cb517c..e4df1e95 100644 --- a/scylla-server/src/controllers/run_controller.rs +++ b/scylla-server/src/controllers/run_controller.rs @@ -2,7 +2,6 @@ use axum::{ extract::{Path, State}, Extension, Json, }; -use prisma_client_rust::chrono; use tokio::sync::mpsc; use tracing::warn; @@ -43,8 +42,7 @@ pub async fn new_run( State(db): State, Extension(channel): Extension>, ) -> Result, ScyllaError> { - let run_data = - run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()).await?; + let run_data = run_service::create_run(&db, chrono::offset::Utc::now()).await?; // notify the mqtt receiver a new run has been created if let Err(err) = channel.send(run_data.clone()).await { diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 5d9c8f87..96326ed8 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -6,7 +6,6 @@ use axum::{ Extension, Router, }; use clap::Parser; -use prisma_client_rust::chrono; use rumqttc::v5::AsyncClient; use scylla_server::{ controllers::{ @@ -196,7 +195,7 
@@ async fn main() { None } else { // creates the initial run - let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()) + let curr_run = run_service::create_run(&db, chrono::offset::Utc::now()) .await .expect("Could not create initial run!"); debug!("Configuring current run: {:?}", curr_run); diff --git a/scylla-server/src/processors/db_handler.rs b/scylla-server/src/processors/db_handler.rs index dedd9c59..629b8c70 100644 --- a/scylla-server/src/processors/db_handler.rs +++ b/scylla-server/src/processors/db_handler.rs @@ -18,8 +18,8 @@ use super::{ClientData, LocationData}; /// A struct defining an in progress location packet struct LocLock { location_name: Option, - points: Option<(f64, f64)>, - radius: Option, + points: Option<(f32, f32)>, + radius: Option, } impl LocLock { @@ -37,12 +37,12 @@ impl LocLock { } /// Add points to the packet - pub fn add_points(&mut self, lat: f64, long: f64) { + pub fn add_points(&mut self, lat: f32, long: f32) { self.points = Some((lat, long)); } /// Add a radius to the packet - pub fn add_radius(&mut self, radius: f64) { + pub fn add_radius(&mut self, radius: f32) { self.radius = Some(radius); } @@ -251,10 +251,7 @@ impl DbHandler { debug!("Upserting driver: {:?}", msg.values); if let Err(err) = driver_service::upsert_driver( &self.db, - msg.values - .first() - .unwrap_or(&"PizzaTheHut".to_string()) - .to_string(), + (*msg.values.first().unwrap_or(&0.0f32)).to_string(), msg.run_id, ) .await @@ -264,22 +261,15 @@ impl DbHandler { } "location" => { debug!("Upserting location name: {:?}", msg.values); - self.location_lock.add_loc_name( - msg.values - .first() - .unwrap_or(&"PizzaTheHut".to_string()) - .to_string(), - ); + self.location_lock + .add_loc_name((*msg.values.first().unwrap_or(&0.0f32)).to_string()); self.is_location = true; } "system" => { debug!("Upserting system: {:?}", msg.values); if let Err(err) = system_service::upsert_system( &self.db, - msg.values - .first() - .unwrap_or(&"PizzaTheHut".to_string()) - .to_string(), + (*msg.values.first().unwrap_or(&0.0f32)).to_string(), msg.run_id, ) .await @@ -290,28 +280,15 @@ impl DbHandler { "GPS-Location" => { debug!("Upserting location points: {:?}", msg.values); self.location_lock.add_points( - msg.values - .first() - .unwrap_or(&"PizzaTheHut".to_string()) - .parse::() - .unwrap_or_default(), - msg.values - .get(1) - .unwrap_or(&"PizzaTheHut".to_string()) - .parse::() - .unwrap_or_default(), + *msg.values.first().unwrap_or(&0.0f32), + *msg.values.get(1).unwrap_or(&0.0f32), ); self.is_location = true; } "Radius" => { debug!("Upserting location radius: {:?}", msg.values); - self.location_lock.add_radius( - msg.values - .first() - .unwrap_or(&"PizzaTheHut".to_string()) - .parse::() - .unwrap_or_default(), - ); + self.location_lock + .add_radius(*msg.values.first().unwrap_or(&0.0f32)); self.is_location = true; } _ => {} @@ -324,9 +301,9 @@ impl DbHandler { if let Err(err) = location_service::upsert_location( &self.db, loc.location_name, - loc.lat, - loc.long, - loc.radius, + loc.lat as f64, + loc.long as f64, + loc.radius as f64, msg.run_id, ) .await diff --git a/scylla-server/src/processors/mock_data.rs b/scylla-server/src/processors/mock_data.rs index 0ca1c447..e557f3bd 100644 --- a/scylla-server/src/processors/mock_data.rs +++ b/scylla-server/src/processors/mock_data.rs @@ -1,4 +1,4 @@ -use super::mock_processor::{MockData, MockStringData}; +use super::mock_processor::MockData; pub const BASE_MOCK_DATA: [MockData; 17] = [ MockData { @@ -121,16 +121,3 @@ pub const 
BASE_MOCK_DATA: [MockData; 17] = [ max: 600.0, }, ]; - -pub const BASE_MOCK_STRING_DATA: [MockStringData; 2] = [ - MockStringData { - name: "Driver", - unit: "String", - vals: "Fergus", - }, - MockStringData { - name: "Location", - unit: "String", - vals: "Max", - }, -]; diff --git a/scylla-server/src/processors/mock_processor.rs b/scylla-server/src/processors/mock_processor.rs index c6f8b789..4757ba4f 100644 --- a/scylla-server/src/processors/mock_processor.rs +++ b/scylla-server/src/processors/mock_processor.rs @@ -1,47 +1,32 @@ use std::time::Duration; -use prisma_client_rust::{chrono, serde_json}; use rand::Rng; use socketioxide::SocketIo; use tracing::warn; -use super::{ - mock_data::{BASE_MOCK_DATA, BASE_MOCK_STRING_DATA}, - ClientData, -}; +use super::{mock_data::BASE_MOCK_DATA, ClientData}; #[derive(Clone, Copy)] pub struct MockData { pub name: &'static str, pub unit: &'static str, pub num_of_vals: u8, - pub min: f64, - pub max: f64, + pub min: f32, + pub max: f32, } impl MockData { - fn get_values(&self) -> Vec { - let mut val_vec: Vec = vec![]; + fn get_values(&self) -> Vec { + let mut val_vec: Vec = vec![]; // for each point, get a random number in the range for _ in 0..self.num_of_vals { - val_vec.push( - rand::thread_rng() - .gen_range((self.min)..(self.max)) - .to_string(), - ); + val_vec.push(rand::thread_rng().gen_range((self.min)..(self.max))); } val_vec } } -#[derive(Clone, Copy)] -pub struct MockStringData { - pub name: &'static str, - pub unit: &'static str, - pub vals: &'static str, -} - pub struct MockProcessor { curr_run: i32, io: SocketIo, @@ -54,33 +39,18 @@ impl MockProcessor { pub async fn generate_mock(self) { loop { - // get a random mock datapoint the first 0 to len of number mock data is for the non string and x to len of string mocks is a string mock index. 
- let index = rand::thread_rng() - .gen_range(0..(BASE_MOCK_DATA.len() + BASE_MOCK_STRING_DATA.len())); + // get a random mock datapoint the first 0 to len of number mock data + let index = rand::thread_rng().gen_range(0..(BASE_MOCK_DATA.len())); - // if we are doing non-string mock this loop - let client_data: ClientData = if index < BASE_MOCK_DATA.len() { - let dat = BASE_MOCK_DATA[index]; + let dat = BASE_MOCK_DATA[index]; - ClientData { - run_id: self.curr_run, - name: dat.name.to_string(), - unit: dat.unit.to_string(), - values: dat.get_values(), - timestamp: chrono::offset::Utc::now().timestamp_millis(), - node: "".to_string(), // uneeded for socket use only - } - // do a string mock - } else { - let dat = BASE_MOCK_STRING_DATA[index - BASE_MOCK_DATA.len()]; - ClientData { - run_id: self.curr_run, - name: dat.name.to_string(), - unit: dat.unit.to_string(), - values: vec![dat.vals.to_string()], - timestamp: chrono::offset::Utc::now().timestamp_millis(), - node: "".to_string(), // uneeded for socket use only - } + let client_data: ClientData = ClientData { + run_id: self.curr_run, + name: dat.name.to_string(), + unit: dat.unit.to_string(), + values: dat.get_values(), + timestamp: chrono::offset::Utc::now(), + node: "".to_string(), // uneeded for socket use only }; match self.io.emit( diff --git a/scylla-server/src/processors/mod.rs b/scylla-server/src/processors/mod.rs index afb3bbef..a1f9b84a 100644 --- a/scylla-server/src/processors/mod.rs +++ b/scylla-server/src/processors/mod.rs @@ -1,3 +1,6 @@ +use chrono::serde::ts_milliseconds; +use chrono::{DateTime, Utc}; + pub mod db_handler; mod mock_data; pub mod mock_processor; @@ -13,9 +16,12 @@ pub struct ClientData { pub run_id: i32, pub name: String, pub unit: String, - pub values: Vec, - pub timestamp: i64, + pub values: Vec, + /// Client expects time in milliseconds, so serialize as such + #[serde(with = "ts_milliseconds")] + pub timestamp: DateTime, + /// client doesnt parse node #[serde(skip_serializing)] pub node: String, } @@ -25,7 +31,7 @@ pub struct ClientData { #[derive(Debug)] struct LocationData { location_name: String, - lat: f64, - long: f64, - radius: f64, + lat: f32, + long: f32, + radius: f32, } diff --git a/scylla-server/src/processors/mqtt_processor.rs b/scylla-server/src/processors/mqtt_processor.rs index 78153d98..5f366c2f 100644 --- a/scylla-server/src/processors/mqtt_processor.rs +++ b/scylla-server/src/processors/mqtt_processor.rs @@ -4,7 +4,7 @@ use std::{ time::{Duration, SystemTime}, }; -use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json}; +use prisma_client_rust::bigdecimal::ToPrimitive; use protobuf::Message; use ringbuffer::RingBuffer; use rumqttc::v5::{ @@ -25,7 +25,6 @@ use crate::{ }; use super::ClientData; -use std::borrow::Cow; /// The chief processor of incoming mqtt data, this handles /// - mqtt state @@ -154,7 +153,7 @@ impl MqttProcessor { Some(msg) => msg, None => continue }; - latency_ringbuffer.push(chrono::offset::Utc::now().timestamp_millis() - msg.timestamp); + latency_ringbuffer.push((chrono::offset::Utc::now() - msg.timestamp).num_milliseconds()); self.send_db_msg(msg.clone()).await; self.send_socket_msg(msg, &mut upload_counter); }, @@ -169,8 +168,8 @@ impl MqttProcessor { node: "Internal".to_string(), unit: "".to_string(), run_id: self.curr_run, - timestamp: chrono::offset::Utc::now().timestamp_millis(), - values: vec![sockets.len().to_string()] + timestamp: chrono::offset::Utc::now(), + values: vec![sockets.len() as f32] }; self.send_socket_msg(client_data, &mut 
upload_counter); } else { @@ -190,10 +189,10 @@ impl MqttProcessor { node: "Internal".to_string(), unit: "ms".to_string(), run_id: self.curr_run, - timestamp: chrono::offset::Utc::now().timestamp_millis(), - values: vec![avg_latency.to_string()] + timestamp: chrono::offset::Utc::now(), + values: vec![avg_latency as f32] }; - trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&"n/a".to_string())); + trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&0.0f32)); self.send_socket_msg(client_data, &mut upload_counter); } Some(new_run) = self.new_run_channel.recv() => { @@ -254,48 +253,69 @@ impl MqttProcessor { let data_type = split.1.replace('/', "-"); - // extract the unix time, returning the current time instead if either the "ts" user property isnt present or it isnt parsable - // note the Cow magic involves the return from the map is a borrow, but the unwrap cannot as we dont own it - let unix_time = msg - .properties - .unwrap_or_default() - .user_properties - .iter() - .map(Cow::Borrowed) - .find(|f| f.0 == "ts") - .unwrap_or_else(|| { - debug!("Could not find timestamp in mqtt, using system time"); - Cow::Owned(( - "ts".to_string(), - chrono::offset::Utc::now().timestamp_millis().to_string(), - )) - }) - .1 - .parse::() - .unwrap_or_else(|err| { - warn!("Invalid timestamp in mqtt, using system time: {}", err); - chrono::offset::Utc::now().timestamp_millis() - }); - - // ts check for bad sources of time which may return 1970 - // if both system time and packet timestamp are before year 2000, the message cannot be recorded - let unix_clean = if unix_time < 963014966000 { - debug!("Timestamp before year 2000: {}", unix_time); - let sys_time = chrono::offset::Utc::now().timestamp_millis(); - if sys_time < 963014966000 { - warn!("System has no good time, discarding message!"); + // extract the unix time + // levels of time priority + // - A: The time packaged in the protobuf, to microsecond precision + // - B: The time packaged in the MQTT header, to millisecond precision (hence the * 1000 on B) + // - C: The local scylla system time + // note protobuf defaults to 0 for unfilled time, so consider it as an unset time + let unix_time = if data.time > 0 { + // A + let Some(unix_time) = chrono::DateTime::from_timestamp_micros(data.time as i64) else { + warn!( + "Corrupted time in protobuf: {}, discarding message!", + data.time + ); return None; - } - sys_time - } else { + }; unix_time + } else { + // B + match match msg + .properties + .unwrap_or_default() + .user_properties + .iter() + .find(|f| f.0 == "ts") + { + Some(val) => { + let Ok(time_parsed) = val.1.parse::() else { + warn!("Corrupted time in mqtt header, discarding message!"); + return None; + }; + chrono::DateTime::from_timestamp_millis(time_parsed) + } + None => None, + } { + Some(e) => e, + None => { + // C + debug!("Could not extract time, using system time!"); + chrono::offset::Utc::now() + } + } }; + // ts check for bad sources of time which may return 1970 + // if both system time and packet timestamp are before year 2000, the message cannot be recorded + let unix_clean = + if unix_time < chrono::DateTime::from_timestamp_millis(963014966000).unwrap() { + debug!("Timestamp before year 2000: {}", unix_time.to_string()); + let sys_time = chrono::offset::Utc::now(); + if sys_time < chrono::DateTime::from_timestamp_millis(963014966000).unwrap() { + warn!("System has no good time, discarding message!"); + return None; + } + sys_time + } else { + unix_time + }; + Some(ClientData { run_id: 
self.curr_run, name: data_type, unit: data.unit, - values: data.value, + values: data.values, timestamp: unix_clean, node: node.to_string(), }) diff --git a/scylla-server/src/proto/serverdata.proto b/scylla-server/src/proto/serverdata.proto index 98f812f0..a9b7e44c 100644 --- a/scylla-server/src/proto/serverdata.proto +++ b/scylla-server/src/proto/serverdata.proto @@ -3,6 +3,12 @@ syntax = "proto3"; package serverdata.v1; message ServerData { - repeated string value = 1; + // ensure old type is reserved + reserved 1; + reserved "value"; + string unit = 2; + // time since unix epoch in MICROSECONDS + uint64 time = 3; + repeated float values = 4; } diff --git a/scylla-server/src/services/data_service.rs b/scylla-server/src/services/data_service.rs index dd5c212e..1cfb5387 100644 --- a/scylla-server/src/services/data_service.rs +++ b/scylla-server/src/services/data_service.rs @@ -1,4 +1,4 @@ -use prisma_client_rust::{chrono::DateTime, QueryError}; +use prisma_client_rust::QueryError; use crate::{prisma, processors::ClientData, Database}; @@ -43,16 +43,10 @@ pub async fn add_data( db.data() .create( prisma::data_type::name::equals(client_data.name), - DateTime::from_timestamp_millis(client_data.timestamp) - .expect("Could not parse timestamp") - .fixed_offset(), + client_data.timestamp.fixed_offset(), prisma::run::id::equals(client_data.run_id), vec![prisma::data::values::set( - client_data - .values - .iter() - .map(|f| f.parse::().unwrap_or_default()) - .collect(), + client_data.values.iter().map(|f| *f as f64).collect(), )], ) .select(public_data::select()) @@ -72,15 +66,10 @@ pub async fn add_many(db: &Database, client_data: Vec) -> Result().unwrap_or_default()) - .collect(), + f.values.iter().map(|f| *f as f64).collect(), )], ) }) diff --git a/scylla-server/src/services/run_service.rs b/scylla-server/src/services/run_service.rs index 5520851e..b775e36b 100644 --- a/scylla-server/src/services/run_service.rs +++ b/scylla-server/src/services/run_service.rs @@ -1,6 +1,7 @@ use std::vec; -use prisma_client_rust::{chrono::DateTime, QueryError}; +use chrono::{DateTime, Utc}; +use prisma_client_rust::QueryError; use crate::{ prisma::{self}, @@ -43,16 +44,14 @@ pub async fn get_run_by_id( /// Creates a run /// * `db` - The prisma client to make the call to -/// * `timestamp` - The unix time since epoch in miliseconds when the run starts +/// * `timestamp` - time when the run starts /// returns: A result containing the data or the QueryError propogated by the db -pub async fn create_run(db: &Database, timestamp: i64) -> Result { +pub async fn create_run( + db: &Database, + timestamp: DateTime, +) -> Result { db.run() - .create( - DateTime::from_timestamp_millis(timestamp) - .expect("Could not parse timestamp") - .fixed_offset(), - vec![], - ) + .create(timestamp.fixed_offset(), vec![]) .select(public_run::select()) .exec() .await @@ -60,21 +59,16 @@ pub async fn create_run(db: &Database, timestamp: i64) -> Result, run_id: i32, ) -> Result { db.run() - .create( - DateTime::from_timestamp_millis(timestamp) - .expect("Could not parse timestamp") - .fixed_offset(), - vec![prisma::run::id::set(run_id)], - ) + .create(timestamp.fixed_offset(), vec![prisma::run::id::set(run_id)]) .select(public_run::select()) .exec() .await diff --git a/scylla-server/src/transformers/data_transformer.rs b/scylla-server/src/transformers/data_transformer.rs index 6fd20b3d..3728e26a 100644 --- a/scylla-server/src/transformers/data_transformer.rs +++ b/scylla-server/src/transformers/data_transformer.rs @@ -1,19 +1,42 @@ +use 
std::cmp::Ordering; + use serde::Serialize; use crate::{processors::ClientData, services::data_service}; /// The struct defining the data format sent to the client -#[derive(Serialize, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Serialize, Debug)] pub struct PublicData { + /// time in MILLISECONDS pub time: i64, - pub values: Vec, + pub values: Vec, +} +// custom impls to avoid comparing values fields +impl Ord for PublicData { + fn cmp(&self, other: &Self) -> Ordering { + self.time.cmp(&other.time) + } +} + +impl PartialOrd for PublicData { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } } +impl PartialEq for PublicData { + fn eq(&self, other: &Self) -> bool { + self.time == other.time + } +} + +impl Eq for PublicData {} + /// convert the prisma type to the client type for JSON encoding impl From<&data_service::public_data::Data> for PublicData { fn from(value: &data_service::public_data::Data) -> Self { PublicData { - values: value.values.iter().map(f64::to_string).collect(), + values: value.values.clone(), time: value.time.timestamp_millis(), } } @@ -23,8 +46,8 @@ impl From<&data_service::public_data::Data> for PublicData { impl From for PublicData { fn from(value: ClientData) -> Self { PublicData { - time: value.timestamp, - values: value.values, + time: value.timestamp.timestamp_millis(), + values: value.values.iter().map(|f| *f as f64).collect(), } } } diff --git a/scylla-server/src/transformers/run_transformer.rs b/scylla-server/src/transformers/run_transformer.rs index b1e18b6b..55791b9a 100644 --- a/scylla-server/src/transformers/run_transformer.rs +++ b/scylla-server/src/transformers/run_transformer.rs @@ -15,6 +15,7 @@ pub struct PublicRun { pub driver_name: String, #[serde(rename = "systemName")] pub system_name: String, + /// time in MILLISECONDS pub time: i64, } diff --git a/scylla-server/tests/data_service_test.rs b/scylla-server/tests/data_service_test.rs index 68c027fe..6bcc6bb5 100644 --- a/scylla-server/tests/data_service_test.rs +++ b/scylla-server/tests/data_service_test.rs @@ -15,7 +15,8 @@ const TEST_KEYWORD: &str = "test"; async fn test_data_service() -> Result<(), QueryError> { let db = cleanup_and_prepare().await?; - run_service::create_run_with_id(&db, 0, 0).await?; + run_service::create_run_with_id(&db, chrono::DateTime::from_timestamp_millis(0).unwrap(), 0) + .await?; node_service::upsert_node(&db, TEST_KEYWORD.to_owned()).await?; data_type_service::upsert_data_type( &db, @@ -41,16 +42,17 @@ async fn test_data_add() -> Result<(), QueryError> { TEST_KEYWORD.to_owned(), ) .await?; - let run_data = run_service::create_run(&db, 999).await?; + let run_data = + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(999).unwrap()).await?; let data = data_service::add_data( &db, ClientData { - values: vec!["0".to_owned()], + values: vec![0f32], unit: "A".to_owned(), run_id: run_data.id, name: TEST_KEYWORD.to_owned(), - timestamp: 1000, + timestamp: chrono::DateTime::from_timestamp_millis(1000).unwrap(), node: "Irrelevant".to_string(), }, ) @@ -60,7 +62,7 @@ async fn test_data_add() -> Result<(), QueryError> { PublicData::from(&data), PublicData { time: 1000, - values: vec!["0".to_owned()] + values: vec![0f64] } ); @@ -87,11 +89,11 @@ async fn test_data_no_prereqs() -> Result<(), QueryError> { data_service::add_data( &db, ClientData { - values: vec!["0".to_owned()], + values: vec![0f32], unit: "A".to_owned(), run_id: 0, name: TEST_KEYWORD.to_owned(), - timestamp: 1000, + timestamp: 
chrono::DateTime::from_timestamp_millis(1000).unwrap(), node: "Irrelevant".to_string(), }, ) @@ -107,17 +109,22 @@ async fn test_data_no_prereqs() -> Result<(), QueryError> { TEST_KEYWORD.to_owned(), ) .await?; - run_service::create_run_with_id(&db, 1000, 0).await?; + run_service::create_run_with_id( + &db, + chrono::DateTime::from_timestamp_millis(1000).unwrap(), + 0, + ) + .await?; // now shouldnt fail as it and node does exist data_service::add_data( &db, ClientData { - values: vec!["0".to_owned()], + values: vec![0f32], unit: "A".to_owned(), run_id: 0, name: TEST_KEYWORD.to_owned(), - timestamp: 1000, + timestamp: chrono::DateTime::from_timestamp_millis(1000).unwrap(), node: "Irrelevant".to_string(), }, ) diff --git a/scylla-server/tests/driver_service_test.rs b/scylla-server/tests/driver_service_test.rs index f5233c72..c15ed83f 100644 --- a/scylla-server/tests/driver_service_test.rs +++ b/scylla-server/tests/driver_service_test.rs @@ -24,7 +24,9 @@ async fn test_create_driver() -> Result<(), QueryError> { driver_service::upsert_driver( &db, TEST_KEYWORD.to_owned(), - run_service::create_run(&db, 10001).await?.id, + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(1001).unwrap()) + .await? + .id, ) .await?; diff --git a/scylla-server/tests/location_service_test.rs b/scylla-server/tests/location_service_test.rs index 72f476aa..ebd90931 100644 --- a/scylla-server/tests/location_service_test.rs +++ b/scylla-server/tests/location_service_test.rs @@ -20,7 +20,9 @@ async fn test_get_all_locations_and_upsert() -> Result<(), QueryError> { 100.0, 200.0, 300.0, - run_service::create_run(&db, 10001).await?.id, + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(1001).unwrap()) + .await? + .id, ) .await?; diff --git a/scylla-server/tests/run_service_test.rs b/scylla-server/tests/run_service_test.rs index 1f4aeba5..039a519a 100644 --- a/scylla-server/tests/run_service_test.rs +++ b/scylla-server/tests/run_service_test.rs @@ -20,7 +20,8 @@ async fn test_get_run_by_id() -> Result<(), QueryError> { let db = cleanup_and_prepare().await?; // add a run - let run_c = run_service::create_run(&db, 1).await?; + let run_c = + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(1).unwrap()).await?; // get that run let run = run_service::get_run_by_id(&db, run_c.id) diff --git a/scylla-server/tests/system_service_test.rs b/scylla-server/tests/system_service_test.rs index 2bf2ee1b..dd27d8cc 100644 --- a/scylla-server/tests/system_service_test.rs +++ b/scylla-server/tests/system_service_test.rs @@ -17,7 +17,8 @@ const TEST_KEYWORD: &str = "test"; async fn test_upsert_system_create() -> Result<(), QueryError> { let db = cleanup_and_prepare().await?; - let run = run_service::create_run(&db, 101).await?; + let run = + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(101).unwrap()).await?; let _ = system_service::upsert_system(&db, TEST_KEYWORD.to_owned(), run.id).await?; @@ -49,7 +50,9 @@ async fn test_get_upsert_system() -> Result<(), QueryError> { system_service::upsert_system( &db, TEST_KEYWORD.to_owned(), - run_service::create_run(&db, 101).await?.id, + run_service::create_run(&db, chrono::DateTime::from_timestamp_millis(101).unwrap()) + .await? + .id, ) .await?;
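
Note on the new ClientData shape: values are now numeric (Vec<f32>) and the timestamp is a chrono DateTime<Utc> that is serialized back to epoch milliseconds for the client (see the ts_milliseconds attribute in processors/mod.rs above). The following is a minimal sketch of that serialization, assuming only the chrono "serde" feature and the serde_json dependency added in Cargo.toml; ClientDataSketch is a hypothetical trimmed stand-in for the real struct (it omits run_id, unit, and the skipped node field).

use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use serde::Serialize;

// Trimmed stand-in for scylla-server's ClientData, keeping only the fields
// relevant to JSON serialization.
#[derive(Serialize)]
struct ClientDataSketch {
    name: String,
    values: Vec<f32>,
    // Client expects time in milliseconds, so serialize the DateTime that way.
    #[serde(with = "ts_milliseconds")]
    timestamp: DateTime<Utc>,
}

fn main() {
    let data = ClientDataSketch {
        name: "Pack-Temp".to_string(),
        values: vec![20.0],
        timestamp: DateTime::from_timestamp_millis(1_700_000_000_000).unwrap(),
    };
    // Prints something like:
    // {"name":"Pack-Temp","values":[20.0],"timestamp":1700000000000}
    println!("{}", serde_json::to_string(&data).unwrap());
}

The ts_milliseconds adapter keeps the wire format an integer millisecond count, matching the comment in processors/mod.rs that the client expects milliseconds, even though the server now works with DateTime values internally.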
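
The reworked timestamp handling in mqtt_processor.rs prefers the protobuf's microsecond time (A), then the millisecond "ts" user property from the MQTT header (B), then Scylla's own system clock (C), and finally discards messages whose timestamp still predates the year-2000 cutoff (963014966000 ms). Below is a simplified sketch of that decision chain; resolve_timestamp is a hypothetical helper, not a function in the codebase, and its error handling is condensed relative to the actual processor (which, for instance, falls back to C when the header value is out of range rather than dropping the message).

use chrono::{DateTime, Utc};

// A: microsecond time embedded in the protobuf payload (0 means unset)
// B: millisecond "ts" user property from the MQTT header
// C: local system time as a last resort
fn resolve_timestamp(proto_time_us: u64, header_ts_ms: Option<&str>) -> Option<DateTime<Utc>> {
    let unix_time = if proto_time_us > 0 {
        // A: protobuf time, microsecond precision
        DateTime::from_timestamp_micros(proto_time_us as i64)?
    } else if let Some(ts) = header_ts_ms {
        // B: MQTT header time, millisecond precision
        DateTime::from_timestamp_millis(ts.parse::<i64>().ok()?)?
    } else {
        // C: fall back to the local clock
        Utc::now()
    };

    // Reject obviously bad clocks: anything before the year-2000 cutoff is
    // treated as "no good time", mirroring the check in the processor.
    let cutoff = DateTime::from_timestamp_millis(963_014_966_000).unwrap();
    if unix_time < cutoff {
        let sys_time = Utc::now();
        if sys_time < cutoff {
            return None; // neither source has a usable clock
        }
        return Some(sys_time);
    }
    Some(unix_time)
}

fn main() {
    // The protobuf time wins when it is non-zero (here ~2023-11-14, in microseconds).
    let t = resolve_timestamp(1_700_000_000_000_000, Some("1700000000000"));
    println!("{:?}", t);
}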
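
Because PublicData now carries f64 values, it can no longer derive Eq and Ord, so data_transformer.rs implements the comparisons by time alone. A self-contained sketch of that pattern, using a hypothetical PublicDataSketch stand-in:

use std::cmp::Ordering;

// Two samples with the same millisecond timestamp compare equal even if
// their float values differ, which is what lets the type stay Ord/Eq now
// that it holds f64 values (f64 itself is not Eq).
#[derive(Debug)]
struct PublicDataSketch {
    time: i64, // epoch milliseconds
    values: Vec<f64>,
}

impl PartialEq for PublicDataSketch {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for PublicDataSketch {}

impl Ord for PublicDataSketch {
    fn cmp(&self, other: &Self) -> Ordering {
        self.time.cmp(&other.time)
    }
}

impl PartialOrd for PublicDataSketch {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut samples = vec![
        PublicDataSketch { time: 2_000, values: vec![21.0] },
        PublicDataSketch { time: 1_000, values: vec![20.0] },
    ];
    samples.sort(); // orders purely by time
    println!("sorted: {:?}", samples);
}

Comparing only the timestamp keeps chronological ordering well-defined without requiring a total order on floating-point values.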