diff --git a/Cargo.lock b/Cargo.lock index 085ecf4..9b6031a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1076,7 +1076,7 @@ dependencies = [ [[package]] name = "daemon" -version = "1.2.0" +version = "1.3.0" dependencies = [ "anyhow", "arrow", @@ -2306,7 +2306,7 @@ dependencies = [ [[package]] name = "oracle" -version = "1.2.0" +version = "1.3.0" dependencies = [ "anyhow", "axum", diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 01fb375..02e169a 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daemon" -version = "1.2.0" +version = "1.3.0" edition = "2021" repository = "https://github.com/tee8z/noaa-data-pipeline" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 64cdef8..5105a9c 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "oracle" -version = "1.2.0" +version = "1.3.0" edition = "2021" repository = "https://github.com/tee8z/noaa-data-pipeline" diff --git a/oracle/src/db/event_data.rs b/oracle/src/db/event_data.rs index 99d4435..165e2eb 100644 --- a/oracle/src/db/event_data.rs +++ b/oracle/src/db/event_data.rs @@ -235,7 +235,7 @@ impl EventData { let observation_date = OffsetDateTime::format(event.observation_date, &Rfc3339) .map_err(|e| duckdb::Error::ToSqlConversionFailure(Box::new(e)))?; let nonce = to_vec(&event.nonce).unwrap(); - let annoucement_bytes = to_vec(&event.event_annoucement).unwrap(); + let announcement_bytes = to_vec(&event.event_announcement).unwrap(); let conn = self.new_write_connection_retry().await?; let mut stmt = conn.prepare( "INSERT INTO events ( @@ -247,7 +247,7 @@ impl EventData { signing_date, observation_date, locations, - event_annoucement, + event_announcement, coordinator_pubkey) VALUES(?,?,?,?,?,?,?,?,?,?)", )?; stmt.execute(params![ @@ -259,7 +259,7 @@ impl EventData { 
signing_date, observation_date, locations_sql, - annoucement_bytes, + announcement_bytes, event.coordinator_pubkey ])?; @@ -679,7 +679,7 @@ impl EventData { "id", "signing_date::TEXT", "observation_date::TEXT", - "event_annoucement", + "event_announcement", "locations", "total_allowed_entries", "number_of_places_win", @@ -764,7 +764,7 @@ impl EventData { "number_of_values_per_entry", "attestation_signature", "nonce", - "event_annoucement", + "event_announcement", )) .from("events") .where_(where_clause); diff --git a/oracle/src/db/event_db_migrations.rs b/oracle/src/db/event_db_migrations.rs index 494c4b2..82d61b9 100644 --- a/oracle/src/db/event_db_migrations.rs +++ b/oracle/src/db/event_db_migrations.rs @@ -57,7 +57,7 @@ pub fn create_initial_schema(conn: &mut Connection) -> Result<(), duckdb::Error> created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), nonce BLOB NOT NULL, - event_annoucement BLOB NOT NULL, + event_announcement BLOB NOT NULL, locations TEXT[] NOT NULL, coordinator_pubkey TEXT, attestation_signature BLOB diff --git a/oracle/src/db/mod.rs b/oracle/src/db/mod.rs index ba5f7ce..bf9e39f 100644 --- a/oracle/src/db/mod.rs +++ b/oracle/src/db/mod.rs @@ -111,7 +111,7 @@ pub struct CreateEventData { /// Used to sign the result of the event being watched pub nonce: Scalar, /// Used in constructing the dlctix transactions - pub event_annoucement: EventAnnouncement, + pub event_announcement: EventAnnouncement, /// The pubkey of the coordinator pub coordinator_pubkey: Option, } @@ -131,8 +131,9 @@ impl CreateEventData { event.observation_date.format(&Rfc3339).unwrap() )); } + let allowed_scored_ranks = 3; let possible_user_outcomes: Vec> = - generate_winner_permutations(event.total_allowed_entries); + generate_ranking_permutations(event.total_allowed_entries, allowed_scored_ranks); info!("user outcomes: {:?}", possible_user_outcomes); let outcome_messages: Vec> = generate_outcome_messages(possible_user_outcomes); @@ 
-140,14 +141,14 @@ impl CreateEventData { let mut rng = rand::thread_rng(); let nonce = Scalar::random(&mut rng); let nonce_point = nonce.base_point_mul(); - // Manually set expiry to 7 days after the signature should have been proveded so users can get their funds back + // Manually set expiry to 7 days after the signature should have been provided so users can get their funds back let expiry = event .signing_date .saturating_add(Duration::DAY * 7) .unix_timestamp() as u32; - // The actual accounement the oracle is going to attest the outcome - let event_annoucement = EventAnnouncement { + // The actual announcement the oracle is going to attest the outcome + let event_announcement = EventAnnouncement { oracle_pubkey: oracle_pubkey.into(), nonce_point, outcome_messages, @@ -163,7 +164,7 @@ impl CreateEventData { number_of_places_win: 1_i64, // Default to 1 winning score to simplify possible outcomes number_of_values_per_entry: event.number_of_values_per_entry as i64, locations: event.clone().locations, - event_annoucement, + event_announcement, coordinator_pubkey: event .coordinator .map(|v| Some(v.pubkey)) @@ -202,7 +203,7 @@ impl From for Event { total_allowed_entries: value.total_allowed_entries, number_of_places_win: value.number_of_places_win, number_of_values_per_entry: value.number_of_values_per_entry, - event_annoucement: value.event_annoucement, + event_announcement: value.event_announcement, nonce: value.nonce, status: EventStatus::default(), entry_ids: vec![], @@ -238,7 +239,7 @@ pub struct SignEvent { pub observation_date: OffsetDateTime, pub status: EventStatus, pub nonce: Scalar, - pub event_annoucement: EventAnnouncement, + pub event_announcement: EventAnnouncement, pub number_of_places_win: i64, pub number_of_values_per_entry: i64, pub attestation: Option, @@ -299,7 +300,7 @@ impl<'a> TryFrom<&Row<'a>> for SignEvent { serde_json::from_slice(&blob) })? 
.map_err(|e| duckdb::Error::FromSqlConversionFailure(6, Type::Any, Box::new(e)))?, - event_annoucement: row + event_announcement: row .get::(7) .map(|raw| { let blob = match raw { @@ -614,8 +615,8 @@ pub struct Event { pub weather: Vec, /// Nonce the oracle committed to use as part of signing final results pub nonce: Scalar, - /// Holds the predefined outcomes the oracle will attest to at event complet - pub event_annoucement: EventAnnouncement, + /// Holds the predefined outcomes the oracle will attest to at event complete + pub event_announcement: EventAnnouncement, /// When added it means the oracle has signed that the current data is the final result pub attestation: Option, } @@ -658,7 +659,7 @@ impl<'a> TryFrom<&Row<'a>> for Event { .map(|val| OffsetDateTime::parse(&val, &sql_time_format))? .map(|val| val.to_offset(UtcOffset::UTC)) .map_err(|e| duckdb::Error::FromSqlConversionFailure(2, Type::Any, Box::new(e)))?, - event_annoucement: row + event_announcement: row .get::(3) .map(|raw| { let blob = match raw { diff --git a/oracle/src/db/outcome_generator.rs b/oracle/src/db/outcome_generator.rs index 6c290fe..ae1e74b 100644 --- a/oracle/src/db/outcome_generator.rs +++ b/oracle/src/db/outcome_generator.rs @@ -1,16 +1,9 @@ use itertools::Itertools; -use rayon::prelude::*; - -pub fn generate_winner_permutations(num_players: usize) -> Vec> { - (0..=num_players) - .into_par_iter() - .flat_map(|r| { - (0..num_players) - .combinations(r) - .map(|v| v.into_iter().collect::>()) - .collect::>() - }) - .collect() + +/// We are assuming the scoring mechanism does not allow for ties and every user has a unique score +/// (most likely using time as an element of the scoring) +pub fn generate_ranking_permutations(num_players: usize, rankings: usize) -> Vec> { + (0..num_players).permutations(rankings).collect() } pub fn generate_outcome_messages(possible_user_outcomes: Vec>) -> Vec> { @@ -28,27 +21,48 @@ pub fn generate_outcome_messages(possible_user_outcomes: Vec>) -> Vec 
#[cfg(test)] mod test { - use super::generate_winner_permutations; + use super::generate_ranking_permutations; #[test] - fn can_generate_list_of_winners_small() { + fn can_generate_list_of_winners_n5() { let num_players = 5; - let permutations: Vec> = generate_winner_permutations(num_players); - println!("permutations: {:?}", permutations); - assert_eq!(permutations.len(), 32); + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 60); } #[test] - fn can_generate_list_of_winners_default_size() { + fn can_generate_list_of_winners_n20() { let num_players = 20; - let permutations: Vec> = generate_winner_permutations(num_players); - assert_eq!(permutations.len(), 1_048_576); + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 6_840); } #[test] - fn can_generate_list_of_winners_large() { + fn can_generate_list_of_winners_n25() { let num_players = 25; - let permutations: Vec> = generate_winner_permutations(num_players); - assert_eq!(permutations.len(), 33_554_432); + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 13_800); + } + + #[test] + fn can_generate_list_of_winners_n100() { + let num_players = 100; + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 970_200); + } + + #[test] + fn can_generate_list_of_winners_n200() { + let num_players = 200; + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 7_880_400); + } + + #[test] + //note: beyond 500 players the time to create the permutations is over 60 seconds + fn can_generate_list_of_winners_n400() { + let num_players = 400; + let permutations: Vec> = generate_ranking_permutations(num_players, 3); + assert_eq!(permutations.len(), 63_520_800); } } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index bac449e..dbe3042 100644 --- 
a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -410,8 +410,10 @@ impl Oracle { continue; } - // Score logic, match on Par 2pts, on Over 1pt, on Under 1pt - let mut total_score = 0; + // Score logic, match on Par 20pts, on Over 10pts, on Under 10pts, created_at used as tie breaker (older > newer) + let mut base_score = 0; + const OVER_OR_UNDER_POINTS: u64 = 10; + const PAR_POINTS: u64 = 20; let expected_observations = entry.expected_observations.clone(); let locations = event.locations.clone(); for location in locations { @@ -442,17 +444,17 @@ impl Oracle { match high_temp { ValueOptions::Over => { if forecast.temp_high < observation.temp_high.round() as i64 { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } ValueOptions::Par => { if forecast.temp_high == observation.temp_high.round() as i64 { - total_score += 2; + base_score += PAR_POINTS; } } ValueOptions::Under => { if forecast.temp_high > observation.temp_high.round() as i64 { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } } @@ -462,17 +464,17 @@ impl Oracle { match temp_low { ValueOptions::Over => { if forecast.temp_low < observation.temp_low.round() as i64 { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } ValueOptions::Par => { if forecast.temp_low == observation.temp_low.round() as i64 { - total_score += 2; + base_score += PAR_POINTS; } } ValueOptions::Under => { if forecast.temp_low > observation.temp_low.round() as i64 { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } } @@ -482,22 +484,45 @@ impl Oracle { match wind_speed { ValueOptions::Over => { if forecast.wind_speed < observation.wind_speed { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } ValueOptions::Par => { if forecast.wind_speed == observation.wind_speed { - total_score += 2; + base_score += PAR_POINTS; } } ValueOptions::Under => { if forecast.wind_speed > observation.wind_speed { - total_score += 1; + base_score += OVER_OR_UNDER_POINTS; } } } } } + let (created_at_secs, 
created_at_nano) = entry + .id + .get_timestamp() + .expect("UUIDv7 should have timestamp") + .to_unix(); + let time_millis = (created_at_secs * 1000) + (created_at_nano as u64 / 1_000_000); + let time_part = 9999 - (time_millis % 10000) as u64; + + /* By adding the time element we are able to make competitions that have 1mil unique possible scores + meaning no ties under the following constraints: + + With queue for entries (serialized creation): + - Up to 10,000 entries over 24h: negligible collision risk + - Max burst: ~40 entries/second with millisecond precision + + Without queue for entries (concurrent creation): + - Up to 1,300 entries over 24h: negligible collision risk + - Burst limit: ~30 entries/second for < 0.01% collision risk + + This is important for keeping the amount of possible outcomes for the DLC as low as possible + but able to scale to as many entries as possible + */ + let total_score = ((base_score * 10000) + time_part) as i64; info!( "updating entry {} for event {} to score {} in etl process {}", @@ -520,51 +545,61 @@ impl Oracle { let mut events: Vec = self.event_data.get_events_to_sign(event_ids).await?; info!("events: {:?}", events); for event in events.iter_mut() { - let mut entries = self.event_data.get_event_weather_entries(&event.id).await?; - let mut entry_indexies = entries.clone(); + let entries = self.event_data.get_event_weather_entries(&event.id).await?; + let mut entry_indices = entries.clone(); // very important, the sort index of the entry should always be the same when getting the outcome - entry_indexies.sort_by_key(|entry| entry.id); + entry_indices.sort_by_key(|entry| entry.id); - entries.sort_by_key(|entry| cmp::Reverse(entry.score)); - // we have set the number_of_places_win to 1 always (ranking can't be done in large groups efficiently at the moment) - let winning_score: i64 = entries[1].score.unwrap_or_default(); // default means '0' was winning score; + // Sort by score descending for top 3 + let mut top_entries = 
entries.clone(); + top_entries.sort_by_key(|entry| cmp::Reverse(entry.score)); + top_entries.truncate(3); - let winners: Vec = get_winners(entry_indexies.clone(), winning_score); + // Get indices of top 3 in original entry_indices order + let winners: Vec = top_entries + .iter() + .map(|top_entry| { + entry_indices + .iter() + .position(|entry| entry.id == top_entry.id) + .expect("Entry should exist") + }) + .collect(); let winner_bytes: Vec = get_winning_bytes(winners.clone()); if event.signing_date < OffsetDateTime::now_utc() { info!( "outcome_messages: {:?}", - event.event_annoucement.outcome_messages + event.event_announcement.outcome_messages ); info!("winner_bytes: {:?}", winner_bytes); let outcome_index = event - .event_annoucement + .event_announcement .outcome_messages .iter() .position(|outcome| *outcome == winner_bytes); - let winning_entries = winners + + let winners_str = winners .iter() - .filter_map(|entry_index| entry_indexies.get(*entry_index)) - .map(|entry| entry.id.to_string()) + .filter_map(|entry_index| entry_indices.get(*entry_index)) + .map(|entry| format!("({}, {})", entry.score.unwrap_or_default(), entry.id)) .collect::>() - .join(","); - let winners_str = format!("({}, {})", winning_score, winning_entries); + .join(", "); let Some(index) = outcome_index else { // Something went horribly wrong, use the info from this log line to track refunding users based on DLC expiry (we set to 1 week) - error!("final result doesn't match any of the possible outcomes: event_id {} winners {} expiry {:?}", event.id, winners_str, event.event_annoucement.expiry); + error!("final result doesn't match any of the possible outcomes: event_id {} winners {} expiry {:?}", event.id, winners_str, event.event_announcement.expiry); return Err(Error::OutcomeNotFound(format!( "event_id {} outcome winners {} expiry {:?}", - event.id, winners_str, event.event_annoucement.expiry + event.id, winners_str, event.event_announcement.expiry ))); }; info!("winners: event_id {} 
winners {}", event.id, winners_str); - event.attestation = event.event_annoucement.attestation_secret( + event.attestation = event.event_announcement.attestation_secret( index, self.private_key, event.nonce, @@ -612,25 +647,10 @@ impl Oracle { } } -pub fn get_winners(entry_indexies: Vec, winning_score: i64) -> Vec { - let mut winners: Vec = Vec::new(); - let entry_indexies_iter = entry_indexies.iter(); - for (entry_index, entry) in entry_indexies_iter.enumerate() { - let Some(score) = entry.score else { continue }; - if winning_score == score { - winners.push(entry_index); - } - } - - winners.sort(); - - winners -} - pub fn get_winning_bytes(winners: Vec) -> Vec { winners - .into_iter() - .flat_map(|num| num.to_be_bytes()) + .iter() + .flat_map(|&idx| idx.to_be_bytes()) .collect::>() } diff --git a/oracle/tests/api/create_event.rs b/oracle/tests/api/create_event.rs index f5dab42..09a9cdd 100644 --- a/oracle/tests/api/create_event.rs +++ b/oracle/tests/api/create_event.rs @@ -63,7 +63,7 @@ async fn can_create_oracle_event() { assert!(res.nonce.serialize().len() > 0); assert!(res.attestation.is_none()); assert!(res - .event_annoucement + .event_announcement .is_valid_outcome(&Outcome::Attestation(1))); } @@ -151,6 +151,6 @@ async fn can_create_and_get_oracle_event() { assert!(res.nonce.serialize().len() > 0); assert!(res.attestation.is_none()); assert!(res - .event_annoucement + .event_announcement .is_valid_outcome(&Outcome::Attestation(1))); } diff --git a/oracle/tests/api/etl_workflow.rs b/oracle/tests/api/etl_workflow.rs index 573a8fc..10248eb 100644 --- a/oracle/tests/api/etl_workflow.rs +++ b/oracle/tests/api/etl_workflow.rs @@ -15,7 +15,25 @@ use serde_json::from_slice; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::time::sleep; use tower::ServiceExt; -use uuid::Uuid; +use uuid::{ClockSequence, Timestamp, Uuid}; + +fn get_uuid_from_timestamp(timestamp_str: &str) -> Uuid { + struct Context; + impl ClockSequence for Context { + 
type Output = u16; + fn generate_sequence(&self, _ts_secs: u64, _ts_nanos: u32) -> u16 { + 0 + } + } + + let dt = OffsetDateTime::parse( + timestamp_str, + &time::format_description::well_known::Rfc3339, + ) + .expect("Valid RFC3339 timestamp"); + let ts = Timestamp::from_unix(Context, dt.unix_timestamp() as u64, dt.nanosecond()); + Uuid::new_v7(ts) +} #[tokio::test] async fn can_handle_no_events() { @@ -73,8 +91,9 @@ async fn can_get_event_run_etl_and_see_it_signed() { info!("above create event"); let event = test_app.oracle.create_event(new_event_1).await.unwrap(); + let entry_1 = AddEventEntry { - id: Uuid::now_v7(), + id: get_uuid_from_timestamp("2024-08-11T00:00:00.10Z"), event_id: event.id, expected_observations: vec![ WeatherChoices { @@ -99,7 +118,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { coordinator: None, }; let entry_2 = AddEventEntry { - id: Uuid::now_v7(), + id: get_uuid_from_timestamp("2024-08-11T00:00:00.20Z"), event_id: event.id, expected_observations: vec![ WeatherChoices { @@ -124,7 +143,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { coordinator: None, }; let entry_3 = AddEventEntry { - id: Uuid::now_v7(), + id: get_uuid_from_timestamp("2024-08-11T00:00:00.30Z"), event_id: event.id, expected_observations: vec![ WeatherChoices { @@ -149,7 +168,7 @@ async fn can_get_event_run_etl_and_see_it_signed() { coordinator: None, }; let entry_4 = AddEventEntry { - id: Uuid::now_v7(), + id: get_uuid_from_timestamp("2024-08-11T00:00:00.40Z"), event_id: event.id, expected_observations: vec![ WeatherChoices { @@ -257,53 +276,60 @@ async fn can_get_event_run_etl_and_see_it_signed() { let mut entries_scores_order = res.entries.clone(); entries_scores_order.sort_by_key(|entry| cmp::Reverse(entry.score)); info!("entries: {:?}", entries_scores_order); + // Make sure the expected entries won and calculated the correct score for each - // We expect a tie between entry_1 and entry_3 with 4 pts let entry_1_res = entries_scores_order .iter() 
.find(|entry| entry.id == entry_1.id) .unwrap(); - assert_eq!(entry_1_res.score.unwrap(), 4); + assert_eq!(entry_1_res.score.unwrap(), 409899); let entry_2_res = entries_scores_order .iter() .find(|entry| entry.id == entry_2.id) .unwrap(); - assert_eq!(entry_2_res.score.unwrap(), 3); + assert_eq!(entry_2_res.score.unwrap(), 309799); let entry_3_res = entries_scores_order .iter() .find(|entry| entry.id == entry_3.id) .unwrap(); - assert_eq!(entry_3_res.score.unwrap(), 4); + assert_eq!(entry_3_res.score.unwrap(), 409699); let entry_4_res = entries_scores_order .iter() .find(|entry| entry.id == entry_4.id) .unwrap(); - assert_eq!(entry_4_res.score.unwrap(), 1); + assert_eq!(entry_4_res.score.unwrap(), 109599); + + let mut entry_outcome_order = res.entries.clone(); + entry_outcome_order.sort_by_key(|entry| entry.id); - let entry_outcome_order = res.entries.clone(); - entries_scores_order.sort_by_key(|entry| entry.id); - let entry_3_index = entry_outcome_order + let first_place_index = entry_outcome_order + .iter() + .position(|entry| entry.id == entry_1.id) + .unwrap(); + + let second_place_index = entry_outcome_order .iter() .position(|entry| entry.id == entry_3.id) .unwrap(); - let entry_4_index = entry_outcome_order + + let third_place_index = entry_outcome_order .iter() - .position(|entry| entry.id == entry_4.id) + .position(|entry| entry.id == entry_2.id) .unwrap(); - let mut winners = vec![entry_3_index, entry_4_index]; - winners.sort(); + let winners = vec![first_place_index, second_place_index, third_place_index]; + let winning_bytes = get_winning_bytes(winners); println!("winning_bytes in test: {:?}", winning_bytes); let outcome_index = event - .event_annoucement + .event_announcement .outcome_messages .iter() .position(|outcome| *outcome == winning_bytes) .unwrap(); - let attested_outcome = res.event_annoucement.attestation_secret( + let attested_outcome = res.event_announcement.attestation_secret( outcome_index, test_app.oracle.raw_private_key(), res.nonce,