diff --git a/Dockerfile b/Dockerfile index debba9d..4638e55 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,6 +27,7 @@ RUN cargo build --release --bin sync-beacon-states RUN cargo build --release --bin sync-execution-blocks RUN cargo build --release --bin sync-execution-supply-deltas RUN cargo build --release --bin update-effective-balance-sum +RUN cargo build --release --bin update-flippening-data RUN cargo build --release --bin update-issuance-breakdown RUN cargo build --release --bin update-issuance-estimate RUN cargo build --release --bin update-supply-projection-inputs @@ -45,6 +46,7 @@ COPY --from=builder /app/target/release/sync-beacon-states /app COPY --from=builder /app/target/release/sync-execution-blocks /app COPY --from=builder /app/target/release/sync-execution-supply-deltas /app COPY --from=builder /app/target/release/update-effective-balance-sum /app +COPY --from=builder /app/target/release/update-flippening-data /app COPY --from=builder /app/target/release/update-issuance-breakdown /app COPY --from=builder /app/target/release/update-issuance-estimate /app COPY --from=builder /app/src/bin/update-supply-projection-inputs/in_contracts_by_day.json /app/src/bin/update-supply-projection-inputs/in_contracts_by_day.json diff --git a/src/bin/download-flippening-data.rs b/src/bin/download-flippening-data.rs new file mode 100644 index 0000000..d601610 --- /dev/null +++ b/src/bin/download-flippening-data.rs @@ -0,0 +1,16 @@ +use eth_analysis::dune::get_flippening_data; +use std::fs::File; +use std::io::{BufWriter, Write}; + +const OUTPUT_FILE_RAW_DATA: &str = "raw_dune_values.json"; + +#[tokio::main] +pub async fn main() { + let raw_data = get_flippening_data().await.unwrap(); + + // Write to Json + let file = File::create(OUTPUT_FILE_RAW_DATA).unwrap(); + let mut writer = BufWriter::new(file); + serde_json::to_writer(&mut writer, &raw_data).unwrap(); + writer.flush().unwrap(); +} diff --git a/src/bin/update-flippening-data.rs 
b/src/bin/update-flippening-data.rs new file mode 100644 index 0000000..c9be1a1 --- /dev/null +++ b/src/bin/update-flippening-data.rs @@ -0,0 +1,208 @@ +use std::io::{BufWriter, Write}; +use std::{collections::HashMap, fs::File}; + +use chrono::{DateTime, NaiveDateTime, Utc}; +use eth_analysis::{ + beacon_chain::GweiInTime, + caching::{self, CacheKey}, + db, + dune::get_flippening_data, + eth_supply, log, SupplyAtTime, +}; +use lazy_static::lazy_static; +use serde::{Deserialize, Serialize}; + +use sqlx::Decode; +use tracing::{info, warn}; + +lazy_static! { + static ref SUPPLY_LOWER_LIMIT_DATE_TIME: DateTime<Utc> = + ("2015-07-30T00:00:00Z").parse::<DateTime<Utc>>().unwrap(); +} + +#[derive(Decode)] +pub struct GweiInTimeRow { + pub timestamp: DateTime<Utc>, + pub gwei: i64, +} + +impl From<&GweiInTimeRow> for GweiInTime { + fn from(row: &GweiInTimeRow) -> Self { + Self { + t: row.timestamp.timestamp() as u64, + v: row.gwei, + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct TimestampValuePoint { + pub t: u64, + // fraction + pub v: f64, +} + +impl From<SupplyAtTime> for TimestampValuePoint { + fn from(supply_at_time: SupplyAtTime) -> Self { + TimestampValuePoint { + t: supply_at_time.timestamp.timestamp() as u64, + v: supply_at_time.supply.0, + } + } +} + +fn forward_fill(data: &mut [FlippeningDatapointPartial]) { + let mut last_eth_price: Option<f64> = None; + let mut last_eth_supply: Option<f64> = None; + let mut last_btc_price: Option<f64> = None; + let mut last_bitcoin_supply: Option<f64> = None; + + for entry in data.iter_mut() { + if entry.eth_price.is_none() { + entry.eth_price = last_eth_price; + } else { + last_eth_price = entry.eth_price; + } + + if entry.eth_supply.is_none() { + entry.eth_supply = last_eth_supply; + } else { + last_eth_supply = entry.eth_supply; + } + + if entry.btc_price.is_none() { + entry.btc_price = last_btc_price; + } else { + last_btc_price = entry.btc_price; + } + + if entry.bitcoin_supply.is_none() { + entry.bitcoin_supply = last_bitcoin_supply; + } else { 
last_bitcoin_supply = entry.bitcoin_supply; + } + } +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct FlippeningDatapoint { + pub t: u64, + pub marketcap_ratio: Option<f64>, +} + +struct FlippeningDatapointPartial { + pub t: u64, + pub eth_price: Option<f64>, + pub btc_price: Option<f64>, + pub bitcoin_supply: Option<f64>, + pub eth_supply: Option<f64>, +} + +const OUTPUT_FILE_RAW_DATA: &str = "raw_dune_values.json"; +const PRESALE_ETH_BTC_RATIO: f64 = 1337.0; +#[tokio::main] +pub async fn main() { + log::init_with_env(); + + info!("updating flippening data"); + + let db_pool = db::get_db_pool("flippening-data", 3).await; + + // sqlx::migrate!().run(&db_pool).await.unwrap(); + + let read_from_file = std::env::var("MOCK_DUNE_API").is_ok(); + let raw_dune_data = if read_from_file { + warn!("Reading from file instead of DUNE API"); + let data = std::fs::read_to_string(OUTPUT_FILE_RAW_DATA).expect("Unable to read file"); + serde_json::from_str(&data).expect("JSON does not have correct format.") + } else { + get_flippening_data().await.unwrap() + }; + + let write_to_file = std::env::var("CACHE_DUNE_RESPONSE").is_ok(); + if write_to_file { + // Write to json file + let file = File::create(OUTPUT_FILE_RAW_DATA).unwrap(); + let mut writer = BufWriter::new(file); + serde_json::to_writer(&mut writer, &raw_dune_data).unwrap(); + writer.flush().unwrap(); + } + + info!( + "got flippening data from dune, {} data points", + raw_dune_data.len() + ); + + let supply_by_day: Vec<TimestampValuePoint> = eth_supply::get_daily_supply(&db_pool) + .await + .into_iter() + .filter(|point| point.timestamp >= *SUPPLY_LOWER_LIMIT_DATE_TIME) + .map(Into::into) + .collect(); + + let first_supply_datapoint = &supply_by_day[0]; + info!( + "got first supply data point at: {:}", + first_supply_datapoint.t + ); + + let eth_supply_map: HashMap<u64, f64> = supply_by_day + .iter() + .map(|point| (point.t, point.v)) + .collect(); + info!("got supply by day, {} data points", eth_supply_map.len()); + + let mut flippening_data: Vec<FlippeningDatapointPartial> = 
raw_dune_data + .into_iter() + .map(|row| { + let t = NaiveDateTime::parse_from_str(&row.time, "%Y-%m-%d %H:%M:%S%.f UTC") + .unwrap() + .timestamp() as u64; + let is_presale_period = t < first_supply_datapoint.t; + let eth_supply = if is_presale_period { + Some(first_supply_datapoint.v) + } else { + eth_supply_map.get(&t).copied() + }; + let eth_price = if is_presale_period { + row.btc_price + .map(|btc_price| btc_price / PRESALE_ETH_BTC_RATIO) + } else { + row.eth_price + }; + FlippeningDatapointPartial { + t, + eth_price, + btc_price: row.btc_price, + bitcoin_supply: row.bitcoin_supply, + eth_supply, + } + }) + .collect(); + flippening_data.sort_by_key(|row| row.t); + forward_fill(&mut flippening_data); + + let flippening_data: Vec<FlippeningDatapoint> = flippening_data + .into_iter() + .map(|row| { + let eth_marketcap = row + .eth_price + .zip(row.eth_supply) + .map(|(price, supply)| price * supply); + let btc_marketcap = row + .btc_price + .zip(row.bitcoin_supply) + .map(|(price, supply)| price * supply); + let marketcap_ratio = eth_marketcap.zip(btc_marketcap).map(|(eth, btc)| eth / btc); + FlippeningDatapoint { + t: row.t, + marketcap_ratio, + } + }) + .collect(); + + caching::update_and_publish(&db_pool, &CacheKey::FlippeningData, &flippening_data).await; + + info!("done updating flippening data"); +} diff --git a/src/caching.rs b/src/caching.rs index 033f8b6..a745eec 100644 --- a/src/caching.rs +++ b/src/caching.rs @@ -28,6 +28,7 @@ pub enum CacheKey { BurnSums, EffectiveBalanceSum, EthPrice, + FlippeningData, GaugeRates, SupplyParts, IssuanceBreakdown, @@ -78,6 +79,7 @@ impl CacheKey { BurnSums => "burn-sums", EffectiveBalanceSum => "effective-balance-sum", EthPrice => "eth-price", + FlippeningData => "flippening-data", GaugeRates => "gauge-rates", IssuanceBreakdown => "issuance-breakdown", IssuanceEstimate => "issuance-estimate", @@ -122,6 +124,7 @@ impl FromStr for CacheKey { "burn-sums" => Ok(Self::BurnSums), "effective-balance-sum" => Ok(Self::EffectiveBalanceSum), 
"eth-price" => Ok(Self::EthPrice), + "flippening-data" => Ok(Self::FlippeningData), "gauge-rates" => Ok(Self::GaugeRates), "issuance-breakdown" => Ok(Self::IssuanceBreakdown), "issuance-estimate" => Ok(Self::IssuanceEstimate), diff --git a/src/dune.rs b/src/dune.rs index dda5d87..81fd6ff 100644 --- a/src/dune.rs +++ b/src/dune.rs @@ -5,7 +5,8 @@ use format_url::FormatUrl; use serde::Deserialize; use serde::Serialize; -const DUNE_ETH_IN_CONTRACTS_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3686915/results"; +const DUNE_ETH_IN_CONTRACTS_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3751774/results"; +const DUNE_FLIPPENING_DATA_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3758140/results"; #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct DuneResponse<Row> { @@ -32,6 +33,14 @@ pub struct EthInContractsRow { pub cumulative_sum: f64, } +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct FlippeningDataRow { + pub time: String, + pub eth_price: Option<f64>, + pub btc_price: Option<f64>, + pub bitcoin_supply: Option<f64>, +} + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Metadata { pub column_names: Vec<String>, @@ -46,11 +55,22 @@ pub struct Metadata { } pub async fn get_eth_in_contracts() -> Result<Vec<EthInContractsRow>> { + get_dune_data(DUNE_ETH_IN_CONTRACTS_QUERY_URL).await +} + +pub async fn get_flippening_data() -> Result<Vec<FlippeningDataRow>> { + get_dune_data(DUNE_FLIPPENING_DATA_QUERY_URL).await +} + +async fn get_dune_data<Row>(url: &str) -> Result<Vec<Row>> +where + Row: for<'a> Deserialize<'a>, +{ let dune_api_key = ENV_CONFIG .dune_api_key .as_ref() .expect("expect DUNE_API_KEY in env in order to fetch eth in smart contracts"); - let url = FormatUrl::new(DUNE_ETH_IN_CONTRACTS_QUERY_URL).format_url(); + let url = FormatUrl::new(url).format_url(); let client = reqwest::Client::new(); Ok(client @@ -59,7 +79,7 @@ pub async fn get_eth_in_contracts() -> Result<Vec<EthInContractsRow>> { .send() .await? .error_for_status()? 
- .json::<DuneResponse<EthInContractsRow>>() + .json::<DuneResponse<Row>>() .await .map(|body| body.result.rows)?) } diff --git a/src/key_value_store.rs b/src/key_value_store.rs index 24a8ab7..c05ba49 100644 --- a/src/key_value_store.rs +++ b/src/key_value_store.rs @@ -2,10 +2,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::{PgExecutor, PgPool}; -use tracing::debug; +use tracing::trace; pub async fn get_value(executor: impl PgExecutor<'_>, key: &str) -> Option<Value> { - debug!(key = key, "getting key value pair"); + trace!(key = key, "getting key value pair"); sqlx::query!( " @@ -21,7 +21,7 @@ pub async fn get_value(executor: impl PgExecutor<'_>, key: &str) -> Option<Value> pub async fn set_value(executor: impl PgExecutor<'_>, key: &str, value: &Value) { - debug!("storing key: {}", &key,); + trace!("storing key: {}", &key,); sqlx::query!( " diff --git a/src/serve/caching.rs b/src/serve/caching.rs index 28a6d9d..438179b 100644 --- a/src/serve/caching.rs +++ b/src/serve/caching.rs @@ -15,7 +15,7 @@ use std::{ sync::{Arc, RwLock}, }; use tokio::task::JoinHandle; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use crate::{ caching::{self, CacheKey, ParseCacheKeyError}, @@ -28,19 +28,41 @@ use super::{State, StateExtension}; #[derive(Debug)] pub struct Cache(RwLock<HashMap<CacheKey, Value>>); +impl Default for Cache { + fn default() -> Self { + Self::new() + } +} + impl Cache { - pub async fn new(key_value_store: &impl KeyValueStore) -> Self { - let map = RwLock::new(HashMap::new()); + pub fn new() -> Self { + Self(RwLock::new(HashMap::new())) + } + + async fn load_from_db(&self, key_value_store: &impl KeyValueStore) { + info!("loading cache from DB"); - // Tries to fetch a value from the key value store for every cached analysis value. 
for key in all::<CacheKey>().collect::<Vec<_>>() { let value = caching::get_serialized_caching_value(key_value_store, &key).await; - if let Some(value) = value { - map.write().unwrap().insert(key, value); + match value { + Some(value) => { + let length = serde_json::to_string(&value) + .expect("expect db cache value to be convertable to string") + .len(); + debug!(%key, %length, "loaded from DB"); + self.0.write().unwrap().insert(key, value); + } + None => { + warn!(%key, "no value found in DB"); + } } } + } - Self(map) + pub async fn new_with_data(key_value_store: &impl KeyValueStore) -> Self { + let cache = Self::new(); + cache.load_from_db(key_value_store).await; + cache } } diff --git a/src/serve/mod.rs b/src/serve/mod.rs index a8ce679..9b83ff5 100644 --- a/src/serve/mod.rs +++ b/src/serve/mod.rs @@ -52,7 +52,7 @@ pub async fn start_server() { debug!("warming cache"); - let cache = Cache::new(&key_value_store).await; + let cache = Cache::new_with_data(&key_value_store).await; info!("cache ready"); @@ -148,6 +148,12 @@ pub async fn start_server() { cached_get(state, &CacheKey::SupplyParts).await }), ) + .route( + "/api/v2/fees/flippening-data", + get(|state: StateExtension| async move { + cached_get(state, &CacheKey::FlippeningData).await + }), + ) .route( "/api/v2/fees/issuance-estimate", get(|state: StateExtension| async move {