Commit: use real QC pipelines

intarga committed Jan 9, 2025
1 parent 6556b35 commit bd9be7e
Showing 8 changed files with 163 additions and 42 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,3 +33,4 @@ serde = { version = "1.0.217", features = ["derive"] }
thiserror = "1.0.69"
tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] }
tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
toml = "0.8.19"
1 change: 1 addition & 0 deletions ingestion/Cargo.toml
@@ -27,3 +27,4 @@ serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
toml.workspace = true
61 changes: 39 additions & 22 deletions ingestion/src/lib.rs
@@ -7,9 +7,10 @@ use axum::{
use bb8::PooledConnection;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, Utc};
use chronoutil::RelativeDuration;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp};
use rove::data_switch::{TimeSpec, Timestamp};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
@@ -21,6 +22,7 @@ use tokio_postgres::NoTls;
#[cfg(feature = "kafka")]
pub mod kvkafka;
pub mod permissions;
pub mod qc_pipelines;
use permissions::{ParamPermitTable, StationPermitTable};

#[derive(Error, Debug)]
@@ -33,6 +35,8 @@ pub enum Error {
Parse(String),
#[error("qc system returned an error: {0}")]
Qc(#[from] rove::scheduler::Error),
#[error("rove connector returned an error: {0}")]
Connector(#[from] rove::data_switch::Error),
#[error("RwLock was poisoned: {0}")]
Lock(String),
#[error("Could not read environment variable: {0}")]
@@ -76,7 +80,8 @@ struct IngestorState {
db_pool: PgConnectionPool,
param_conversions: ParamConversions, // converts param codes to element ids
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: Arc<rove::Scheduler>,
rove_connector: Arc<rove_connector::Connector>,
qc_pipelines: Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>>,
}

impl FromRef<IngestorState> for PgConnectionPool {
@@ -97,9 +102,15 @@ impl FromRef<IngestorState> for Arc<RwLock<(ParamPermitTable, StationPermitTable
}
}

impl FromRef<IngestorState> for Arc<rove::Scheduler> {
fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler> {
state.qc_scheduler.clone()
impl FromRef<IngestorState> for Arc<rove_connector::Connector> {
fn from_ref(state: &IngestorState) -> Arc<rove_connector::Connector> {
state.rove_connector.clone()
}
}

impl FromRef<IngestorState> for Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>> {
fn from_ref(state: &IngestorState) -> Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>> {
state.qc_pipelines.clone()
}
}

@@ -204,8 +215,9 @@ pub async fn insert_data(

pub async fn qc_data(
chunks: &Vec<DataChunk<'_>>,
scheduler: &rove::Scheduler,
conn: &mut PooledPgConn<'_>,
rove_connector: &rove_connector::Connector,
pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>,
) -> Result<(), Error> {
// TODO: see conflict resolution issues on queries in `insert_data`
// On periodic or consistency QC pipelines, we should be checking the provenance table to
@@ -245,19 +257,19 @@
for datum in chunk.data.iter() {
let time_spec =
TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution);
let space_spec = SpaceSpec::One(datum.timeseries_id.to_string());
// TODO: load and fetch real pipeline
let pipeline = "sample_pipeline";
let rove_output = scheduler
.validate_direct(
"lard",
&[] as &[&str],
let pipeline = match pipelines.get(&(datum.param_id, time_resolution)) {
Some(pipeline) => pipeline,
None => continue,
};
let data = rove_connector
.fetch_one(
datum.timeseries_id,
&time_spec,
&space_spec,
pipeline,
None,
pipeline.num_leading_required,
pipeline.num_trailing_required,
)
.await?;
let rove_output = rove::Scheduler::schedule_tests(pipeline, data)?;

let first_fail = rove_output.iter().find(|check_result| {
if let Some(result) = check_result.results.first() {
@@ -276,7 +288,8 @@
qc_results.push(QcResult {
timeseries_id: datum.timeseries_id,
timestamp: chunk.timestamp,
pipeline: pipeline.to_string(),
// TODO: should this encode more info? In theory the param/type can be deduced from the DB anyway
pipeline: "fresh".to_string(),
flag,
fail_condition,
});
@@ -339,7 +352,8 @@ async fn handle_kldata(
State(pool): State<PgConnectionPool>,
State(param_conversions): State<ParamConversions>,
State(permit_table): State<Arc<RwLock<(ParamPermitTable, StationPermitTable)>>>,
State(qc_scheduler): State<Arc<rove::Scheduler>>,
State(rove_connector): State<Arc<rove_connector::Connector>>,
State(qc_pipelines): State<Arc<HashMap<(i32, RelativeDuration), rove::Pipeline>>>,
body: String,
) -> Json<KldataResp> {
let result: Result<usize, Error> = async {
@@ -353,7 +367,7 @@

insert_data(&data, &mut conn).await?;

qc_data(&data, &qc_scheduler, &mut conn).await?;
qc_data(&data, &mut conn, &rove_connector, &qc_pipelines).await?;

Ok(message_id)
}
@@ -404,12 +418,14 @@ pub async fn run(
db_pool: PgConnectionPool,
param_conversion_path: &str,
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: rove::Scheduler,
rove_connector: rove_connector::Connector,
qc_pipelines: HashMap<(i32, RelativeDuration), rove::Pipeline>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// set up param conversion map
let param_conversions = get_conversions(param_conversion_path)?;

let qc_scheduler = Arc::new(qc_scheduler);
let rove_connector = Arc::new(rove_connector);
let qc_pipelines = Arc::new(qc_pipelines);

// build our application with a single route
let app = Router::new()
Expand All @@ -418,7 +434,8 @@ pub async fn run(
db_pool,
param_conversions,
permit_tables,
qc_scheduler,
rove_connector,
qc_pipelines,
});

// run our app with hyper, listening globally on port 3001
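Taken together, the hunks above replace the old `validate_direct` call on a `rove::Scheduler` with a lookup-fetch-schedule flow. A condensed sketch of the new per-datum path (`qc_one_datum` is a hypothetical helper for illustration, not a verbatim excerpt; the surrounding loop, flag handling, and provenance insert are elided):

```rust
use chronoutil::RelativeDuration;
use rove::data_switch::{TimeSpec, Timestamp};
use std::collections::HashMap;

/// Run QC for a single datum, mirroring the new `qc_data` flow above.
async fn qc_one_datum(
    param_id: i32,
    timeseries_id: i32,
    timestamp: i64,
    time_resolution: RelativeDuration,
    connector: &rove_connector::Connector,
    pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>,
) -> Result<(), Box<dyn std::error::Error>> {
    // No pipeline configured for this (param, resolution)? Then the datum is skipped.
    let Some(pipeline) = pipelines.get(&(param_id, time_resolution)) else {
        return Ok(());
    };

    // Fetch just this timestamp, padded with the leading/trailing points
    // the pipeline's checks (step_check, spike_check, ...) require.
    let time_spec = TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution);
    let data = connector
        .fetch_one(
            timeseries_id,
            &time_spec,
            pipeline.num_leading_required,
            pipeline.num_trailing_required,
        )
        .await?;

    // Scheduling is now a plain function call on the pre-loaded pipeline.
    let _check_results = rove::Scheduler::schedule_tests(pipeline, data)?;
    Ok(())
}
```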
43 changes: 25 additions & 18 deletions ingestion/src/main.rs
@@ -1,13 +1,7 @@
use bb8_postgres::PostgresConnectionManager;
use rove::{
data_switch::{DataConnector, DataSwitch},
load_pipelines,
};
use lard_ingestion::qc_pipelines::load_pipelines;
use rove_connector::Connector;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use std::sync::{Arc, RwLock};
use tokio_postgres::NoTls;

use lard_ingestion::permissions;
@@ -37,15 +31,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let db_pool = bb8::Pool::builder().build(manager).await?;

// QC system
let scheduler = rove::Scheduler::new(
load_pipelines("").unwrap(),
DataSwitch::new(HashMap::from([(
String::from("lard"),
Box::new(Connector {
pool: db_pool.clone(),
}) as Box<dyn DataConnector + Send>,
)])),
);
// NOTE: Keeping this version around in case we want it for the periodic checks
// let scheduler = rove::Scheduler::new(
// load_pipelines("").unwrap(),
// DataSwitch::new(HashMap::from([(
// String::from("lard"),
// Box::new(Connector {
// pool: db_pool.clone(),
// }) as Box<dyn DataConnector + Send>,
// )])),
// );
let rove_connector = Connector {
pool: db_pool.clone(),
};

let qc_pipelines = load_pipelines("qc_pipelines")?;

println!("Spawing task to fetch permissions from StInfoSys...");
// background task to refresh permit tables every 30 mins
@@ -79,5 +79,12 @@

// Set up and run our server + database
println!("Ingestion server started!");
lard_ingestion::run(db_pool, PARAMCONV, permit_tables, scheduler).await
lard_ingestion::run(
db_pool,
PARAMCONV,
permit_tables,
rove_connector,
qc_pipelines,
)
.await
}
48 changes: 48 additions & 0 deletions ingestion/src/qc_pipelines.rs
@@ -0,0 +1,48 @@
use chronoutil::RelativeDuration;
use rove::pipeline::{self, derive_num_leading_trailing, Pipeline};
use serde::Deserialize;
use std::{collections::HashMap, path::Path};

#[derive(Deserialize)]
struct Header {
param_id: i32,
time_resolution: String,
#[allow(dead_code)]
sensor: Vec<i32>,
}

#[derive(Deserialize)]
struct PipelineDef {
header: Header,
pipeline: Pipeline,
}

pub fn load_pipelines(
path: impl AsRef<Path>,
) -> Result<HashMap<(i32, RelativeDuration), Pipeline>, pipeline::Error> {
std::fs::read_dir(path)?
.map(|entry| {
let entry = entry?;

if !entry.file_type()?.is_file() {
return Err(pipeline::Error::DirectoryStructure);
}

let mut pipeline_def: PipelineDef =
toml::from_str(&std::fs::read_to_string(entry.path())?)?;
(
pipeline_def.pipeline.num_leading_required,
pipeline_def.pipeline.num_trailing_required,
) = derive_num_leading_trailing(&pipeline_def.pipeline);

let key = (
pipeline_def.header.param_id,
// TODO: remove unwrap
RelativeDuration::parse_from_iso8601(&pipeline_def.header.time_resolution).unwrap(),
);

Ok(Some((key, pipeline_def.pipeline)))
})
.filter_map(Result::transpose)
.collect()
}
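A minimal usage sketch of `load_pipelines` (hypothetical: assumes the directory layout introduced below, with TOML files such as `qc_pipelines/fresh/TA_PT1H.toml`):

```rust
use chronoutil::RelativeDuration;
use lard_ingestion::qc_pipelines::load_pipelines;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Every .toml file in the directory becomes one map entry,
    // keyed by (param_id, time_resolution) from its [header].
    let pipelines = load_pipelines("qc_pipelines/fresh")?;

    let key = (211, RelativeDuration::parse_from_iso8601("PT1H").unwrap());
    if let Some(pipeline) = pipelines.get(&key) {
        println!(
            "TA @ PT1H: {} leading / {} trailing points required",
            pipeline.num_leading_required, pipeline.num_trailing_required
        );
    }
    Ok(())
}
```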
46 changes: 46 additions & 0 deletions qc_pipelines/fresh/TA_PT1H.toml
@@ -0,0 +1,46 @@
[header]
# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex?
# Would we be better served with a blocklist/allowlist?
# stations = "*"
param_id = 211
time_resolution = "PT1H"
# level = "*"
sensor = [0]

[[step]]
name = "special_value_check"
[step.special_value_check]
special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999]

[[step]]
name = "range_check"
[step.range_check]
min = -55
max = 50

[[step]]
name = "climate_range_check"
[step.range_check_dynamic]
source = "netcdf" # TODO: define a neat spec for this?

[[step]]
name = "step_check"
[step.step_check]
max = 18.6

[[step]]
name = "flatline_check"
[step.flatline_check]
max = 10

[[step]]
name = "spike_check"
[step.spike_check]
max = 18.6

[[step]]
name = "model_consistency_check"
[step.model_consistency_check]
model_source = "lustre"
model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model
threshold = 3.0 # FIXME: made up value by Ingrid
2 changes: 1 addition & 1 deletion rove_connector/src/lib.rs
@@ -91,7 +91,7 @@
}

impl Connector {
async fn fetch_one(
pub async fn fetch_one(
&self,
ts_id: i32,
time_spec: &TimeSpec,
