From bd9be7e6352f2d0031edcdb265892838405bed82 Mon Sep 17 00:00:00 2001
From: Ingrid
Date: Thu, 9 Jan 2025 10:51:25 +0100
Subject: [PATCH] use real QC pipelines

---
 Cargo.lock                      |  3 +-
 Cargo.toml                      |  1 +
 ingestion/Cargo.toml            |  1 +
 ingestion/src/lib.rs            | 61 +++++++++++++++++++++------------
 ingestion/src/main.rs           | 43 +++++++++++++----------
 ingestion/src/qc_pipelines.rs   | 48 ++++++++++++++++++++++++++
 qc_pipelines/fresh/TA_PT1H.toml | 46 +++++++++++++++++++++++++
 rove_connector/src/lib.rs       |  2 +-
 8 files changed, 163 insertions(+), 42 deletions(-)
 create mode 100644 ingestion/src/qc_pipelines.rs
 create mode 100644 qc_pipelines/fresh/TA_PT1H.toml

diff --git a/Cargo.lock b/Cargo.lock
index c1364f52..48a5558f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1498,6 +1498,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-postgres",
+ "toml",
 ]
 
 [[package]]
@@ -2347,7 +2348,7 @@ dependencies = [
 [[package]]
 name = "rove"
 version = "0.1.1"
-source = "git+https://github.com/metno/rove.git?branch=lard_fixes#1d79a555847f9223c9f6ee459e7a480ff18f41bd"
+source = "git+https://github.com/metno/rove.git?branch=lard_fixes#5f70d99d59b0d9d1c8fc301893684c919cab286f"
 dependencies = [
  "async-trait",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 3671ffda..f073cf2d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,3 +33,4 @@ serde = { version = "1.0.217", features = ["derive"] }
 thiserror = "1.0.69"
 tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] }
 tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
+toml = "0.8.19"
diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml
index 8b9e5fe8..1450c9c5 100644
--- a/ingestion/Cargo.toml
+++ b/ingestion/Cargo.toml
@@ -27,3 +27,4 @@ serde.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-postgres.workspace = true
+toml.workspace = true
diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs
index dcec644e..ecf75574 100644
--- a/ingestion/src/lib.rs
+++ b/ingestion/src/lib.rs
@@ -7,9 +7,10 @@ use axum::{
 use bb8::PooledConnection;
 use bb8_postgres::PostgresConnectionManager;
 use chrono::{DateTime, Utc};
+use chronoutil::RelativeDuration;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
-use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp};
+use rove::data_switch::{TimeSpec, Timestamp};
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
@@ -21,6 +22,7 @@ use tokio_postgres::NoTls;
 #[cfg(feature = "kafka")]
 pub mod kvkafka;
 pub mod permissions;
+pub mod qc_pipelines;
 use permissions::{ParamPermitTable, StationPermitTable};
 
 #[derive(Error, Debug)]
@@ -33,6 +35,8 @@ pub enum Error {
     Parse(String),
     #[error("qc system returned an error: {0}")]
     Qc(#[from] rove::scheduler::Error),
+    #[error("rove connector returned an error: {0}")]
+    Connector(#[from] rove::data_switch::Error),
     #[error("RwLock was poisoned: {0}")]
     Lock(String),
     #[error("Could not read environment variable: {0}")]
@@ -76,7 +80,8 @@ struct IngestorState {
     db_pool: PgConnectionPool,
     param_conversions: ParamConversions, // converts param codes to element ids
     permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
-    qc_scheduler: Arc<rove::Scheduler>,
+    rove_connector: Arc<rove_connector::Connector>,
+    qc_pipelines: Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>>,
 }
 
 impl FromRef<IngestorState> for PgConnectionPool {
@@ -97,9 +102,15 @@ impl FromRef<IngestorState> for Arc<RwLock<(ParamPermitTable, StationPermitTable)>>
     }
 }
 
-impl FromRef<IngestorState> for Arc<rove::Scheduler> {
-    fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler> {
-        state.qc_scheduler.clone()
+impl FromRef<IngestorState> for Arc<rove_connector::Connector> {
+    fn from_ref(state: &IngestorState) -> Arc<rove_connector::Connector> {
+        state.rove_connector.clone()
+    }
+}
+
+impl FromRef<IngestorState> for Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>> {
+    fn from_ref(state: &IngestorState) -> Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>> {
+        state.qc_pipelines.clone()
     }
 }
 
@@ -204,8 +215,9 @@ pub async fn insert_data(
 
 pub async fn qc_data(
     chunks: &Vec<DataChunk<'_>>,
-    scheduler: &rove::Scheduler,
     conn: &mut PooledPgConn<'_>,
+    rove_connector: &rove_connector::Connector,
+    pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>,
 ) -> Result<(), Error> {
     // TODO: see conflict resolution issues on queries in `insert_data`
     // On periodic or consistency QC pipelines, we should be checking the provenance table to
@@ -245,19 +257,19 @@ pub async fn qc_data(
         for datum in chunk.data.iter() {
             let time_spec =
                 TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution);
-            let space_spec = SpaceSpec::One(datum.timeseries_id.to_string());
-            // TODO: load and fetch real pipeline
-            let pipeline = "sample_pipeline";
-            let rove_output = scheduler
-                .validate_direct(
-                    "lard",
-                    &[] as &[&str],
+            let pipeline = match pipelines.get(&(datum.param_id, time_resolution)) {
+                Some(pipeline) => pipeline,
+                None => continue,
+            };
+            let data = rove_connector
+                .fetch_one(
+                    datum.timeseries_id,
                     &time_spec,
-                    &space_spec,
-                    pipeline,
-                    None,
+                    pipeline.num_leading_required,
+                    pipeline.num_trailing_required,
                 )
                 .await?;
+            let rove_output = rove::Scheduler::schedule_tests(pipeline, data)?;
 
             let first_fail = rove_output.iter().find(|check_result| {
                 if let Some(result) = check_result.results.first() {
@@ -276,7 +288,8 @@ pub async fn qc_data(
                 qc_results.push(QcResult {
                     timeseries_id: datum.timeseries_id,
                     timestamp: chunk.timestamp,
-                    pipeline: pipeline.to_string(),
+                    // TODO: should this encode more info? In theory the param/type can be deduced from the DB anyway
+                    pipeline: "fresh".to_string(),
                     flag,
                     fail_condition,
                 });
@@ -339,7 +352,8 @@ async fn handle_kldata(
     State(pool): State<PgConnectionPool>,
     State(param_conversions): State<ParamConversions>,
     State(permit_table): State<Arc<RwLock<(ParamPermitTable, StationPermitTable)>>>,
-    State(qc_scheduler): State<Arc<rove::Scheduler>>,
+    State(rove_connector): State<Arc<rove_connector::Connector>>,
+    State(qc_pipelines): State<Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>>>,
     body: String,
 ) -> Json<KldataResp> {
     let result: Result<usize, Error> = async {
@@ -353,7 +367,7 @@ async fn handle_kldata(
 
         insert_data(&data, &mut conn).await?;
 
-        qc_data(&data, &qc_scheduler, &mut conn).await?;
+        qc_data(&data, &mut conn, &rove_connector, &qc_pipelines).await?;
 
         Ok(message_id)
     }
@@ -404,12 +418,14 @@ pub async fn run(
     db_pool: PgConnectionPool,
     param_conversion_path: &str,
     permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
-    qc_scheduler: rove::Scheduler,
+    rove_connector: rove_connector::Connector,
+    qc_pipelines: HashMap<(i32, RelativeDuration), rove::Pipeline>,
 ) -> Result<(), Box<dyn std::error::Error>> {
     // set up param conversion map
     let param_conversions = get_conversions(param_conversion_path)?;
 
-    let qc_scheduler = Arc::new(qc_scheduler);
+    let rove_connector = Arc::new(rove_connector);
+    let qc_pipelines = Arc::new(qc_pipelines);
 
     // build our application with a single route
     let app = Router::new()
@@ -418,7 +434,8 @@ pub async fn run(
             db_pool,
             param_conversions,
             permit_tables,
-            qc_scheduler,
+            rove_connector,
+            qc_pipelines,
         });
 
     // run our app with hyper, listening globally on port 3001
diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs
index 2a9e6b62..6bee0586 100644
--- a/ingestion/src/main.rs
+++ b/ingestion/src/main.rs
@@ -1,13 +1,7 @@
 use bb8_postgres::PostgresConnectionManager;
-use rove::{
-    data_switch::{DataConnector, DataSwitch},
-    load_pipelines,
-};
+use lard_ingestion::qc_pipelines::load_pipelines;
 use rove_connector::Connector;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use std::sync::{Arc, RwLock};
 use tokio_postgres::NoTls;
 
 use lard_ingestion::permissions;
@@ -37,15 +31,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let db_pool = bb8::Pool::builder().build(manager).await?;
 
     // QC system
-    let scheduler = rove::Scheduler::new(
-        load_pipelines("").unwrap(),
-        DataSwitch::new(HashMap::from([(
-            String::from("lard"),
-            Box::new(Connector {
-                pool: db_pool.clone(),
-            }) as Box<dyn DataConnector + Send>,
-        )])),
-    );
+    // NOTE: Keeping this version around in case we want it for the periodic checks
+    // let scheduler = rove::Scheduler::new(
+    //     load_pipelines("").unwrap(),
+    //     DataSwitch::new(HashMap::from([(
+    //         String::from("lard"),
+    //         Box::new(Connector {
+    //             pool: db_pool.clone(),
+    //         }) as Box<dyn DataConnector + Send>,
+    //     )])),
+    // );
+    let rove_connector = Connector {
+        pool: db_pool.clone(),
+    };
+
+    let qc_pipelines = load_pipelines("qc_pipelines/fresh")?;
 
     println!("Spawning task to fetch permissions from StInfoSys...");
     // background task to refresh permit tables every 30 mins
@@ -79,5 +79,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     // Set up and run our server + database
     println!("Ingestion server started!");
-    lard_ingestion::run(db_pool, PARAMCONV, permit_tables, scheduler).await
+    lard_ingestion::run(
+        db_pool,
+        PARAMCONV,
+        permit_tables,
+        rove_connector,
+        qc_pipelines,
+    )
+    .await
 }
diff --git a/ingestion/src/qc_pipelines.rs b/ingestion/src/qc_pipelines.rs
new file mode 100644
index 00000000..feafa29b
--- /dev/null
+++ b/ingestion/src/qc_pipelines.rs
@@ -0,0 +1,48 @@
+use chronoutil::RelativeDuration;
+use rove::pipeline::{self, derive_num_leading_trailing, Pipeline};
+use serde::Deserialize;
+use std::{collections::HashMap, path::Path};
+
+#[derive(Deserialize)]
+struct Header {
+    param_id: i32,
+    time_resolution: String,
+    #[allow(dead_code)]
+    sensor: Vec<i32>,
+}
+
+#[derive(Deserialize)]
+struct PipelineDef {
+    header: Header,
+    pipeline: Pipeline,
+}
+
+pub fn load_pipelines(
+    path: impl AsRef<Path>,
+) -> Result<HashMap<(i32, RelativeDuration), Pipeline>, pipeline::Error> {
+    std::fs::read_dir(path)?
+        .map(|entry| {
+            let entry = entry?;
+
+            if !entry.file_type()?.is_file() {
+                return Err(pipeline::Error::DirectoryStructure);
+            }
+
+            let mut pipeline_def: PipelineDef =
+                toml::from_str(&std::fs::read_to_string(entry.path())?)?;
+            (
+                pipeline_def.pipeline.num_leading_required,
+                pipeline_def.pipeline.num_trailing_required,
+            ) = derive_num_leading_trailing(&pipeline_def.pipeline);
+
+            let key = (
+                pipeline_def.header.param_id,
+                // TODO: remove unwrap
+                RelativeDuration::parse_from_iso8601(&pipeline_def.header.time_resolution).unwrap(),
+            );
+
+            Ok(Some((key, pipeline_def.pipeline)))
+        })
+        .filter_map(Result::transpose)
+        .collect()
+}
diff --git a/qc_pipelines/fresh/TA_PT1H.toml b/qc_pipelines/fresh/TA_PT1H.toml
new file mode 100644
index 00000000..e16c67e9
--- /dev/null
+++ b/qc_pipelines/fresh/TA_PT1H.toml
@@ -0,0 +1,46 @@
+[header]
+# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex?
+# Would we be better served with a blocklist/allowlist?
+# stations = "*"
+param_id = 211
+time_resolution = "PT1H"
+# level = "*"
+sensor = [0]
+
+[[step]]
+name = "special_value_check"
+[step.special_value_check]
+special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999]
+
+[[step]]
+name = "range_check"
+[step.range_check]
+min = -55
+max = 50
+
+[[step]]
+name = "climate_range_check"
+[step.range_check_dynamic]
+source = "netcdf" # TODO: define a neat spec for this?
+ +[[step]] +name = "step_check" +[step.step_check] +max = 18.6 + +[[step]] +name = "flatline_check" +[step.flatline_check] +max = 10 + +[[step]] +name = "spike_check" +[step.spike_check] +max = 18.6 + +[[step]] +name = "model_consistency_check" +[step.model_consistency_check] +model_source = "lustre" +model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +threshold = 3.0 # FIXME: made up value by Ingrid diff --git a/rove_connector/src/lib.rs b/rove_connector/src/lib.rs index fe3b6f9a..2456a7ec 100644 --- a/rove_connector/src/lib.rs +++ b/rove_connector/src/lib.rs @@ -91,7 +91,7 @@ fn regularize( } impl Connector { - async fn fetch_one( + pub async fn fetch_one( &self, ts_id: i32, time_spec: &TimeSpec,
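
For review, a minimal standalone sketch of how the pieces above fit together:
loading the TOML pipeline definitions and doing the same (param_id,
time_resolution) lookup that qc_data performs per datum. It assumes only the
load_pipelines signature and Pipeline fields introduced in this patch; the
main wrapper, the error mapping, and the printed messages are hypothetical
and not part of the change.

    use chronoutil::RelativeDuration;
    use lard_ingestion::qc_pipelines::load_pipelines;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Load every TOML pipeline definition under qc_pipelines/fresh, keyed
        // by (param_id, time_resolution), as ingestion/src/main.rs does at startup.
        let pipelines = load_pipelines("qc_pipelines/fresh")?;

        // Mirror the lookup `qc_data` performs per datum: param 211 (TA) at PT1H
        // matches the [header] of qc_pipelines/fresh/TA_PT1H.toml.
        let key = (
            211,
            RelativeDuration::parse_from_iso8601("PT1H").map_err(|e| e.to_string())?,
        );

        match pipelines.get(&key) {
            Some(pipeline) => {
                // `derive_num_leading_trailing` filled these in at load time;
                // `qc_data` passes them to `Connector::fetch_one` so that the
                // step/spike/flatline checks get enough surrounding data points.
                println!(
                    "TA/PT1H pipeline loaded: {} leading, {} trailing points required",
                    pipeline.num_leading_required, pipeline.num_trailing_required
                );
            }
            None => println!("no pipeline registered for param 211 at PT1H"),
        }

        Ok(())
    }

Keying the map by (param_id, time_resolution) is what lets qc_data skip QC
for any datum without a matching pipeline (the `None => continue` arm) rather
than failing the whole ingestion request.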