From 4069304b70c5e84a43efaac270bb855952f2cd9a Mon Sep 17 00:00:00 2001 From: tee8z Date: Tue, 27 Aug 2024 19:22:01 -0400 Subject: [PATCH] Add optional coordinator pubkey for entry (#29) --- Cargo.lock | 2 + oracle/Cargo.toml | 8 +- oracle/src/db/event_data.rs | 1559 +---------------- oracle/src/db/event_db_migrations.rs | 1 + oracle/src/db/mod.rs | 1623 +++++++++++++++++- oracle/src/file_access.rs | 6 +- oracle/src/lib.rs | 2 - oracle/src/oracle.rs | 44 +- oracle/src/routes/stations/weather_routes.rs | 10 +- oracle/src/ser/mod.rs | 2 - oracle/src/ser/utc_datetime.rs | 19 - oracle/src/ser/utc_option_datetime.rs | 31 - oracle/src/startup.rs | 15 +- oracle/tests/api/create_event.rs | 2 + oracle/tests/api/create_event_entry.rs | 4 + oracle/tests/api/etl_workflow.rs | 10 + oracle/tests/api/get_events.rs | 3 + 17 files changed, 1727 insertions(+), 1614 deletions(-) delete mode 100644 oracle/src/ser/mod.rs delete mode 100644 oracle/src/ser/utc_datetime.rs delete mode 100644 oracle/src/ser/utc_option_datetime.rs diff --git a/Cargo.lock b/Cargo.lock index a02d8a9..b232372 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,6 +1105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -2300,6 +2301,7 @@ dependencies = [ "fern", "futures", "h2", + "hex", "hyper", "itertools", "log", diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index bdcbc43..e946ba7 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -24,6 +24,7 @@ nostr = "0.33.0" duckdb = { version = "1.0.0" } fern = { version = "0.6.2", features = ["colored"] } futures = "0.3.28" +hex = "0.4.3" hyper = "1.4.0" h2 = "0.4.5" itertools = "0.13.0" @@ -38,7 +39,12 @@ rustix = "0.38.19" scooby = "0.5.0" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.117" -time = { version = "0.3.36", features = ["parsing", "formatting", "macros"] } +time = { version = "0.3.36", features = [ + "parsing", + "formatting", + "macros", + "serde", +] } thiserror = "1.0.62" tokio = { version = "1.35.1", features = [ "macros", diff --git a/oracle/src/db/event_data.rs b/oracle/src/db/event_data.rs index b8c93ad..c33b700 100644 --- a/oracle/src/db/event_data.rs +++ b/oracle/src/db/event_data.rs @@ -1,28 +1,21 @@ -use super::{run_migrations, weather_data, Forecast, Observation}; -use crate::oracle::Oracle; -use crate::utc_datetime; -use anyhow::anyhow; -use dlctix::bitcoin::XOnlyPublicKey; -use dlctix::secp::{MaybeScalar, Scalar}; -use dlctix::EventAnnouncement; -use duckdb::arrow::datatypes::ToByteSlice; -use duckdb::types::{OrderedMap, ToSqlOutput, Type, Value}; -use duckdb::{ - ffi, params, params_from_iter, AccessMode, Config, Connection, ErrorCode, Row, ToSql, +use super::{run_migrations, CreateEventData, Event, EventFilter, EventSummary}; + +use crate::{ + ActiveEvent, Forecasted, Observed, SignEvent, ToRawSql, ValueOptions, Weather, WeatherChoices, + WeatherChoicesWithEntry, WeatherEntry, }; -use itertools::Itertools; +use dlctix::bitcoin::XOnlyPublicKey; +use duckdb::types::Value; +use duckdb::{params, params_from_iter, AccessMode, Config, Connection}; use log::{debug, info}; use regex::Regex; use scooby::postgres::{insert_into, select, update, with, Aliasable, Joinable, Parameters}; -use serde::{Deserialize, Serialize}; use serde_json::to_vec; use std::collections::HashMap; use std::time::Duration as StdDuration; use time::format_description::well_known::Rfc3339; -use 
time::macros::format_description; -use time::{Date, Duration, OffsetDateTime, UtcOffset}; +use time::OffsetDateTime; use tokio::time::timeout; -use utoipa::{IntoParams, ToSchema}; use uuid::Uuid; pub struct EventData { @@ -254,7 +247,8 @@ impl EventData { signing_date, observation_date, locations, - event_annoucement) VALUES(?,?,?,?,?,?,?,?,?)", + event_annoucement, + coordinator_pubkey) VALUES(?,?,?,?,?,?,?,?,?,?)", )?; stmt.execute(params![ event.id.to_string(), @@ -266,11 +260,27 @@ impl EventData { observation_date, locations_sql, annoucement_bytes, + event.coordinator_pubkey ])?; Ok(event.into()) } + pub async fn get_event_coordinator_pubkey( + &self, + event_id: Uuid, + ) -> Result { + let coordinator_pubkey = select("coordinator_pubkey") + .from("events") + .where_("id = $1"); + let query_str = self.prepare_query(coordinator_pubkey.to_string()); + debug!("query_str: {}", query_str); + let conn = self.new_readonly_connection_retry().await?; + let mut stmt = conn.prepare(&query_str)?; + let sql_params = params_from_iter(vec![event_id.to_string()]); + stmt.query_row(sql_params, |row| row.get(0)) + } + pub async fn add_event_entry( &self, entry: WeatherEntry, @@ -362,6 +372,10 @@ impl EventData { .collect(); info!("insert values: {:?}", insert_values); + if insert_values.is_empty() { + debug!("entry values were emtpy, skipping creating entry"); + return Ok(()); + } let conn = self.new_write_connection_retry().await?; let mut weather_stmt = conn.prepare(&query_str)?; @@ -676,9 +690,9 @@ impl EventData { .from("events") .where_("id = $1"); - let conn = self.new_readonly_connection_retry().await?; let query_str = self.prepare_query(event_select.to_string()); debug!("query_str: {}", query_str); + let conn = self.new_readonly_connection_retry().await?; let mut stmt = conn.prepare(&query_str)?; let sql_params = params_from_iter(vec![id.to_string()]); stmt.query_row(sql_params, |row| row.try_into()) @@ -781,1512 +795,3 @@ impl EventData { fixed_params.to_string() } } - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -pub struct CreateEvent { - /// Client needs to provide a valid Uuidv7 - pub id: Uuid, - #[serde(with = "utc_datetime")] - /// Time at which the attestation will be added to the event, needs to be after the observation date - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - /// Date of when the weather observations occured (midnight UTC), all entries must be made before this time - pub observation_date: OffsetDateTime, - /// NOAA observation stations used in this event - pub locations: Vec, - /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) - pub number_of_values_per_entry: usize, - /// Total number of allowed entries into the event - pub total_allowed_entries: usize, - /// Total amount of places that are part of the winnings split - pub number_of_places_win: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CreateEventData { - /// Provide UUIDv7 to use for looking up the event - pub id: Uuid, - #[serde(with = "utc_datetime")] - /// Time at which the attestation will be added to the event - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - /// Date of when the weather observations occured (midnight UTC), all entries must be made before this time - pub observation_date: OffsetDateTime, - // NOAA observation stations used in this event - pub locations: Vec, - /// The number of values that can be selected per entry in 
the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) - pub number_of_values_per_entry: i64, - pub total_allowed_entries: i64, - pub number_of_places_win: i64, - /// Used to sign the result of the event being watched - pub nonce: Scalar, - /// Used in constructing the dlctix transactions - pub event_annoucement: EventAnnouncement, -} - -impl CreateEventData { - //TODO: use the builder pattern - #[allow(clippy::too_many_arguments)] - pub fn new( - oracle: &Oracle, - event_id: Uuid, - observation_date: OffsetDateTime, - signing_date: OffsetDateTime, - locations: Vec, - total_allowed_entries: usize, - number_of_places_win: usize, - number_of_values_per_entry: usize, - ) -> Result { - if event_id.get_version_num() != 7 { - return Err(anyhow!( - "Client needs to provide a valid Uuidv7 for event id {}", - event_id - )); - } - if observation_date > signing_date { - return Err(anyhow::anyhow!( - "Signing date {} needs to be after observation date {}", - signing_date.format(&Rfc3339).unwrap(), - observation_date.format(&Rfc3339).unwrap() - )); - } - - let public_key = oracle.raw_public_key(); - // number_of_values_per_entry * 2 == max value, create array from max value to 0 - // determine all possible messages that we might sign - let max_number_of_points_per_value_in_entry = 2; - let possible_scores: Vec = (0..=(number_of_values_per_entry - * max_number_of_points_per_value_in_entry)) - .map(|val| val as i64) - .collect(); - - // allows us to have comps where say the top 3 scores split the pot - let possible_outcome_rankings: Vec> = possible_scores - .iter() - .combinations(number_of_places_win) - //Sort possible combinations in desc order - .map(|mut combos| { - combos.sort_by_key(|n| i64::MAX - *n); - combos - }) - .filter(|combination| { - // Check if the combination is sorted in descending order, if not filter out of possible outcomes - combination.windows(2).all(|window| window[0] >= window[1]) - }) - .map(|combination| combination.into_iter().cloned().collect()) - .collect(); - info!("outcomes: {:?}", possible_outcome_rankings); - // holds all possible scoring results of the event - let outcome_messages: Vec> = possible_outcome_rankings - .into_iter() - .map(|inner_vec| { - inner_vec - .into_iter() - .flat_map(|num| num.to_be_bytes()) - .collect::>() - }) - .collect(); - - let mut rng = rand::thread_rng(); - let nonce = Scalar::random(&mut rng); - let nonce_point = nonce.base_point_mul(); - // Manually set expiry to 7 days after the signature should have been proveded so users can get their funds back - let expiry = signing_date - .saturating_add(Duration::DAY * 7) - .unix_timestamp() as u32; - - // The actual accounement the oracle is going to attest the outcome - let event_annoucement = EventAnnouncement { - oracle_pubkey: public_key.into(), - nonce_point, - outcome_messages, - expiry: Some(expiry), - }; - - Ok(Self { - id: event_id, - observation_date, - signing_date, - nonce, - total_allowed_entries: total_allowed_entries as i64, - number_of_places_win: number_of_places_win as i64, - number_of_values_per_entry: number_of_values_per_entry as i64, - locations, - event_annoucement, - }) - } -} - -impl From for Event { - fn from(value: CreateEventData) -> Self { - Self { - id: value.id, - signing_date: value.signing_date, - observation_date: value.observation_date, - locations: value.locations, - total_allowed_entries: value.total_allowed_entries, - number_of_places_win: value.number_of_places_win, - number_of_values_per_entry: value.number_of_values_per_entry, - 
event_annoucement: value.event_annoucement, - nonce: value.nonce, - status: EventStatus::default(), - entry_ids: vec![], - entries: vec![], - weather: vec![], - attestation: None, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, IntoParams)] -pub struct EventFilter { - // TODO: add more options, proper pagination and search - pub limit: Option, - pub event_ids: Option>, -} - -impl Default for EventFilter { - fn default() -> Self { - Self { - limit: Some(100_usize), - event_ids: None, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct SignEvent { - pub id: Uuid, - #[serde(with = "utc_datetime")] - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - pub observation_date: OffsetDateTime, - pub status: EventStatus, - pub nonce: Scalar, - pub event_annoucement: EventAnnouncement, - pub number_of_places_win: i64, - pub number_of_values_per_entry: i64, - pub attestation: Option, -} - -impl SignEvent { - pub fn update_status(&mut self) { - self.status = get_status(self.observation_date, self.attestation) - } -} - -impl<'a> TryFrom<&Row<'a>> for SignEvent { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - //raw date format 2024-08-11 00:27:39.013046-04 - let sql_time_format = format_description!( - "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" - ); - let mut sign_events = SignEvent { - id: row - .get::(0) - .map(|val| Uuid::parse_str(&val))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - signing_date: row - .get::(1) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - observation_date: row - .get::(2) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - status: EventStatus::default(), - number_of_places_win: row.get::(3)?, - number_of_values_per_entry: row.get::(4)?, - attestation: row - .get::(5) - .map(|v| { - let blob_attestation = match v { - Value::Blob(raw) => raw, - _ => vec![], - }; - if !blob_attestation.is_empty() { - //TODO: handle the conversion more gracefully than unwrap - Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap()) - } else { - None - } - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(5, Type::Any, Box::new(e)))?, - nonce: row - .get::(6) - .map(|raw| { - let blob = match raw { - Value::Blob(val) => val, - _ => vec![], - }; - serde_json::from_slice(&blob) - })? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(6, Type::Any, Box::new(e)))?, - event_annoucement: row - .get::(7) - .map(|raw| { - let blob = match raw { - Value::Blob(val) => val, - _ => vec![], - }; - serde_json::from_slice(&blob) - })? 
- .map_err(|e| duckdb::Error::FromSqlConversionFailure(7, Type::Any, Box::new(e)))?, - }; - sign_events.update_status(); - Ok(sign_events) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct ActiveEvent { - pub id: Uuid, - pub locations: Vec, - #[serde(with = "utc_datetime")] - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - pub observation_date: OffsetDateTime, - pub status: EventStatus, - pub total_allowed_entries: i64, - pub total_entries: i64, - pub number_of_values_per_entry: i64, - pub number_of_places_win: i64, - pub attestation: Option, -} - -impl ActiveEvent { - pub fn update_status(&mut self) { - self.status = get_status(self.observation_date, self.attestation) - } -} - -#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub enum EventStatus { - /// Observation date has not passed yet and entries can be added - #[default] - Live, - /// Currently in the Observation date, entries cannot be added - Running, - /// Event Observation window has finished, not yet signed - Completed, - /// Event has completed and been signed by the oracle - Signed, -} - -impl std::fmt::Display for EventStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Live => write!(f, "live"), - Self::Running => write!(f, "running"), - Self::Completed => write!(f, "completed"), - Self::Signed => write!(f, "signed"), - } - } -} - -impl TryFrom<&str> for EventStatus { - type Error = anyhow::Error; - - fn try_from(s: &str) -> Result { - match s { - "live" => Ok(EventStatus::Live), - "running" => Ok(EventStatus::Running), - "completed" => Ok(EventStatus::Completed), - "signed" => Ok(EventStatus::Signed), - val => Err(anyhow!("invalid status: {}", val)), - } - } -} - -impl TryFrom for EventStatus { - type Error = anyhow::Error; - - fn try_from(s: String) -> Result { - match s.as_str() { - "live" => Ok(EventStatus::Live), - "running" => Ok(EventStatus::Running), - "completed" => Ok(EventStatus::Completed), - "signed" => Ok(EventStatus::Signed), - val => Err(anyhow!("invalid status: {}", val)), - } - } -} - -impl<'a> TryFrom<&Row<'a>> for ActiveEvent { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - //raw date format 2024-08-11 00:27:39.013046-04 - let sql_time_format = format_description!( - "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" - ); - let mut active_events = ActiveEvent { - id: row - .get::(0) - .map(|val| Uuid::parse_str(&val))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - signing_date: row - .get::(1) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - observation_date: row - .get::(2) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? 
- .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - locations: row - .get::(3) - .map(|locations| { - let list_locations = match locations { - Value::List(list) => list, - _ => vec![], - }; - let mut locations_conv = vec![]; - for value in list_locations.iter() { - if let Value::Text(location) = value { - locations_conv.push(location.clone()) - } - } - locations_conv - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, - total_allowed_entries: row.get::(4)?, - status: EventStatus::default(), - total_entries: row.get::(5)?, - number_of_places_win: row.get::(6)?, - number_of_values_per_entry: row.get::(7)?, - attestation: row - .get::(8) - .map(|v| { - let blob_attestation = match v { - Value::Blob(raw) => raw, - _ => vec![], - }; - if !blob_attestation.is_empty() { - //TODO: handle the conversion more gracefully than unwrap - Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap()) - } else { - None - } - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?, - }; - active_events.update_status(); - Ok(active_events) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct EventSummary { - pub id: Uuid, - #[serde(with = "utc_datetime")] - /// Time at which the attestation will be added to the event - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - /// Date of when the weather observations occured - pub observation_date: OffsetDateTime, - /// NOAA observation stations used in this event - pub locations: Vec, - /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) - pub number_of_values_per_entry: i64, - /// Current status of the event, where in the lifecyle are we (LIVE, RUNNING, COMPLETED, SIGNED, defaults to LIVE) - pub status: EventStatus, - /// Knowing the total number of entries, how many can place - /// The dlctix coordinator can determine how many transactions to create - pub total_allowed_entries: i64, - /// Needs to all be generated at the start - pub total_entries: i64, - pub number_of_places_win: i64, - /// The forecasted and observed values for each station on the event date - pub weather: Vec, - /// When added it means the oracle has signed that the current data is the final result - pub attestation: Option, - /// Used to sign the result of the event being watched - pub nonce: Scalar, -} - -impl EventSummary { - pub fn update_status(&mut self) { - self.status = get_status(self.observation_date, self.attestation) - } -} - -pub fn get_status( - observation_date: OffsetDateTime, - attestation: Option, -) -> EventStatus { - //always have the events run for a single day for now - if observation_date < OffsetDateTime::now_utc() - && observation_date.saturating_sub(Duration::days(1)) > OffsetDateTime::now_utc() - && attestation.is_none() - { - return EventStatus::Running; - } - - if observation_date < OffsetDateTime::now_utc() - && observation_date.saturating_sub(Duration::days(1)) < OffsetDateTime::now_utc() - && attestation.is_none() - { - return EventStatus::Completed; - } - - if attestation.is_some() { - return EventStatus::Signed; - } - //default to live - EventStatus::Live -} - -impl<'a> TryFrom<&Row<'a>> for EventSummary { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - //raw date format 2024-08-11 00:27:39.013046-04 - let sql_time_format = format_description!( - "[year]-[month]-[day] 
[hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" - ); - let mut event_summary = EventSummary { - id: row - .get::(0) - .map(|val| Uuid::parse_str(&val))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - signing_date: row - .get::(1) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - observation_date: row - .get::(2) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - status: EventStatus::default(), - locations: row - .get::(3) - .map(|locations| { - let list_locations = match locations { - Value::List(list) => list, - _ => vec![], - }; - let mut locations_conv = vec![]; - for value in list_locations.iter() { - if let Value::Text(location) = value { - locations_conv.push(location.clone()) - } - } - locations_conv - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, - total_allowed_entries: row.get::(4)?, - total_entries: row.get::(5)?, - number_of_places_win: row.get::(6)?, - number_of_values_per_entry: row.get::(7)?, - attestation: row - .get::(8) - .map(|v| { - let blob_attestation = match v { - Value::Blob(raw) => raw, - _ => vec![], - }; - if !blob_attestation.is_empty() { - //TODO: handle the conversion more gracefully than unwrap - Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap()) - } else { - None - } - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?, - nonce: row - .get::(9) - .map(|raw| { - let blob = match raw { - Value::Blob(val) => val, - _ => vec![], - }; - serde_json::from_slice(&blob) - })? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(9, Type::Any, Box::new(e)))?, - weather: vec![], - }; - event_summary.update_status(); - Ok(event_summary) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct Event { - pub id: Uuid, - #[serde(with = "utc_datetime")] - /// Time at which the attestation will be added to the event - pub signing_date: OffsetDateTime, - #[serde(with = "utc_datetime")] - /// Date of when the weather observations occured - pub observation_date: OffsetDateTime, - /// NOAA observation stations used in this event - pub locations: Vec, - /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) - pub number_of_values_per_entry: i64, - /// Current status of the event, where in the lifecyle are we (LIVE, RUNNING, COMPLETED, SIGNED) - pub status: EventStatus, - /// Knowing the total number of entries, how many can place - /// The dlctix coordinator can determine how many transactions to create - pub total_allowed_entries: i64, - /// Needs to all be generated at the start - pub entry_ids: Vec, - pub number_of_places_win: i64, - /// All entries into this event, wont be returned until date of observation begins and will be ranked by score - pub entries: Vec, - /// The forecasted and observed values for each station on the event date - pub weather: Vec, - /// Nonce the oracle committed to use as part of signing final results - pub nonce: Scalar, - /// Holds the predefined outcomes the oracle will attest to at event complet - pub event_annoucement: EventAnnouncement, - /// When added it means the oracle has signed that the current data is the final result - pub attestation: Option, -} - -impl Event { - pub 
fn update_status(&mut self) { - self.status = get_status(self.observation_date, self.attestation) - } -} - -impl<'a> TryFrom<&Row<'a>> for Event { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - //raw date format 2024-08-11 00:27:39.013046-04 - let sql_time_format = format_description!( - "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" - ); - let mut oracle_event_data = Event { - id: row - .get::(0) - .map(|val| { - debug!("{}", val.to_string()); - Uuid::parse_str(&val) - })? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - signing_date: row - .get::(1) - .map(|val| { - debug!("{}", val.to_string()); - OffsetDateTime::parse(&val, &sql_time_format) - })? - .map(|val| { - debug!("{}", val.to_string()); - val.to_offset(UtcOffset::UTC) - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - observation_date: row - .get::(2) - .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? - .map(|val| val.to_offset(UtcOffset::UTC)) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - event_annoucement: row - .get::(3) - .map(|raw| { - let blob = match raw { - Value::Blob(val) => val, - _ => vec![], - }; - serde_json::from_slice(&blob) - })? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, - locations: row - .get::(4) - .map(|locations| { - let list_locations = match locations { - Value::List(list) => list, - _ => vec![], - }; - let mut locations_conv = vec![]; - for value in list_locations.iter() { - if let Value::Text(location) = value { - locations_conv.push(location.clone()) - } - } - info!("locations: {:?}", locations_conv); - locations_conv - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(4, Type::Any, Box::new(e)))?, - total_allowed_entries: row.get::(5)?, - number_of_places_win: row.get::(6)?, - number_of_values_per_entry: row.get::(7)?, - attestation: row - .get::(8) - .map(|v| { - info!("val: {:?}", v); - let blob_attestation = match v { - Value::Blob(raw) => raw, - _ => vec![], - }; - if !blob_attestation.is_empty() { - //TODO: handle the conversion more gracefully than unwrap - let converted: MaybeScalar = - serde_json::from_slice(&blob_attestation).unwrap(); - Some(converted) - } else { - None - } - }) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?, - nonce: row - .get::(9) - .map(|raw| { - let blob = match raw { - Value::Blob(val) => val, - _ => vec![], - }; - serde_json::from_slice(&blob) - })? 
- .map_err(|e| duckdb::Error::FromSqlConversionFailure(9, Type::Any, Box::new(e)))?, - status: EventStatus::default(), - //These nested values have to be made by more quries - entry_ids: vec![], - entries: vec![], - weather: vec![], - }; - oracle_event_data.update_status(); - Ok(oracle_event_data) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct Weather { - pub station_id: String, - pub observed: Option, - pub forecasted: Forecasted, -} - -impl<'a> TryFrom<&Row<'a>> for Weather { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - let observed: Option = row - .get::(1) - .map(|raw_observed| match raw_observed.clone() { - Value::Struct(observed) => Some(observed.try_into().map_err(|e: anyhow::Error| { - duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(format!( - "error formatting observed: {:?} {}", - raw_observed, e - )), - ) - })), - _ => None, - }) - .and_then(|option_inner_result| match option_inner_result { - Some(inner_result) => inner_result.map(Some), - None => Ok(None), - })?; - - let forecasted: Forecasted = - row.get::(2) - .map(|raw_forecasted| match raw_forecasted.clone() { - Value::Struct(forecasted) => { - forecasted.try_into().map_err(|e: anyhow::Error| { - duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(format!( - "error formatting forecast: {:?} {}", - raw_forecasted, e - )), - ) - }) - } - _ => Err(duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - None, - )), - })??; - Ok(Weather { - station_id: row.get::(0)?, - forecasted, - observed, - }) - } -} - -impl TryFrom<&Forecast> for Forecasted { - type Error = weather_data::Error; - fn try_from(value: &Forecast) -> Result { - let format = format_description!("[year]-[month]-[day]"); - let date = Date::parse(&value.date, format)?; - let datetime = date.with_hms(0, 0, 0).unwrap(); - let datetime_off = datetime.assume_offset(UtcOffset::from_hms(0, 0, 0).unwrap()); - Ok(Self { - date: datetime_off, - temp_low: value.temp_low, - temp_high: value.temp_high, - wind_speed: value.wind_speed, - }) - } -} - -impl TryInto for &OrderedMap { - type Error = duckdb::Error; - - fn try_into(self) -> Result { - let values: Vec<&Value> = self.values().collect(); - - let station_id = values - .first() - .ok_or_else(|| { - duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(String::from("unable to convert station_id")), - ) - }) - .and_then(|raw_station| match raw_station { - Value::Text(station) => Ok(station.clone()), - _ => Err(duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(format!( - "error converting station id into string: {:?}", - raw_station - )), - )), - })?; - let observed: Option = if let Some(Value::Struct(observed)) = values.get(1) { - let observed_converted = observed.try_into().map_err(|e| { - duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(format!("error converting observed: {}", e)), - ) - })?; - Some(observed_converted) - } else { - None - }; - let forecasted = values - .get(2) - .ok_or_else(|| anyhow!("forecasted not found in the map")) - .and_then(|raw_forecasted| match raw_forecasted { - Value::Struct(forecasted) => forecasted.try_into(), - _ => Err(anyhow!( - "error converting forecasted into struct: {:?}", - 
raw_forecasted - )), - }) - .map_err(|e| { - duckdb::Error::DuckDBFailure( - ffi::Error { - code: ErrorCode::TypeMismatch, - extended_code: 0, - }, - Some(e.to_string()), - ) - })?; - Ok(Weather { - station_id, - observed, - forecasted, - }) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct Observed { - #[serde(with = "utc_datetime")] - pub date: OffsetDateTime, - pub temp_low: i64, - pub temp_high: i64, - pub wind_speed: i64, -} - -impl TryFrom<&Observation> for Observed { - type Error = weather_data::Error; - fn try_from(value: &Observation) -> Result { - Ok(Self { - date: OffsetDateTime::parse(&value.start_time, &Rfc3339)?, - temp_low: value.temp_low.round() as i64, - temp_high: value.temp_high.round() as i64, - wind_speed: value.wind_speed, - }) - } -} - -impl TryInto for &OrderedMap { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - debug!("raw observed: {:?}", self); - let values: Vec<&Value> = self.values().collect(); - let date = values - .first() - .ok_or_else(|| anyhow!("date not found in the map")) - .and_then(|raw_date| match raw_date { - Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), - v => Err(anyhow!( - "error converting observed date into OffsetDatetime: {:?}, {:?}", - raw_date, - v - )), - }) - .and_then(|timestamp| { - OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( - |e| { - anyhow!( - "error parsing observed date into offsetdatetime: {} {}", - timestamp, - e - ) - }, - ) - }) - .map(|val| val.to_offset(UtcOffset::UTC))?; - - let temp_low = values - .get(1) - .ok_or_else(|| anyhow!("temp_low not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let temp_high = values - .get(2) - .ok_or_else(|| anyhow!("temp_high not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let wind_speed = values - .get(3) - .ok_or_else(|| anyhow!("wind_speed not found in the map")) - .and_then(|raw_speed| match raw_speed { - Value::Int(speed) => Ok(*speed as i64), - _ => Err(anyhow!( - "error converting wind_speed into int: {:?}", - raw_speed - )), - })?; - - Ok(Observed { - date, - temp_low, - temp_high, - wind_speed, - }) - } -} - -impl TryInto for OrderedMap { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - debug!("raw observed: {:?}", self); - let values: Vec<&Value> = self.values().collect(); - let date = values - .first() - .ok_or_else(|| anyhow!("date not found in the map")) - .and_then(|raw_date| match raw_date { - Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), - v => Err(anyhow!( - "error converting observed date into OffsetDatetime: {:?}, {:?}", - raw_date, - v - )), - }) - .and_then(|timestamp| { - OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( - |e| { - anyhow!( - "error parsing observed date into offsetdatetime: {} {}", - timestamp, - e - ) - }, - ) - }) - .map(|val| val.to_offset(UtcOffset::UTC))?; - - let temp_low = values - .get(1) - .ok_or_else(|| anyhow!("temp_low not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let temp_high = values - .get(2) - .ok_or_else(|| 
anyhow!("temp_high not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let wind_speed = values - .get(3) - .ok_or_else(|| anyhow!("wind_speed not found in the map")) - .and_then(|raw_speed| match raw_speed { - Value::Int(speed) => Ok(*speed as i64), - _ => Err(anyhow!( - "error converting wind_speed into int: {:?}", - raw_speed - )), - })?; - - Ok(Observed { - date, - temp_low, - temp_high, - wind_speed, - }) - } -} - -impl ToSql for Observed { - fn to_sql(&self) -> duckdb::Result> { - let ordered_struct: OrderedMap = OrderedMap::from(vec![ - ( - String::from("date"), - Value::Text(self.date.format(&Rfc3339).unwrap()), - ), - (String::from("temp_low"), Value::Int(self.temp_low as i32)), - (String::from("temp_high"), Value::Int(self.temp_high as i32)), - ( - String::from("wind_speed"), - Value::Int(self.wind_speed as i32), - ), - ]); - Ok(ToSqlOutput::Owned(Value::Struct(ordered_struct))) - } -} - -impl ToRawSql for Observed { - fn to_raw_sql(&self) -> String { - // Done because the rust library doesn't natively support writing structs to the db just yet, - // Eventually we should be able to delete this code - // example of how to write a struct to duckdb: `INSERT INTO t1 VALUES (ROW('a', 42));` - let mut vals = String::new(); - vals.push_str("ROW('"); - let data_str = self.date.format(&Rfc3339).unwrap(); - vals.push_str(&data_str); - vals.push_str(r#"',"#); - vals.push_str(&format!("{}", self.temp_low)); - vals.push(','); - vals.push_str(&format!("{}", self.temp_high)); - vals.push(','); - vals.push_str(&format!("{}", self.wind_speed)); - vals.push(')'); - vals - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct Forecasted { - #[serde(with = "utc_datetime")] - pub date: OffsetDateTime, - pub temp_low: i64, - pub temp_high: i64, - pub wind_speed: i64, -} - -impl TryInto for &OrderedMap { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let values: Vec<&Value> = self.values().collect(); - let date = values - .first() - .ok_or_else(|| anyhow!("date not found in the map")) - .and_then(|raw_date| match raw_date { - Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), - _ => Err(anyhow!( - "error converting date into OffsetDatetime: {:?}", - raw_date - )), - }) - .and_then(|timestamp| { - OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( - |e| { - anyhow!( - "error parsing forecast date into offsetdatetime: {} {}", - timestamp, - e - ) - }, - ) - }) - .map(|val| val.to_offset(UtcOffset::UTC))?; - - let temp_low = values - .get(1) - .ok_or_else(|| anyhow!("temp_low not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let temp_high = values - .get(2) - .ok_or_else(|| anyhow!("temp_high not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let wind_speed = values - .get(3) - .ok_or_else(|| anyhow!("wind_speed not found in the map")) - .and_then(|raw_speed| match raw_speed { - Value::Int(speed) => Ok(*speed as i64), - _ => Err(anyhow!( - "error converting wind_speed into int: {:?}", - raw_speed - )), - })?; - - Ok(Forecasted { - date, - temp_low, - temp_high, - wind_speed, - }) - } -} - 
-impl TryInto for OrderedMap { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let values: Vec<&Value> = self.values().collect(); - let date = values - .first() - .ok_or_else(|| anyhow!("date not found in the map")) - .and_then(|raw_date| match raw_date { - Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), - _ => Err(anyhow!( - "error converting date into OffsetDatetime: {:?}", - raw_date - )), - }) - .and_then(|timestamp| { - OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( - |e| { - anyhow!( - "error parsing forecast date into offsetdatetime: {} {}", - timestamp, - e - ) - }, - ) - }) - .map(|val| val.to_offset(UtcOffset::UTC))?; - - let temp_low = values - .get(1) - .ok_or_else(|| anyhow!("temp_low not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let temp_high = values - .get(2) - .ok_or_else(|| anyhow!("temp_high not found in the map")) - .and_then(|raw_temp| match raw_temp { - Value::Int(temp) => Ok(*temp as i64), - _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), - })?; - - let wind_speed = values - .get(3) - .ok_or_else(|| anyhow!("wind_speed not found in the map")) - .and_then(|raw_speed| match raw_speed { - Value::Int(speed) => Ok(*speed as i64), - _ => Err(anyhow!( - "error converting wind_speed into int: {:?}", - raw_speed - )), - })?; - - Ok(Forecasted { - date, - temp_low, - temp_high, - wind_speed, - }) - } -} - -pub trait ToRawSql { - /// Converts Rust value to raw valid DuckDB sql string (if user input make sure to validate before adding to db) - fn to_raw_sql(&self) -> String; -} - -impl ToRawSql for Forecasted { - fn to_raw_sql(&self) -> String { - // Done because the rust library doesn't natively support writing structs to the db just yet, - // Eventually we should be able to delete this code - // example of how to write a struct to duckdb: `INSERT INTO t1 VALUES (ROW('a', 42));` - let mut vals = String::new(); - vals.push_str("ROW('"); - let data_str = self.date.format(&Rfc3339).unwrap(); - vals.push_str(&data_str); - vals.push_str(r#"',"#); - vals.push_str(&format!("{}", self.temp_low)); - vals.push(','); - vals.push_str(&format!("{}", self.temp_high)); - vals.push(','); - vals.push_str(&format!("{}", self.wind_speed)); - vals.push(')'); - vals - } -} - -impl ToSql for Forecasted { - fn to_sql(&self) -> duckdb::Result> { - let ordered_struct: OrderedMap = OrderedMap::from(vec![ - ( - String::from("date"), - Value::Text(self.date.format(&Rfc3339).unwrap()), - ), - (String::from("temp_low"), Value::Int(self.temp_low as i32)), - (String::from("temp_high"), Value::Int(self.temp_high as i32)), - ( - String::from("wind_speed"), - Value::Int(self.wind_speed as i32), - ), - ]); - Ok(ToSqlOutput::Owned(Value::Struct(ordered_struct))) - } -} - -// Once submitted for now don't allow changes -// Decide if we want to add a pubkey for who submitted the entry? 
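// --- Illustrative sketch (editor's example, not part of the patch) ---------------
// `ToRawSql` above exists because duckdb-rs cannot yet bind STRUCT parameters, so
// `Forecasted`/`Observed` are rendered as DuckDB `ROW(...)` literals and spliced
// directly into the INSERT statement. A sketch of what that rendering produces for
// a sample value (the date and numbers are hypothetical):
fn forecasted_row_literal_sketch() {
    use time::{format_description::well_known::Rfc3339, macros::datetime};

    let forecasted = Forecasted {
        date: datetime!(2024-08-27 00:00:00 UTC),
        temp_low: 60,
        temp_high: 80,
        wind_speed: 10,
    };
    // Produces e.g. ROW('2024-08-27T00:00:00Z',60,80,10)
    assert_eq!(
        forecasted.to_raw_sql(),
        format!(
            "ROW('{}',60,80,10)",
            forecasted.date.format(&Rfc3339).unwrap()
        )
    );
}
// ---------------------------------------------------------------------------------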
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -pub struct AddEventEntry { - /// Client needs to provide a valid Uuidv7 - pub id: Uuid, - pub event_id: Uuid, - pub expected_observations: Vec, -} - -impl From for WeatherEntry { - fn from(value: AddEventEntry) -> Self { - WeatherEntry { - id: value.id, - event_id: value.event_id, - expected_observations: value.expected_observations, - score: None, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct WeatherEntry { - pub id: Uuid, - pub event_id: Uuid, - pub expected_observations: Vec, - /// A score wont appear until the observation_date has begun - pub score: Option, -} - -impl TryInto for &OrderedMap { - type Error = anyhow::Error; - fn try_into(self) -> Result { - debug!("raw weather entry: {:?}", self); - let values: Vec<&Value> = self.values().collect(); - let id = values - .first() - .ok_or_else(|| anyhow!("id not found in the map")) - .and_then(|raw_id| match raw_id { - Value::Text(id) => Ok(id), - _ => Err(anyhow!( - "error converting weather entry id into string: {:?}", - raw_id - )), - }) - .and_then(|id| { - Uuid::parse_str(id) - .map_err(|e| anyhow!("error converting weather entry id into uuid: {}", e)) - })?; - - let event_id = values - .get(1) - .ok_or_else(|| anyhow!("event_id not found in the map")) - .and_then(|raw_id| match raw_id { - Value::Text(id) => Ok(id), - _ => Err(anyhow!( - "error converting weather event id into string: {:?}", - raw_id - )), - }) - .and_then(|id| { - Uuid::parse_str(id) - .map_err(|e| anyhow!("error converting weather event id into uuid: {}", e)) - })?; - - let expected_observations = values - .get(2) - .ok_or_else(|| anyhow!("expect_observations not found in the map")) - .and_then(|raw| match raw { - Value::List(expected_observations) => Ok(expected_observations), - _ => Err(anyhow!( - "error converting expect_observations into struct: {:?}", - raw - )), - }) - .and_then(|weather_choices| { - let mut converted = vec![]; - for weather_choice in weather_choices { - let weather_struct_choice = match weather_choice { - Value::Struct(weather_choice_struct) => weather_choice_struct.try_into()?, - _ => { - return Err(anyhow!( - "error converting weather_choice into struct: {:?}", - weather_choice - )) - } - }; - converted.push(weather_struct_choice); - } - Ok(converted) - })?; - - let score = values.get(3).and_then(|raw_id| match raw_id { - Value::Int(id) => Some(*id as i64), - _ => None, - }); - - Ok(WeatherEntry { - id, - event_id, - score, - expected_observations, - }) - } -} - -impl<'a> TryFrom<&Row<'a>> for WeatherEntry { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - Ok(WeatherEntry { - id: row - .get::(0) - .map(|val| Uuid::parse_str(&val))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - event_id: row - .get::(1) - .map(|val| Uuid::parse_str(&val))? 
- .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - score: row - .get::>(2) - .map(|val| val.filter(|&val| val != 0))?, - expected_observations: vec![], - }) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct WeatherChoicesWithEntry { - pub entry_id: Uuid, - // NOAA weather stations we're using - pub stations: String, - pub temp_high: Option, - pub temp_low: Option, - pub wind_speed: Option, -} - -impl<'a> TryFrom<&Row<'a>> for WeatherChoicesWithEntry { - type Error = duckdb::Error; - fn try_from(row: &Row<'a>) -> Result { - Ok(WeatherChoicesWithEntry { - entry_id: row - .get::(0) - .map(|val| Uuid::parse_str(&val))? - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - stations: row - .get::(1) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - temp_low: row - .get::>(2) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - temp_high: row - .get::>(3) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, - wind_speed: row - .get::>(4) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(4, Type::Any, Box::new(e)))?, - }) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub struct WeatherChoices { - // NOAA weather stations we're using - pub stations: String, - pub temp_high: Option, - pub temp_low: Option, - pub wind_speed: Option, -} - -impl From for WeatherChoices { - fn from(value: WeatherChoicesWithEntry) -> Self { - Self { - stations: value.stations, - temp_high: value.temp_high, - temp_low: value.temp_low, - wind_speed: value.wind_speed, - } - } -} - -impl<'a> TryFrom<&Row<'a>> for WeatherChoices { - type Error = duckdb::Error; - - fn try_from(row: &Row) -> Result { - Ok(WeatherChoices { - stations: row - .get::(0) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, - temp_low: row - .get::>(1) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, - temp_high: row - .get::>(2) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - wind_speed: row - .get::>(3) - .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) - .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, - }) - } -} - -impl TryInto for &OrderedMap { - type Error = anyhow::Error; - fn try_into(self) -> Result { - debug!("raw weather choices: {:?}", self); - let values: Vec<&Value> = self.values().collect(); - let stations = values - .first() - .ok_or_else(|| anyhow!("stations not found in the map")) - .and_then(|raw_station| match raw_station { - Value::Text(station) => Ok(station.clone()), - _ => Err(anyhow!( - "error converting station id into string: {:?}", - raw_station - )), - })?; - let temp_low = values.get(1).and_then(|raw_temp| match raw_temp { - Value::Text(temp) => ValueOptions::try_from(temp.clone()).ok(), - _ => None, - }); - let temp_high = values.get(2).and_then(|raw_temp| match raw_temp { - Value::Text(temp) => ValueOptions::try_from(temp.clone()).ok(), - _ => None, - }); - let 
wind_speed = values - .get(3) - .and_then(|raw_wind_speed| match raw_wind_speed { - Value::Text(wind_speed) => ValueOptions::try_from(wind_speed.clone()).ok(), - _ => None, - }); - Ok(WeatherChoices { - stations, - temp_low, - temp_high, - wind_speed, - }) - } -} - -#[allow(clippy::from_over_into)] -impl Into for &WeatherChoices { - fn into(self) -> Value { - let temp_low = match self.temp_low.clone() { - Some(val) => Value::Text(val.to_string()), - None => Value::Null, - }; - let temp_high = match self.temp_high.clone() { - Some(val) => Value::Text(val.to_string()), - None => Value::Null, - }; - let wind_speed = match self.wind_speed.clone() { - Some(val) => Value::Text(val.to_string()), - None => Value::Null, - }; - let ordered_struct: OrderedMap = OrderedMap::from(vec![ - (String::from("stations"), Value::Text(self.stations.clone())), - (String::from("temp_low"), temp_low), - (String::from("temp_high"), temp_high), - (String::from("wind_speed"), wind_speed), - ]); - Value::Struct(ordered_struct) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] -pub enum ValueOptions { - Over, - // Par is what was forecasted for this value - Par, - Under, -} - -impl std::fmt::Display for ValueOptions { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Over => write!(f, "over"), - Self::Par => write!(f, "par"), - Self::Under => write!(f, "under"), - } - } -} - -impl TryFrom<&str> for ValueOptions { - type Error = anyhow::Error; - - fn try_from(s: &str) -> Result { - match s { - "over" => Ok(ValueOptions::Over), - "par" => Ok(ValueOptions::Par), - "under" => Ok(ValueOptions::Under), - val => Err(anyhow!("invalid option: {}", val)), - } - } -} - -impl TryFrom for ValueOptions { - type Error = anyhow::Error; - - fn try_from(s: String) -> Result { - match s.as_str() { - "over" => Ok(ValueOptions::Over), - "par" => Ok(ValueOptions::Par), - "under" => Ok(ValueOptions::Under), - val => Err(anyhow!("invalid option: {}", val)), - } - } -} diff --git a/oracle/src/db/event_db_migrations.rs b/oracle/src/db/event_db_migrations.rs index 4df7aae..494c4b2 100644 --- a/oracle/src/db/event_db_migrations.rs +++ b/oracle/src/db/event_db_migrations.rs @@ -59,6 +59,7 @@ pub fn create_initial_schema(conn: &mut Connection) -> Result<(), duckdb::Error> nonce BLOB NOT NULL, event_annoucement BLOB NOT NULL, locations TEXT[] NOT NULL, + coordinator_pubkey TEXT, attestation_signature BLOB ); diff --git a/oracle/src/db/mod.rs b/oracle/src/db/mod.rs index 65bb4b6..b0fdac5 100644 --- a/oracle/src/db/mod.rs +++ b/oracle/src/db/mod.rs @@ -1,7 +1,1628 @@ +use anyhow::anyhow; +use dlctix::bitcoin::{hashes::sha256, XOnlyPublicKey}; +use dlctix::musig2::secp256k1::schnorr::Signature; +use dlctix::musig2::secp256k1::{Message, PublicKey}; +use dlctix::secp::{MaybeScalar, Scalar}; +use dlctix::EventAnnouncement; +use duckdb::arrow::datatypes::ToByteSlice; +use duckdb::types::{OrderedMap, ToSqlOutput, Type, Value}; +use duckdb::{ffi, ErrorCode, Row, ToSql}; +use itertools::Itertools; +use log::{debug, info}; +use serde::{Deserialize, Serialize}; +use time::format_description::well_known::Rfc3339; +use time::macros::format_description; +use time::{Date, Duration, OffsetDateTime, UtcOffset}; +use utoipa::{IntoParams, ToSchema}; +use uuid::Uuid; + pub mod event_data; pub mod event_db_migrations; pub mod weather_data; - pub use event_data::*; pub use event_db_migrations::*; pub use weather_data::{Forecast, Observation, Station, WeatherData}; + +#[derive(Debug, Clone, 
Serialize, Deserialize, ToSchema)] +pub struct CreateEvent { + /// Client needs to provide a valid Uuidv7 + pub id: Uuid, + #[serde(with = "time::serde::rfc3339")] + /// Time at which the attestation will be added to the event, needs to be after the observation date + pub signing_date: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + /// Date of when the weather observations occured (midnight UTC), all entries must be made before this time + pub observation_date: OffsetDateTime, + /// NOAA observation stations used in this event + pub locations: Vec, + /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) + pub number_of_values_per_entry: usize, + /// Total number of allowed entries into the event + pub total_allowed_entries: usize, + /// Total amount of places that are part of the winnings split + pub number_of_places_win: usize, + /// Add a coordinator that will use the event entries in a competition + pub coordinator: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct CreateEventMessage { + /// Client needs to provide a valid Uuidv7 + pub id: Uuid, + #[serde(with = "time::serde::rfc3339")] + /// Time at which the attestation will be added to the event, needs to be after the observation date + pub signing_date: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + /// Date of when the weather observations occured (midnight UTC), all entries must be made before this time + pub observation_date: OffsetDateTime, + /// NOAA observation stations used in this event + pub locations: Vec, + /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) + pub number_of_values_per_entry: usize, + /// Total number of allowed entries into the event + pub total_allowed_entries: usize, + /// Total amount of places that are part of the winnings split + pub number_of_places_win: usize, +} + +impl CreateEventMessage { + pub fn message(&self) -> Result { + let message_str = serde_json::to_string(self)?; + let message = Message::from_hashed_data::(message_str.as_bytes()); + Ok(message) + } +} + +impl From for CreateEventMessage { + fn from(value: CreateEvent) -> Self { + Self { + id: value.id, + signing_date: value.signing_date, + observation_date: value.observation_date, + locations: value.locations, + number_of_values_per_entry: value.number_of_values_per_entry, + total_allowed_entries: value.total_allowed_entries, + number_of_places_win: value.number_of_places_win, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct CoordinatorInfo { + /// The pubkey of the coordinator + pub pubkey: String, + /// The values of the payload signed by the coordinator + pub signature: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateEventData { + /// Provide UUIDv7 to use for looking up the event + pub id: Uuid, + #[serde(with = "time::serde::rfc3339")] + /// Time at which the attestation will be added to the event + pub signing_date: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + /// Date of when the weather observations occured (midnight UTC), all entries must be made before this time + pub observation_date: OffsetDateTime, + // NOAA observation stations used in this event + pub locations: Vec, + /// The number of values that can be selected per entry in the event (default to number_of_locations * 3, (temp_low, temp_high, wind_speed)) + 
pub number_of_values_per_entry: i64, + pub total_allowed_entries: i64, + pub number_of_places_win: i64, + /// Used to sign the result of the event being watched + pub nonce: Scalar, + /// Used in constructing the dlctix transactions + pub event_annoucement: EventAnnouncement, + /// The pubkey of the coordinator + pub coordinator_pubkey: Option, +} + +impl CreateEventData { + pub fn new(oracle_pubkey: PublicKey, event: CreateEvent) -> Result { + if event.id.get_version_num() != 7 { + return Err(anyhow!( + "Client needs to provide a valid Uuidv7 for event id {}", + event.id + )); + } + if event.observation_date > event.signing_date { + return Err(anyhow::anyhow!( + "Signing date {} needs to be after observation date {}", + event.signing_date.format(&Rfc3339).unwrap(), + event.observation_date.format(&Rfc3339).unwrap() + )); + } + + // number_of_values_per_entry * 2 == max value, create array from max value to 0 + // determine all possible messages that we might sign + let max_number_of_points_per_value_in_entry = 2; + let possible_scores: Vec = (0..=(event.number_of_values_per_entry + * max_number_of_points_per_value_in_entry)) + .map(|val| val as i64) + .collect(); + + // allows us to have comps where say the top 3 scores split the pot + let possible_outcome_rankings: Vec> = possible_scores + .iter() + .combinations(event.number_of_places_win) + //Sort possible combinations in desc order + .map(|mut combos| { + combos.sort_by_key(|n| i64::MAX - *n); + combos + }) + .filter(|combination| { + // Check if the combination is sorted in descending order, if not filter out of possible outcomes + combination.windows(2).all(|window| window[0] >= window[1]) + }) + .map(|combination| combination.into_iter().cloned().collect()) + .collect(); + info!("outcomes: {:?}", possible_outcome_rankings); + // holds all possible scoring results of the event + let outcome_messages: Vec> = possible_outcome_rankings + .into_iter() + .map(|inner_vec| { + inner_vec + .into_iter() + .flat_map(|num| num.to_be_bytes()) + .collect::>() + }) + .collect(); + + let mut rng = rand::thread_rng(); + let nonce = Scalar::random(&mut rng); + let nonce_point = nonce.base_point_mul(); + // Manually set expiry to 7 days after the signature should have been proveded so users can get their funds back + let expiry = event + .signing_date + .saturating_add(Duration::DAY * 7) + .unix_timestamp() as u32; + + // The actual accounement the oracle is going to attest the outcome + let event_annoucement = EventAnnouncement { + oracle_pubkey: oracle_pubkey.into(), + nonce_point, + outcome_messages, + expiry: Some(expiry), + }; + + Ok(Self { + id: event.id, + observation_date: event.observation_date, + signing_date: event.signing_date, + nonce, + total_allowed_entries: event.total_allowed_entries as i64, + number_of_places_win: event.number_of_places_win as i64, + number_of_values_per_entry: event.number_of_values_per_entry as i64, + locations: event.clone().locations, + event_annoucement, + coordinator_pubkey: event + .coordinator + .map(|v| Some(v.pubkey)) + .unwrap_or_default(), + }) + } +} + +/// Validates the received messages was created by the provided pubkey +pub fn validate(message: Message, pubkey_str: &str, signature: &str) -> Result<(), anyhow::Error> { + info!("pubkey: {} signature: {}", pubkey_str, signature); + let raw_signature: Vec = hex::decode(signature).unwrap(); + let sig: Signature = Signature::from_slice(raw_signature.as_slice()) + .map_err(|e| anyhow!("invalid signature: {}", e))?; + let raw_pubkey: Vec = 
+/// Validates that the received message was created by the provided pubkey
+pub fn validate(message: Message, pubkey_str: &str, signature: &str) -> Result<(), anyhow::Error> {
+    info!("pubkey: {} signature: {}", pubkey_str, signature);
+    let raw_signature: Vec<u8> = hex::decode(signature).unwrap();
+    let sig: Signature = Signature::from_slice(raw_signature.as_slice())
+        .map_err(|e| anyhow!("invalid signature: {}", e))?;
+    let raw_pubkey: Vec<u8> = hex::decode(pubkey_str).unwrap();
+    let pubkey: XOnlyPublicKey = XOnlyPublicKey::from_slice(raw_pubkey.as_slice())
+        .map_err(|e| anyhow!("invalid pubkey: {}", e))?;
+    sig.verify(&message, &pubkey).map_err(|e| {
+        anyhow!(
+            "invalid signature {} for pubkey {} {}",
+            signature,
+            pubkey,
+            e
+        )
+    })?;
+    Ok(())
+}
+
+impl From<CreateEventData> for Event {
+    fn from(value: CreateEventData) -> Self {
+        Self {
+            id: value.id,
+            signing_date: value.signing_date,
+            observation_date: value.observation_date,
+            locations: value.locations,
+            total_allowed_entries: value.total_allowed_entries,
+            number_of_places_win: value.number_of_places_win,
+            number_of_values_per_entry: value.number_of_values_per_entry,
+            event_annoucement: value.event_annoucement,
+            nonce: value.nonce,
+            status: EventStatus::default(),
+            entry_ids: vec![],
+            entries: vec![],
+            weather: vec![],
+            attestation: None,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, IntoParams)]
+pub struct EventFilter {
+    // TODO: add more options, proper pagination and search
+    pub limit: Option<usize>,
+    pub event_ids: Option<Vec<Uuid>>,
+}
+
+impl Default for EventFilter {
+    fn default() -> Self {
+        Self {
+            limit: Some(100_usize),
+            event_ids: None,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
+pub struct SignEvent {
+    pub id: Uuid,
+    #[serde(with = "time::serde::rfc3339")]
+    pub signing_date: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub observation_date: OffsetDateTime,
+    pub status: EventStatus,
+    pub nonce: Scalar,
+    pub event_annoucement: EventAnnouncement,
+    pub number_of_places_win: i64,
+    pub number_of_values_per_entry: i64,
+    pub attestation: Option<MaybeScalar>,
+}
+
+impl SignEvent {
+    pub fn update_status(&mut self) {
+        self.status = get_status(self.observation_date, self.attestation)
+    }
+}
+
+impl<'a> TryFrom<&Row<'a>> for SignEvent {
+    type Error = duckdb::Error;
+
+    fn try_from(row: &Row) -> Result<Self, Self::Error> {
+        // raw date format 2024-08-11 00:27:39.013046-04
+        let sql_time_format = format_description!(
+            "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]"
+        );
+        let mut sign_events = SignEvent {
+            id: row
+                .get::<usize, String>(0)
+                .map(|val| Uuid::parse_str(&val))?
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?,
+            signing_date: row
+                .get::<usize, String>(1)
+                .map(|val| OffsetDateTime::parse(&val, &sql_time_format))?
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?,
+            observation_date: row
+                .get::<usize, String>(2)
+                .map(|val| OffsetDateTime::parse(&val, &sql_time_format))?
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?,
+            status: EventStatus::default(),
+            number_of_places_win: row.get::<usize, i64>(3)?,
+            number_of_values_per_entry: row.get::<usize, i64>(4)?,
+            attestation: row
+                .get::<usize, Value>(5)
+                .map(|v| {
+                    let blob_attestation = match v {
+                        Value::Blob(raw) => raw,
+                        _ => vec![],
+                    };
+                    if !blob_attestation.is_empty() {
+                        // TODO: handle the conversion more gracefully than unwrap
+                        Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap())
+                    } else {
+                        None
+                    }
+                })
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(5, Type::Any, Box::new(e)))?,
+            nonce: row
+                .get::<usize, Value>(6)
+                .map(|raw| {
+                    let blob = match raw {
+                        Value::Blob(val) => val,
+                        _ => vec![],
+                    };
+                    serde_json::from_slice(&blob)
+                })?
+ .map_err(|e| duckdb::Error::FromSqlConversionFailure(6, Type::Any, Box::new(e)))?, + event_annoucement: row + .get::(7) + .map(|raw| { + let blob = match raw { + Value::Blob(val) => val, + _ => vec![], + }; + serde_json::from_slice(&blob) + })? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(7, Type::Any, Box::new(e)))?, + }; + sign_events.update_status(); + Ok(sign_events) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct ActiveEvent { + pub id: Uuid, + pub locations: Vec, + #[serde(with = "time::serde::rfc3339")] + pub signing_date: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub observation_date: OffsetDateTime, + pub status: EventStatus, + pub total_allowed_entries: i64, + pub total_entries: i64, + pub number_of_values_per_entry: i64, + pub number_of_places_win: i64, + pub attestation: Option, +} + +impl ActiveEvent { + pub fn update_status(&mut self) { + self.status = get_status(self.observation_date, self.attestation) + } +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub enum EventStatus { + /// Observation date has not passed yet and entries can be added + #[default] + Live, + /// Currently in the Observation date, entries cannot be added + Running, + /// Event Observation window has finished, not yet signed + Completed, + /// Event has completed and been signed by the oracle + Signed, +} + +impl std::fmt::Display for EventStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Live => write!(f, "live"), + Self::Running => write!(f, "running"), + Self::Completed => write!(f, "completed"), + Self::Signed => write!(f, "signed"), + } + } +} + +impl TryFrom<&str> for EventStatus { + type Error = anyhow::Error; + + fn try_from(s: &str) -> Result { + match s { + "live" => Ok(EventStatus::Live), + "running" => Ok(EventStatus::Running), + "completed" => Ok(EventStatus::Completed), + "signed" => Ok(EventStatus::Signed), + val => Err(anyhow!("invalid status: {}", val)), + } + } +} + +impl TryFrom for EventStatus { + type Error = anyhow::Error; + + fn try_from(s: String) -> Result { + match s.as_str() { + "live" => Ok(EventStatus::Live), + "running" => Ok(EventStatus::Running), + "completed" => Ok(EventStatus::Completed), + "signed" => Ok(EventStatus::Signed), + val => Err(anyhow!("invalid status: {}", val)), + } + } +} + +impl<'a> TryFrom<&Row<'a>> for ActiveEvent { + type Error = duckdb::Error; + + fn try_from(row: &Row) -> Result { + //raw date format 2024-08-11 00:27:39.013046-04 + let sql_time_format = format_description!( + "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" + ); + let mut active_events = ActiveEvent { + id: row + .get::(0) + .map(|val| Uuid::parse_str(&val))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, + signing_date: row + .get::(1) + .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, + observation_date: row + .get::(2) + .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? 
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?,
+            locations: row
+                .get::<usize, Value>(3)
+                .map(|locations| {
+                    let list_locations = match locations {
+                        Value::List(list) => list,
+                        _ => vec![],
+                    };
+                    let mut locations_conv = vec![];
+                    for value in list_locations.iter() {
+                        if let Value::Text(location) = value {
+                            locations_conv.push(location.clone())
+                        }
+                    }
+                    locations_conv
+                })
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?,
+            total_allowed_entries: row.get::<usize, i64>(4)?,
+            status: EventStatus::default(),
+            total_entries: row.get::<usize, i64>(5)?,
+            number_of_places_win: row.get::<usize, i64>(6)?,
+            number_of_values_per_entry: row.get::<usize, i64>(7)?,
+            attestation: row
+                .get::<usize, Value>(8)
+                .map(|v| {
+                    let blob_attestation = match v {
+                        Value::Blob(raw) => raw,
+                        _ => vec![],
+                    };
+                    if !blob_attestation.is_empty() {
+                        // TODO: handle the conversion more gracefully than unwrap
+                        Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap())
+                    } else {
+                        None
+                    }
+                })
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?,
+        };
+        active_events.update_status();
+        Ok(active_events)
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
+pub struct EventSummary {
+    pub id: Uuid,
+    #[serde(with = "time::serde::rfc3339")]
+    /// Time at which the attestation will be added to the event
+    pub signing_date: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    /// Date of when the weather observations occurred
+    pub observation_date: OffsetDateTime,
+    /// NOAA observation stations used in this event
+    pub locations: Vec<String>,
+    /// The number of values that can be selected per entry in the event (defaults to number_of_locations * 3, (temp_low, temp_high, wind_speed))
+    pub number_of_values_per_entry: i64,
+    /// Current status of the event, where in the lifecycle are we (LIVE, RUNNING, COMPLETED, SIGNED, defaults to LIVE)
+    pub status: EventStatus,
+    /// Knowing the total number of entries, how many can place
+    /// The dlctix coordinator can determine how many transactions to create
+    pub total_allowed_entries: i64,
+    /// Needs to all be generated at the start
+    pub total_entries: i64,
+    pub number_of_places_win: i64,
+    /// The forecasted and observed values for each station on the event date
+    pub weather: Vec<Weather>,
+    /// When added it means the oracle has signed that the current data is the final result
+    pub attestation: Option<MaybeScalar>,
+    /// Used to sign the result of the event being watched
+    pub nonce: Scalar,
+}
+
+impl EventSummary {
+    pub fn update_status(&mut self) {
+        self.status = get_status(self.observation_date, self.attestation)
+    }
+}
+
+pub fn get_status(
+    observation_date: OffsetDateTime,
+    attestation: Option<MaybeScalar>,
+) -> EventStatus {
+    // always have the events run for a single day for now
+    if observation_date < OffsetDateTime::now_utc()
+        && observation_date.saturating_add(Duration::days(1)) > OffsetDateTime::now_utc()
+        && attestation.is_none()
+    {
+        return EventStatus::Running;
+    }
+
+    if observation_date < OffsetDateTime::now_utc()
+        && observation_date.saturating_add(Duration::days(1)) < OffsetDateTime::now_utc()
+        && attestation.is_none()
+    {
+        return EventStatus::Completed;
+    }
+
+    if attestation.is_some() {
+        return EventStatus::Signed;
+    }
+    // default to live
+    EventStatus::Live
+}
+
+impl<'a> TryFrom<&Row<'a>> for EventSummary {
+    type Error = duckdb::Error;
+
+    fn try_from(row: &Row) -> Result<Self, Self::Error> {
+        // raw date format 2024-08-11 00:27:39.013046-04
+        let sql_time_format = format_description!(
+
"[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]" + ); + let mut event_summary = EventSummary { + id: row + .get::(0) + .map(|val| Uuid::parse_str(&val))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, + signing_date: row + .get::(1) + .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, + observation_date: row + .get::(2) + .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, + status: EventStatus::default(), + locations: row + .get::(3) + .map(|locations| { + let list_locations = match locations { + Value::List(list) => list, + _ => vec![], + }; + let mut locations_conv = vec![]; + for value in list_locations.iter() { + if let Value::Text(location) = value { + locations_conv.push(location.clone()) + } + } + locations_conv + }) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, + total_allowed_entries: row.get::(4)?, + total_entries: row.get::(5)?, + number_of_places_win: row.get::(6)?, + number_of_values_per_entry: row.get::(7)?, + attestation: row + .get::(8) + .map(|v| { + let blob_attestation = match v { + Value::Blob(raw) => raw, + _ => vec![], + }; + if !blob_attestation.is_empty() { + //TODO: handle the conversion more gracefully than unwrap + Some(MaybeScalar::from_slice(blob_attestation.to_byte_slice()).unwrap()) + } else { + None + } + }) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?, + nonce: row + .get::(9) + .map(|raw| { + let blob = match raw { + Value::Blob(val) => val, + _ => vec![], + }; + serde_json::from_slice(&blob) + })? 
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(9, Type::Any, Box::new(e)))?,
+            weather: vec![],
+        };
+        event_summary.update_status();
+        Ok(event_summary)
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
+pub struct Event {
+    pub id: Uuid,
+    #[serde(with = "time::serde::rfc3339")]
+    /// Time at which the attestation will be added to the event
+    pub signing_date: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    /// Date of when the weather observations occurred
+    pub observation_date: OffsetDateTime,
+    /// NOAA observation stations used in this event
+    pub locations: Vec<String>,
+    /// The number of values that can be selected per entry in the event (defaults to number_of_locations * 3, (temp_low, temp_high, wind_speed))
+    pub number_of_values_per_entry: i64,
+    /// Current status of the event, where in the lifecycle are we (LIVE, RUNNING, COMPLETED, SIGNED)
+    pub status: EventStatus,
+    /// Knowing the total number of entries, how many can place
+    /// The dlctix coordinator can determine how many transactions to create
+    pub total_allowed_entries: i64,
+    /// Needs to all be generated at the start
+    pub entry_ids: Vec<Uuid>,
+    pub number_of_places_win: i64,
+    /// All entries into this event, won't be returned until the date of observation begins and will be ranked by score
+    pub entries: Vec<WeatherEntry>,
+    /// The forecasted and observed values for each station on the event date
+    pub weather: Vec<Weather>,
+    /// Nonce the oracle committed to use as part of signing final results
+    pub nonce: Scalar,
+    /// Holds the predefined outcomes the oracle will attest to at event completion
+    pub event_annoucement: EventAnnouncement,
+    /// When added it means the oracle has signed that the current data is the final result
+    pub attestation: Option<MaybeScalar>,
+}
+
+impl Event {
+    pub fn update_status(&mut self) {
+        self.status = get_status(self.observation_date, self.attestation)
+    }
+}
+
+impl<'a> TryFrom<&Row<'a>> for Event {
+    type Error = duckdb::Error;
+
+    fn try_from(row: &Row) -> Result<Self, Self::Error> {
+        // raw date format 2024-08-11 00:27:39.013046-04
+        let sql_time_format = format_description!(
+            "[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]"
+        );
+        let mut oracle_event_data = Event {
+            id: row
+                .get::<usize, String>(0)
+                .map(|val| {
+                    debug!("{}", val.to_string());
+                    Uuid::parse_str(&val)
+                })?
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?,
+            signing_date: row
+                .get::<usize, String>(1)
+                .map(|val| {
+                    debug!("{}", val.to_string());
+                    OffsetDateTime::parse(&val, &sql_time_format)
+                })?
+                .map(|val| {
+                    debug!("{}", val.to_string());
+                    val.to_offset(UtcOffset::UTC)
+                })
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?,
+            observation_date: row
+                .get::<usize, String>(2)
+                .map(|val| OffsetDateTime::parse(&val, &sql_time_format))?
+                .map(|val| val.to_offset(UtcOffset::UTC))
+                .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?,
+            event_annoucement: row
+                .get::<usize, Value>(3)
+                .map(|raw| {
+                    let blob = match raw {
+                        Value::Blob(val) => val,
+                        _ => vec![],
+                    };
+                    serde_json::from_slice(&blob)
+                })?
+ .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, + locations: row + .get::(4) + .map(|locations| { + let list_locations = match locations { + Value::List(list) => list, + _ => vec![], + }; + let mut locations_conv = vec![]; + for value in list_locations.iter() { + if let Value::Text(location) = value { + locations_conv.push(location.clone()) + } + } + info!("locations: {:?}", locations_conv); + locations_conv + }) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(4, Type::Any, Box::new(e)))?, + total_allowed_entries: row.get::(5)?, + number_of_places_win: row.get::(6)?, + number_of_values_per_entry: row.get::(7)?, + attestation: row + .get::(8) + .map(|v| { + info!("val: {:?}", v); + let blob_attestation = match v { + Value::Blob(raw) => raw, + _ => vec![], + }; + if !blob_attestation.is_empty() { + //TODO: handle the conversion more gracefully than unwrap + let converted: MaybeScalar = + serde_json::from_slice(&blob_attestation).unwrap(); + Some(converted) + } else { + None + } + }) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(8, Type::Any, Box::new(e)))?, + nonce: row + .get::(9) + .map(|raw| { + let blob = match raw { + Value::Blob(val) => val, + _ => vec![], + }; + serde_json::from_slice(&blob) + })? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(9, Type::Any, Box::new(e)))?, + status: EventStatus::default(), + //These nested values have to be made by more quries + entry_ids: vec![], + entries: vec![], + weather: vec![], + }; + oracle_event_data.update_status(); + Ok(oracle_event_data) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct Weather { + pub station_id: String, + pub observed: Option, + pub forecasted: Forecasted, +} + +impl<'a> TryFrom<&Row<'a>> for Weather { + type Error = duckdb::Error; + + fn try_from(row: &Row) -> Result { + let observed: Option = row + .get::(1) + .map(|raw_observed| match raw_observed.clone() { + Value::Struct(observed) => Some(observed.try_into().map_err(|e: anyhow::Error| { + duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(format!( + "error formatting observed: {:?} {}", + raw_observed, e + )), + ) + })), + _ => None, + }) + .and_then(|option_inner_result| match option_inner_result { + Some(inner_result) => inner_result.map(Some), + None => Ok(None), + })?; + + let forecasted: Forecasted = + row.get::(2) + .map(|raw_forecasted| match raw_forecasted.clone() { + Value::Struct(forecasted) => { + forecasted.try_into().map_err(|e: anyhow::Error| { + duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(format!( + "error formatting forecast: {:?} {}", + raw_forecasted, e + )), + ) + }) + } + _ => Err(duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + None, + )), + })??; + Ok(Weather { + station_id: row.get::(0)?, + forecasted, + observed, + }) + } +} + +impl TryFrom<&Forecast> for Forecasted { + type Error = weather_data::Error; + fn try_from(value: &Forecast) -> Result { + let format = format_description!("[year]-[month]-[day]"); + let date = Date::parse(&value.date, format)?; + let datetime = date.with_hms(0, 0, 0).unwrap(); + let datetime_off = datetime.assume_offset(UtcOffset::from_hms(0, 0, 0).unwrap()); + Ok(Self { + date: datetime_off, + temp_low: value.temp_low, + temp_high: value.temp_high, + wind_speed: value.wind_speed, + }) + } +} + +impl TryInto for &OrderedMap { + type 
Error = duckdb::Error; + + fn try_into(self) -> Result { + let values: Vec<&Value> = self.values().collect(); + + let station_id = values + .first() + .ok_or_else(|| { + duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(String::from("unable to convert station_id")), + ) + }) + .and_then(|raw_station| match raw_station { + Value::Text(station) => Ok(station.clone()), + _ => Err(duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(format!( + "error converting station id into string: {:?}", + raw_station + )), + )), + })?; + let observed: Option = if let Some(Value::Struct(observed)) = values.get(1) { + let observed_converted = observed.try_into().map_err(|e| { + duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(format!("error converting observed: {}", e)), + ) + })?; + Some(observed_converted) + } else { + None + }; + let forecasted = values + .get(2) + .ok_or_else(|| anyhow!("forecasted not found in the map")) + .and_then(|raw_forecasted| match raw_forecasted { + Value::Struct(forecasted) => forecasted.try_into(), + _ => Err(anyhow!( + "error converting forecasted into struct: {:?}", + raw_forecasted + )), + }) + .map_err(|e| { + duckdb::Error::DuckDBFailure( + ffi::Error { + code: ErrorCode::TypeMismatch, + extended_code: 0, + }, + Some(e.to_string()), + ) + })?; + Ok(Weather { + station_id, + observed, + forecasted, + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct Observed { + #[serde(with = "time::serde::rfc3339")] + pub date: OffsetDateTime, + pub temp_low: i64, + pub temp_high: i64, + pub wind_speed: i64, +} + +impl TryFrom<&Observation> for Observed { + type Error = weather_data::Error; + fn try_from(value: &Observation) -> Result { + Ok(Self { + date: OffsetDateTime::parse(&value.start_time, &Rfc3339)?, + temp_low: value.temp_low.round() as i64, + temp_high: value.temp_high.round() as i64, + wind_speed: value.wind_speed, + }) + } +} + +impl TryInto for &OrderedMap { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + debug!("raw observed: {:?}", self); + let values: Vec<&Value> = self.values().collect(); + let date = values + .first() + .ok_or_else(|| anyhow!("date not found in the map")) + .and_then(|raw_date| match raw_date { + Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), + v => Err(anyhow!( + "error converting observed date into OffsetDatetime: {:?}, {:?}", + raw_date, + v + )), + }) + .and_then(|timestamp| { + OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( + |e| { + anyhow!( + "error parsing observed date into offsetdatetime: {} {}", + timestamp, + e + ) + }, + ) + }) + .map(|val| val.to_offset(UtcOffset::UTC))?; + + let temp_low = values + .get(1) + .ok_or_else(|| anyhow!("temp_low not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let temp_high = values + .get(2) + .ok_or_else(|| anyhow!("temp_high not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let wind_speed = values + .get(3) + .ok_or_else(|| anyhow!("wind_speed not found in the map")) + .and_then(|raw_speed| match raw_speed { + Value::Int(speed) 
=> Ok(*speed as i64), + _ => Err(anyhow!( + "error converting wind_speed into int: {:?}", + raw_speed + )), + })?; + + Ok(Observed { + date, + temp_low, + temp_high, + wind_speed, + }) + } +} + +impl TryInto for OrderedMap { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + debug!("raw observed: {:?}", self); + let values: Vec<&Value> = self.values().collect(); + let date = values + .first() + .ok_or_else(|| anyhow!("date not found in the map")) + .and_then(|raw_date| match raw_date { + Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), + v => Err(anyhow!( + "error converting observed date into OffsetDatetime: {:?}, {:?}", + raw_date, + v + )), + }) + .and_then(|timestamp| { + OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( + |e| { + anyhow!( + "error parsing observed date into offsetdatetime: {} {}", + timestamp, + e + ) + }, + ) + }) + .map(|val| val.to_offset(UtcOffset::UTC))?; + + let temp_low = values + .get(1) + .ok_or_else(|| anyhow!("temp_low not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let temp_high = values + .get(2) + .ok_or_else(|| anyhow!("temp_high not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let wind_speed = values + .get(3) + .ok_or_else(|| anyhow!("wind_speed not found in the map")) + .and_then(|raw_speed| match raw_speed { + Value::Int(speed) => Ok(*speed as i64), + _ => Err(anyhow!( + "error converting wind_speed into int: {:?}", + raw_speed + )), + })?; + + Ok(Observed { + date, + temp_low, + temp_high, + wind_speed, + }) + } +} + +impl ToSql for Observed { + fn to_sql(&self) -> duckdb::Result> { + let ordered_struct: OrderedMap = OrderedMap::from(vec![ + ( + String::from("date"), + Value::Text(self.date.format(&Rfc3339).unwrap()), + ), + (String::from("temp_low"), Value::Int(self.temp_low as i32)), + (String::from("temp_high"), Value::Int(self.temp_high as i32)), + ( + String::from("wind_speed"), + Value::Int(self.wind_speed as i32), + ), + ]); + Ok(ToSqlOutput::Owned(Value::Struct(ordered_struct))) + } +} + +impl ToRawSql for Observed { + fn to_raw_sql(&self) -> String { + // Done because the rust library doesn't natively support writing structs to the db just yet, + // Eventually we should be able to delete this code + // example of how to write a struct to duckdb: `INSERT INTO t1 VALUES (ROW('a', 42));` + let mut vals = String::new(); + vals.push_str("ROW('"); + let data_str = self.date.format(&Rfc3339).unwrap(); + vals.push_str(&data_str); + vals.push_str(r#"',"#); + vals.push_str(&format!("{}", self.temp_low)); + vals.push(','); + vals.push_str(&format!("{}", self.temp_high)); + vals.push(','); + vals.push_str(&format!("{}", self.wind_speed)); + vals.push(')'); + vals + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct Forecasted { + #[serde(with = "time::serde::rfc3339")] + pub date: OffsetDateTime, + pub temp_low: i64, + pub temp_high: i64, + pub wind_speed: i64, +} + +impl TryInto for &OrderedMap { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + let values: Vec<&Value> = self.values().collect(); + let date = values + .first() + .ok_or_else(|| anyhow!("date not found in the map")) + .and_then(|raw_date| match raw_date { + 
Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), + _ => Err(anyhow!( + "error converting date into OffsetDatetime: {:?}", + raw_date + )), + }) + .and_then(|timestamp| { + OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( + |e| { + anyhow!( + "error parsing forecast date into offsetdatetime: {} {}", + timestamp, + e + ) + }, + ) + }) + .map(|val| val.to_offset(UtcOffset::UTC))?; + + let temp_low = values + .get(1) + .ok_or_else(|| anyhow!("temp_low not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let temp_high = values + .get(2) + .ok_or_else(|| anyhow!("temp_high not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let wind_speed = values + .get(3) + .ok_or_else(|| anyhow!("wind_speed not found in the map")) + .and_then(|raw_speed| match raw_speed { + Value::Int(speed) => Ok(*speed as i64), + _ => Err(anyhow!( + "error converting wind_speed into int: {:?}", + raw_speed + )), + })?; + + Ok(Forecasted { + date, + temp_low, + temp_high, + wind_speed, + }) + } +} + +impl TryInto for OrderedMap { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + let values: Vec<&Value> = self.values().collect(); + let date = values + .first() + .ok_or_else(|| anyhow!("date not found in the map")) + .and_then(|raw_date| match raw_date { + Value::Timestamp(duckdb::types::TimeUnit::Microsecond, raw_date) => Ok(raw_date), + _ => Err(anyhow!( + "error converting date into OffsetDatetime: {:?}", + raw_date + )), + }) + .and_then(|timestamp| { + OffsetDateTime::from_unix_timestamp_nanos((*timestamp as i128) * 1000_i128).map_err( + |e| { + anyhow!( + "error parsing forecast date into offsetdatetime: {} {}", + timestamp, + e + ) + }, + ) + }) + .map(|val| val.to_offset(UtcOffset::UTC))?; + + let temp_low = values + .get(1) + .ok_or_else(|| anyhow!("temp_low not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let temp_high = values + .get(2) + .ok_or_else(|| anyhow!("temp_high not found in the map")) + .and_then(|raw_temp| match raw_temp { + Value::Int(temp) => Ok(*temp as i64), + _ => Err(anyhow!("error converting temp into int: {:?}", raw_temp)), + })?; + + let wind_speed = values + .get(3) + .ok_or_else(|| anyhow!("wind_speed not found in the map")) + .and_then(|raw_speed| match raw_speed { + Value::Int(speed) => Ok(*speed as i64), + _ => Err(anyhow!( + "error converting wind_speed into int: {:?}", + raw_speed + )), + })?; + + Ok(Forecasted { + date, + temp_low, + temp_high, + wind_speed, + }) + } +} + +pub trait ToRawSql { + /// Converts Rust value to raw valid DuckDB sql string (if user input make sure to validate before adding to db) + fn to_raw_sql(&self) -> String; +} + +impl ToRawSql for Forecasted { + fn to_raw_sql(&self) -> String { + // Done because the rust library doesn't natively support writing structs to the db just yet, + // Eventually we should be able to delete this code + // example of how to write a struct to duckdb: `INSERT INTO t1 VALUES (ROW('a', 42));` + let mut vals = String::new(); + vals.push_str("ROW('"); + let data_str = self.date.format(&Rfc3339).unwrap(); + vals.push_str(&data_str); + vals.push_str(r#"',"#); 
+ vals.push_str(&format!("{}", self.temp_low)); + vals.push(','); + vals.push_str(&format!("{}", self.temp_high)); + vals.push(','); + vals.push_str(&format!("{}", self.wind_speed)); + vals.push(')'); + vals + } +} + +impl ToSql for Forecasted { + fn to_sql(&self) -> duckdb::Result> { + let ordered_struct: OrderedMap = OrderedMap::from(vec![ + ( + String::from("date"), + Value::Text(self.date.format(&Rfc3339).unwrap()), + ), + (String::from("temp_low"), Value::Int(self.temp_low as i32)), + (String::from("temp_high"), Value::Int(self.temp_high as i32)), + ( + String::from("wind_speed"), + Value::Int(self.wind_speed as i32), + ), + ]); + Ok(ToSqlOutput::Owned(Value::Struct(ordered_struct))) + } +} + +// Once submitted for now don't allow changes +// Decide if we want to add a pubkey for who submitted the entry? +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct AddEventEntry { + /// Client needs to provide a valid Uuidv7 + pub id: Uuid, + pub event_id: Uuid, + pub expected_observations: Vec, + /// Add the coordinator information that pushed this event entry along + pub coordinator: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct AddEventEntryMessage { + /// Client needs to provide a valid Uuidv7 + pub id: Uuid, + pub event_id: Uuid, + pub expected_observations: Vec, +} + +impl AddEventEntryMessage { + pub fn message(&self) -> Result { + let message_str = serde_json::to_string(self)?; + let message = Message::from_hashed_data::(message_str.as_bytes()); + Ok(message) + } +} + +impl From for AddEventEntryMessage { + fn from(value: AddEventEntry) -> Self { + AddEventEntryMessage { + id: value.id, + event_id: value.event_id, + expected_observations: value.expected_observations, + } + } +} + +impl From for WeatherEntry { + fn from(value: AddEventEntry) -> Self { + WeatherEntry { + id: value.id, + event_id: value.event_id, + expected_observations: value.expected_observations, + score: None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct WeatherEntry { + pub id: Uuid, + pub event_id: Uuid, + pub expected_observations: Vec, + /// A score wont appear until the observation_date has begun + pub score: Option, +} + +impl TryInto for &OrderedMap { + type Error = anyhow::Error; + fn try_into(self) -> Result { + debug!("raw weather entry: {:?}", self); + let values: Vec<&Value> = self.values().collect(); + let id = values + .first() + .ok_or_else(|| anyhow!("id not found in the map")) + .and_then(|raw_id| match raw_id { + Value::Text(id) => Ok(id), + _ => Err(anyhow!( + "error converting weather entry id into string: {:?}", + raw_id + )), + }) + .and_then(|id| { + Uuid::parse_str(id) + .map_err(|e| anyhow!("error converting weather entry id into uuid: {}", e)) + })?; + + let event_id = values + .get(1) + .ok_or_else(|| anyhow!("event_id not found in the map")) + .and_then(|raw_id| match raw_id { + Value::Text(id) => Ok(id), + _ => Err(anyhow!( + "error converting weather event id into string: {:?}", + raw_id + )), + }) + .and_then(|id| { + Uuid::parse_str(id) + .map_err(|e| anyhow!("error converting weather event id into uuid: {}", e)) + })?; + + let expected_observations = values + .get(2) + .ok_or_else(|| anyhow!("expect_observations not found in the map")) + .and_then(|raw| match raw { + Value::List(expected_observations) => Ok(expected_observations), + _ => Err(anyhow!( + "error converting expect_observations into struct: {:?}", + raw + )), + }) + .and_then(|weather_choices| { + let mut converted 
= vec![]; + for weather_choice in weather_choices { + let weather_struct_choice = match weather_choice { + Value::Struct(weather_choice_struct) => weather_choice_struct.try_into()?, + _ => { + return Err(anyhow!( + "error converting weather_choice into struct: {:?}", + weather_choice + )) + } + }; + converted.push(weather_struct_choice); + } + Ok(converted) + })?; + + let score = values.get(3).and_then(|raw_id| match raw_id { + Value::Int(id) => Some(*id as i64), + _ => None, + }); + + Ok(WeatherEntry { + id, + event_id, + score, + expected_observations, + }) + } +} + +impl<'a> TryFrom<&Row<'a>> for WeatherEntry { + type Error = duckdb::Error; + + fn try_from(row: &Row) -> Result { + Ok(WeatherEntry { + id: row + .get::(0) + .map(|val| Uuid::parse_str(&val))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, + event_id: row + .get::(1) + .map(|val| Uuid::parse_str(&val))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, + score: row + .get::>(2) + .map(|val| val.filter(|&val| val != 0))?, + expected_observations: vec![], + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WeatherChoicesWithEntry { + pub entry_id: Uuid, + // NOAA weather stations we're using + pub stations: String, + pub temp_high: Option, + pub temp_low: Option, + pub wind_speed: Option, +} + +impl<'a> TryFrom<&Row<'a>> for WeatherChoicesWithEntry { + type Error = duckdb::Error; + fn try_from(row: &Row<'a>) -> Result { + Ok(WeatherChoicesWithEntry { + entry_id: row + .get::(0) + .map(|val| Uuid::parse_str(&val))? + .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, + stations: row + .get::(1) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, + temp_low: row + .get::>(2) + .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, + temp_high: row + .get::>(3) + .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, + wind_speed: row + .get::>(4) + .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(4, Type::Any, Box::new(e)))?, + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub struct WeatherChoices { + // NOAA weather stations we're using + pub stations: String, + pub temp_high: Option, + pub temp_low: Option, + pub wind_speed: Option, +} + +impl From for WeatherChoices { + fn from(value: WeatherChoicesWithEntry) -> Self { + Self { + stations: value.stations, + temp_high: value.temp_high, + temp_low: value.temp_low, + wind_speed: value.wind_speed, + } + } +} + +impl<'a> TryFrom<&Row<'a>> for WeatherChoices { + type Error = duckdb::Error; + + fn try_from(row: &Row) -> Result { + Ok(WeatherChoices { + stations: row + .get::(0) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, Type::Any, Box::new(e)))?, + temp_low: row + .get::>(1) + .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(1, Type::Any, Box::new(e)))?, + temp_high: row + .get::>(2) + .map(|raw| raw.and_then(|inner| ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, + wind_speed: row + .get::>(3) + .map(|raw| raw.and_then(|inner| 
ValueOptions::try_from(inner).ok())) + .map_err(|e| duckdb::Error::FromSqlConversionFailure(3, Type::Any, Box::new(e)))?, + }) + } +} + +impl TryInto for &OrderedMap { + type Error = anyhow::Error; + fn try_into(self) -> Result { + debug!("raw weather choices: {:?}", self); + let values: Vec<&Value> = self.values().collect(); + let stations = values + .first() + .ok_or_else(|| anyhow!("stations not found in the map")) + .and_then(|raw_station| match raw_station { + Value::Text(station) => Ok(station.clone()), + _ => Err(anyhow!( + "error converting station id into string: {:?}", + raw_station + )), + })?; + let temp_low = values.get(1).and_then(|raw_temp| match raw_temp { + Value::Text(temp) => ValueOptions::try_from(temp.clone()).ok(), + _ => None, + }); + let temp_high = values.get(2).and_then(|raw_temp| match raw_temp { + Value::Text(temp) => ValueOptions::try_from(temp.clone()).ok(), + _ => None, + }); + let wind_speed = values + .get(3) + .and_then(|raw_wind_speed| match raw_wind_speed { + Value::Text(wind_speed) => ValueOptions::try_from(wind_speed.clone()).ok(), + _ => None, + }); + Ok(WeatherChoices { + stations, + temp_low, + temp_high, + wind_speed, + }) + } +} + +#[allow(clippy::from_over_into)] +impl Into for &WeatherChoices { + fn into(self) -> Value { + let temp_low = match self.temp_low.clone() { + Some(val) => Value::Text(val.to_string()), + None => Value::Null, + }; + let temp_high = match self.temp_high.clone() { + Some(val) => Value::Text(val.to_string()), + None => Value::Null, + }; + let wind_speed = match self.wind_speed.clone() { + Some(val) => Value::Text(val.to_string()), + None => Value::Null, + }; + let ordered_struct: OrderedMap = OrderedMap::from(vec![ + (String::from("stations"), Value::Text(self.stations.clone())), + (String::from("temp_low"), temp_low), + (String::from("temp_high"), temp_high), + (String::from("wind_speed"), wind_speed), + ]); + Value::Struct(ordered_struct) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)] +pub enum ValueOptions { + Over, + // Par is what was forecasted for this value + Par, + Under, +} + +impl std::fmt::Display for ValueOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Over => write!(f, "over"), + Self::Par => write!(f, "par"), + Self::Under => write!(f, "under"), + } + } +} + +impl TryFrom<&str> for ValueOptions { + type Error = anyhow::Error; + + fn try_from(s: &str) -> Result { + match s { + "over" => Ok(ValueOptions::Over), + "par" => Ok(ValueOptions::Par), + "under" => Ok(ValueOptions::Under), + val => Err(anyhow!("invalid option: {}", val)), + } + } +} + +impl TryFrom for ValueOptions { + type Error = anyhow::Error; + + fn try_from(s: String) -> Result { + match s.as_str() { + "over" => Ok(ValueOptions::Over), + "par" => Ok(ValueOptions::Par), + "under" => Ok(ValueOptions::Under), + val => Err(anyhow!("invalid option: {}", val)), + } + } +} diff --git a/oracle/src/file_access.rs b/oracle/src/file_access.rs index 2394d86..e87c5d4 100644 --- a/oracle/src/file_access.rs +++ b/oracle/src/file_access.rs @@ -7,13 +7,13 @@ use time::{ use tokio::fs; use utoipa::IntoParams; -use crate::{create_folder, subfolder_exists, utc_option_datetime}; +use crate::{create_folder, subfolder_exists}; #[derive(Clone, Deserialize, Serialize, IntoParams)] pub struct FileParams { - #[serde(with = "utc_option_datetime")] + #[serde(with = "time::serde::rfc3339::option")] pub start: Option, - #[serde(with = "utc_option_datetime")] + #[serde(with = 
"time::serde::rfc3339::option")] pub end: Option, pub observations: Option, pub forecasts: Option, diff --git a/oracle/src/lib.rs b/oracle/src/lib.rs index 2e567aa..c6d6678 100644 --- a/oracle/src/lib.rs +++ b/oracle/src/lib.rs @@ -3,7 +3,6 @@ mod db; mod file_access; pub mod oracle; pub mod routes; -mod ser; mod startup; mod utils; @@ -11,6 +10,5 @@ pub use app_error::AppError; pub use db::*; pub use file_access::{drop_suffix, Error, FileAccess, FileData, FileParams}; pub use routes::*; -pub use ser::*; pub use startup::*; pub use utils::*; diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index 6949816..975b656 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,7 +1,8 @@ use crate::{ - weather_data, ActiveEvent, AddEventEntry, CreateEvent, CreateEventData, Event, EventData, - EventFilter, EventStatus, EventSummary, Forecast, ForecastRequest, Observation, - ObservationRequest, SignEvent, ValueOptions, Weather, WeatherData, WeatherEntry, + validate, weather_data, ActiveEvent, AddEventEntry, AddEventEntryMessage, CreateEvent, + CreateEventData, CreateEventMessage, Event, EventData, EventFilter, EventStatus, EventSummary, + Forecast, ForecastRequest, Observation, ObservationRequest, SignEvent, ValueOptions, Weather, + WeatherData, WeatherEntry, }; use anyhow::anyhow; use base64::{engine::general_purpose, Engine}; @@ -30,8 +31,8 @@ use uuid::Uuid; pub enum Error { #[error("{0}")] NotFound(String), - #[error("Failed to get private key: {0}")] - PrivateKey(#[from] anyhow::Error), + #[error("Failed to get key: {0}")] + ValidateKey(#[from] anyhow::Error), #[error("Must have at least one outcome: {0}")] MinOutcome(String), #[error("Event maturity epoch must be in the future: {0}")] @@ -52,6 +53,8 @@ pub enum Error { WeatherData(#[from] weather_data::Error), #[error("Failed to find winning outcome: {0}")] OutcomeNotFound(String), + #[error("Failed to validate message: {0}")] + Validation(#[from] serde_json::Error), } pub struct Oracle { @@ -148,17 +151,17 @@ impl Oracle { } pub async fn create_event(&self, event: CreateEvent) -> Result { - let oracle_event = CreateEventData::new( - self, - event.id, - event.observation_date, - event.signing_date, - event.locations, - event.total_allowed_entries, - event.number_of_places_win, - event.number_of_values_per_entry, - ) - .map_err(Error::BadEvent)?; + if let Some(coordinator) = event.coordinator.clone() { + let message: CreateEventMessage = event.clone().into(); + info!("create event: {:?}", message); + validate( + message.message()?, + &coordinator.pubkey, + &coordinator.signature, + )?; + } + let oracle_event = + CreateEventData::new(self.raw_public_key(), event).map_err(Error::BadEvent)?; self.event_data .add_event(oracle_event) .await @@ -166,13 +169,20 @@ impl Oracle { } pub async fn add_event_entry(&self, entry: AddEventEntry) -> Result { - //TODO: use the builder pattern on WeatherEntry and add the validation there if entry.id.get_version_num() != 7 { return Err(Error::BadEntry(format!( "Client needs to provide a valid Uuidv7 for entry id {}", entry.id ))); } + if let Some(coordinator) = entry.coordinator.clone() { + let messages: AddEventEntryMessage = entry.clone().into(); + validate( + messages.message()?, + &coordinator.pubkey, + &coordinator.signature, + )?; + } let event = match self.event_data.get_event(&entry.event_id).await { Ok(event_data) => Ok(event_data), Err(duckdb::Error::QueryReturnedNoRows) => Err(Error::NotFound(format!( diff --git a/oracle/src/routes/stations/weather_routes.rs 
b/oracle/src/routes/stations/weather_routes.rs index 415add0..b6dd6aa 100644 --- a/oracle/src/routes/stations/weather_routes.rs +++ b/oracle/src/routes/stations/weather_routes.rs @@ -9,7 +9,7 @@ use serde::Serialize; use time::OffsetDateTime; use utoipa::IntoParams; -use crate::{utc_option_datetime, AppError, AppState, FileParams, Forecast, Observation, Station}; +use crate::{AppError, AppState, FileParams, Forecast, Observation, Station}; #[utoipa::path( get, @@ -36,10 +36,10 @@ pub async fn forecasts( #[derive(Clone, Serialize, Deserialize, IntoParams)] pub struct ForecastRequest { - #[serde(with = "utc_option_datetime")] + #[serde(with = "time::serde::rfc3339::option")] #[serde(default)] pub start: Option, - #[serde(with = "utc_option_datetime")] + #[serde(with = "time::serde::rfc3339::option")] #[serde(default)] pub end: Option, pub station_ids: String, @@ -67,10 +67,10 @@ impl From<&ForecastRequest> for FileParams { #[derive(Clone, Serialize, Deserialize, IntoParams)] pub struct ObservationRequest { - #[serde(with = "utc_option_datetime")] + #[serde(with = "time::serde::rfc3339::option")] #[serde(default)] pub start: Option, - #[serde(with = "utc_option_datetime")] + #[serde(with = "time::serde::rfc3339::option")] #[serde(default)] pub end: Option, pub station_ids: String, diff --git a/oracle/src/ser/mod.rs b/oracle/src/ser/mod.rs deleted file mode 100644 index 631def6..0000000 --- a/oracle/src/ser/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod utc_datetime; -pub mod utc_option_datetime; diff --git a/oracle/src/ser/utc_datetime.rs b/oracle/src/ser/utc_datetime.rs deleted file mode 100644 index 0c4203a..0000000 --- a/oracle/src/ser/utc_datetime.rs +++ /dev/null @@ -1,19 +0,0 @@ -use serde::{Deserialize, Deserializer, Serializer}; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; - -pub fn serialize(value: &OffsetDateTime, serializer: S) -> Result -where - S: Serializer, -{ - let s = value.format(&Rfc3339).map_err(serde::ser::Error::custom)?; - serializer.serialize_str(&s) -} - -#[allow(dead_code)] -pub fn deserialize<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - OffsetDateTime::parse(&s, &Rfc3339).map_err(serde::de::Error::custom) -} diff --git a/oracle/src/ser/utc_option_datetime.rs b/oracle/src/ser/utc_option_datetime.rs deleted file mode 100644 index ac73062..0000000 --- a/oracle/src/ser/utc_option_datetime.rs +++ /dev/null @@ -1,31 +0,0 @@ -use serde::{Deserialize, Deserializer, Serializer}; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; - -pub fn serialize(value: &Option, serializer: S) -> Result -where - S: Serializer, -{ - if let Some(datetime) = value { - let s = datetime - .format(&Rfc3339) - .map_err(serde::ser::Error::custom)?; - serializer.serialize_str(&s) - } else { - serializer.serialize_str("null") - } -} - -#[allow(dead_code)] -pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - if s == "null" { - Ok(None) - } else { - OffsetDateTime::parse(&s, &Rfc3339) - .map(Some) - .map_err(serde::de::Error::custom) - } -} diff --git a/oracle/src/startup.rs b/oracle/src/startup.rs index da0a993..3830032 100644 --- a/oracle/src/startup.rs +++ b/oracle/src/startup.rs @@ -15,7 +15,10 @@ use axum::{ routing::{get, post}, Router, }; -use hyper::{Method, header::{ACCEPT, CONTENT_TYPE}}; +use hyper::{ + header::{ACCEPT, CONTENT_TYPE}, + Method, +}; use log::info; use std::sync::Arc; 
use tower_http::{ @@ -56,10 +59,10 @@ pub struct AppState { schemas( routes::files::get_names::Files, oracle::Error, - db::event_data::Event, - db::event_data::WeatherEntry, - db::event_data::AddEventEntry, - db::event_data::CreateEvent, + db::Event, + db::WeatherEntry, + db::AddEventEntry, + db::CreateEvent, routes::events::oracle_routes::Pubkey, routes::events::oracle_routes::Base64Pubkey ) @@ -104,7 +107,7 @@ pub fn app(app_state: AppState) -> Router { let cors = CorsLayer::new() // allow `GET` and `POST` when accessing the resource .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) - .allow_headers([ACCEPT,CONTENT_TYPE]) + .allow_headers([ACCEPT, CONTENT_TYPE]) // allow requests from any origin .allow_origin(Any); Router::new() diff --git a/oracle/tests/api/create_event.rs b/oracle/tests/api/create_event.rs index a76e291..f40e09a 100644 --- a/oracle/tests/api/create_event.rs +++ b/oracle/tests/api/create_event.rs @@ -31,6 +31,7 @@ async fn can_create_oracle_event() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let body_json = to_string(&new_event).unwrap(); let request = Request::builder() @@ -89,6 +90,7 @@ async fn can_create_and_get_oracle_event() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let body_json = to_string(&new_event).unwrap(); let request_post = Request::builder() diff --git a/oracle/tests/api/create_event_entry.rs b/oracle/tests/api/create_event_entry.rs index e8a5e2b..4f3ab55 100644 --- a/oracle/tests/api/create_event_entry.rs +++ b/oracle/tests/api/create_event_entry.rs @@ -29,6 +29,7 @@ async fn can_create_entry_into_event() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let oracle_event = test_app.oracle.create_event(new_event).await.unwrap(); let new_entry = AddEventEntry { @@ -54,6 +55,7 @@ async fn can_create_entry_into_event() { wind_speed: None, }, ], + coordinator: None, }; let body_json = to_string(&new_entry).unwrap(); let request = Request::builder() @@ -94,6 +96,7 @@ async fn can_create_and_get_event_entry() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let oracle_event = test_app.oracle.create_event(new_event).await.unwrap(); let new_entry = AddEventEntry { @@ -119,6 +122,7 @@ async fn can_create_and_get_event_entry() { wind_speed: None, }, ], + coordinator: None, }; let body_json = to_string(&new_entry).unwrap(); let request = Request::builder() diff --git a/oracle/tests/api/etl_workflow.rs b/oracle/tests/api/etl_workflow.rs index 6869f44..8010955 100644 --- a/oracle/tests/api/etl_workflow.rs +++ b/oracle/tests/api/etl_workflow.rs @@ -68,6 +68,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { total_allowed_entries: 4, number_of_places_win: 1, number_of_values_per_entry: 6, + coordinator: None, }; info!("above create event"); let event = test_app.oracle.create_event(new_event_1).await.unwrap(); @@ -94,6 +95,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { wind_speed: Some(oracle::ValueOptions::Par), }, ], + coordinator: None, }; let entry_2 = AddEventEntry { id: Uuid::now_v7(), @@ -118,6 +120,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { wind_speed: None, }, ], + coordinator: None, }; let entry_3 = AddEventEntry { id: Uuid::now_v7(), @@ -142,6 +145,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { wind_speed: Some(oracle::ValueOptions::Under), }, ], + 
coordinator: None, }; let entry_4 = AddEventEntry { id: Uuid::now_v7(), @@ -166,6 +170,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { wind_speed: Some(oracle::ValueOptions::Under), }, ], + coordinator: None, }; test_app .oracle @@ -314,6 +319,7 @@ async fn can_get_event_run_etl_and_see_it_signed_multiple_winners() { total_allowed_entries: 4, number_of_places_win: 2, number_of_values_per_entry: 6, + coordinator: None, }; info!("above create event"); let event = test_app.oracle.create_event(new_event_1).await.unwrap(); @@ -340,6 +346,7 @@ async fn can_get_event_run_etl_and_see_it_signed_multiple_winners() { wind_speed: Some(oracle::ValueOptions::Par), }, ], + coordinator: None, }; let entry_2 = AddEventEntry { id: Uuid::now_v7(), @@ -364,6 +371,7 @@ async fn can_get_event_run_etl_and_see_it_signed_multiple_winners() { wind_speed: None, }, ], + coordinator: None, }; let entry_3 = AddEventEntry { id: Uuid::now_v7(), @@ -388,6 +396,7 @@ async fn can_get_event_run_etl_and_see_it_signed_multiple_winners() { wind_speed: Some(oracle::ValueOptions::Under), }, ], + coordinator: None, }; let entry_4 = AddEventEntry { id: Uuid::now_v7(), @@ -412,6 +421,7 @@ async fn can_get_event_run_etl_and_see_it_signed_multiple_winners() { wind_speed: Some(oracle::ValueOptions::Under), }, ], + coordinator: None, }; test_app .oracle diff --git a/oracle/tests/api/get_events.rs b/oracle/tests/api/get_events.rs index 5142979..98230ea 100644 --- a/oracle/tests/api/get_events.rs +++ b/oracle/tests/api/get_events.rs @@ -30,6 +30,7 @@ async fn can_get_all_events() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let new_event_2 = CreateEvent { id: Uuid::now_v7(), @@ -44,6 +45,7 @@ async fn can_get_all_events() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let new_event_3 = CreateEvent { id: Uuid::now_v7(), @@ -58,6 +60,7 @@ async fn can_get_all_events() { total_allowed_entries: 100, number_of_places_win: 3, number_of_values_per_entry: 6, + coordinator: None, }; let expected = vec![ new_event_1.clone(),