Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Flippening data #34

Merged
merged 14 commits into from
Jun 3, 2024
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ RUN cargo build --release --bin sync-beacon-states
RUN cargo build --release --bin sync-execution-blocks
RUN cargo build --release --bin sync-execution-supply-deltas
RUN cargo build --release --bin update-effective-balance-sum
RUN cargo build --release --bin update-flippening-data
RUN cargo build --release --bin update-issuance-breakdown
RUN cargo build --release --bin update-issuance-estimate
RUN cargo build --release --bin update-supply-projection-inputs
Expand All @@ -45,6 +46,7 @@ COPY --from=builder /app/target/release/sync-beacon-states /app
COPY --from=builder /app/target/release/sync-execution-blocks /app
COPY --from=builder /app/target/release/sync-execution-supply-deltas /app
COPY --from=builder /app/target/release/update-effective-balance-sum /app
COPY --from=builder /app/target/release/update-flippening-data /app
COPY --from=builder /app/target/release/update-issuance-breakdown /app
COPY --from=builder /app/target/release/update-issuance-estimate /app
COPY --from=builder /app/src/bin/update-supply-projection-inputs/in_contracts_by_day.json /app/src/bin/update-supply-projection-inputs/in_contracts_by_day.json
Expand Down
16 changes: 16 additions & 0 deletions src/bin/download-flippening-data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use eth_analysis::dune::get_flippening_data;
use std::fs::File;
use std::io::{BufWriter, Write};

// Local file the raw Dune API response is written to, so other binaries can replay it.
const OUTPUT_FILE_RAW_DATA: &str = "raw_dune_values.json";

#[tokio::main]
pub async fn main() {
    // Fetch the raw flippening dataset from the Dune API.
    // Use `expect` with context instead of bare `unwrap` so a failure
    // (network, auth, schema) panics with an actionable message.
    let raw_data = get_flippening_data()
        .await
        .expect("failed to fetch flippening data from Dune");

    // Serialize the raw rows to a local JSON file so other binaries
    // (e.g. update-flippening-data with MOCK_DUNE_API set) can reuse them.
    let file = File::create(OUTPUT_FILE_RAW_DATA)
        .expect("failed to create raw dune values output file");
    let mut writer = BufWriter::new(file);
    serde_json::to_writer(&mut writer, &raw_data)
        .expect("failed to serialize raw dune data to JSON");
    // Flush explicitly: Drop would swallow any write error silently.
    writer.flush().expect("failed to flush raw dune data to disk");
}
208 changes: 208 additions & 0 deletions src/bin/update-flippening-data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use std::io::{BufWriter, Write};
use std::{collections::HashMap, fs::File};

use chrono::{DateTime, NaiveDateTime, Utc};
use eth_analysis::{
beacon_chain::GweiInTime,
caching::{self, CacheKey},
db,
dune::get_flippening_data,
eth_supply, log, SupplyAtTime,
};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};

use sqlx::Decode;
use tracing::{info, warn};

lazy_static! {
    // Earliest timestamp for which daily supply data is used; points before
    // this are filtered out in main. 2015-07-30 is the Ethereum mainnet
    // launch day — supply data before it is not meaningful here.
    static ref SUPPLY_LOWER_LIMIT_DATE_TIME: DateTime<Utc> =
        ("2015-07-30T00:00:00Z").parse::<DateTime<Utc>>().unwrap();
}

/// DB row shape for a gwei-denominated value observed at a point in time.
#[derive(Decode)]
pub struct GweiInTimeRow {
    // Moment the value was observed.
    pub timestamp: DateTime<Utc>,
    // Value in gwei at `timestamp`.
    pub gwei: i64,
}

impl From<&GweiInTimeRow> for GweiInTime {
fn from(row: &GweiInTimeRow) -> Self {
Self {
t: row.timestamp.timestamp() as u64,
v: row.gwei,
}
}
}

/// Generic (unix seconds, float value) time-series point.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TimestampValuePoint {
    // Unix timestamp (seconds).
    pub t: u64,
    // fraction
    pub v: f64,
}

impl From<SupplyAtTime> for TimestampValuePoint {
    /// Flatten a `SupplyAtTime` into a plain (unix seconds, f64) point.
    fn from(supply_at_time: SupplyAtTime) -> Self {
        let t = supply_at_time.timestamp.timestamp() as u64;
        let v = supply_at_time.supply.0;
        TimestampValuePoint { t, v }
    }
}

/// Forward-fills gaps in the (chronologically sorted) data: any `None` field
/// takes the most recently observed `Some` value for that same field.
/// Leading `None`s (before the first observation) stay `None`.
fn forward_fill(data: &mut [FlippeningDatapointPartial]) {
    // One fill step for a single field: propagate `last` into a gap, or
    // record the current observation as the new `last`.
    fn fill(current: &mut Option<f64>, last: &mut Option<f64>) {
        if current.is_none() {
            *current = *last;
        } else {
            *last = *current;
        }
    }

    let mut last_eth_price: Option<f64> = None;
    let mut last_eth_supply: Option<f64> = None;
    let mut last_btc_price: Option<f64> = None;
    let mut last_bitcoin_supply: Option<f64> = None;

    for entry in data.iter_mut() {
        fill(&mut entry.eth_price, &mut last_eth_price);
        fill(&mut entry.eth_supply, &mut last_eth_supply);
        fill(&mut entry.btc_price, &mut last_btc_price);
        fill(&mut entry.bitcoin_supply, &mut last_bitcoin_supply);
    }
}

/// Final cached payload: one point of the ETH/BTC market-cap ratio series.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct FlippeningDatapoint {
    // Unix timestamp (seconds).
    pub t: u64,
    // ETH market cap divided by BTC market cap; None while any input is missing.
    pub marketcap_ratio: Option<f64>,
}

/// Intermediate join of Dune rows with our own supply series; fields are
/// Option because sources have gaps that get forward-filled later.
struct FlippeningDatapointPartial {
    // Unix timestamp (seconds).
    pub t: u64,
    pub eth_price: Option<f64>,
    pub btc_price: Option<f64>,
    pub bitcoin_supply: Option<f64>,
    pub eth_supply: Option<f64>,
}

// File used to cache / replay the raw Dune response (see the MOCK_DUNE_API
// and CACHE_DUNE_RESPONSE env vars in main).
const OUTPUT_FILE_RAW_DATA: &str = "raw_dune_values.json";
// ETH-per-BTC rate used to synthesize an ETH price for rows before the first
// supply data point. NOTE(review): presumably the 2014 presale rate — confirm.
const PRESALE_ETH_BTC_RATIO: f64 = 1337.0;
/// Builds the ETH/BTC "flippening" market-cap-ratio series by joining Dune
/// price/supply data with our own daily ETH supply, then publishes it to the
/// cache under `CacheKey::FlippeningData`.
#[tokio::main]
pub async fn main() {
    log::init_with_env();

    info!("updating flippening data");

    let db_pool = db::get_db_pool("flippening-data", 3).await;

    // Either replay a cached Dune response from disk (MOCK_DUNE_API — handy
    // during development, avoids spending Dune API credits) or hit the API.
    let read_from_file = std::env::var("MOCK_DUNE_API").is_ok();
    let raw_dune_data = if read_from_file {
        warn!("Reading from file instead of DUNE API");
        let data = std::fs::read_to_string(OUTPUT_FILE_RAW_DATA).expect("Unable to read file");
        serde_json::from_str(&data).expect("JSON does not have correct format.")
    } else {
        get_flippening_data().await.unwrap()
    };

    // Optionally cache the raw response so later runs can use MOCK_DUNE_API.
    let write_to_file = std::env::var("CACHE_DUNE_RESPONSE").is_ok();
    if write_to_file {
        let file = File::create(OUTPUT_FILE_RAW_DATA).unwrap();
        let mut writer = BufWriter::new(file);
        serde_json::to_writer(&mut writer, &raw_dune_data).unwrap();
        writer.flush().unwrap();
    }

    info!(
        "got flippening data from dune, {} data points",
        raw_dune_data.len()
    );

    // Daily ETH supply, restricted to dates on or after the lower limit date.
    let supply_by_day: Vec<TimestampValuePoint> = eth_supply::get_daily_supply(&db_pool)
        .await
        .into_iter()
        .filter(|point| point.timestamp >= *SUPPLY_LOWER_LIMIT_DATE_TIME)
        .map(Into::into)
        .collect();

    // Fail with a clear message instead of an out-of-bounds index panic when
    // the supply query unexpectedly returns no rows.
    let first_supply_datapoint = supply_by_day
        .first()
        .expect("expected at least one daily supply data point");
    info!(
        "got first supply data point at: {:}",
        first_supply_datapoint.t
    );

    // Index supply by unix timestamp for O(1) lookups in the join below.
    let eth_supply_map: HashMap<u64, f64> = supply_by_day
        .iter()
        .map(|point| (point.t, point.v))
        .collect();
    info!("got supply by day, {} data points", eth_supply_map.len());

    // Join Dune rows with our supply series. For the presale period (before
    // our first supply data point) there is no ETH market price, so derive
    // one from the BTC price via the presale ETH/BTC ratio and pin the ETH
    // supply to the first known value.
    let mut flippening_data: Vec<FlippeningDatapointPartial> = raw_dune_data
        .into_iter()
        .map(|row| {
            let t = NaiveDateTime::parse_from_str(&row.time, "%Y-%m-%d %H:%M:%S%.f UTC")
                .unwrap()
                .timestamp() as u64;
            let is_presale_period = t < first_supply_datapoint.t;
            let eth_supply = if is_presale_period {
                Some(first_supply_datapoint.v)
            } else {
                eth_supply_map.get(&t).copied()
            };
            let eth_price = if is_presale_period {
                row.btc_price
                    .map(|btc_price| btc_price / PRESALE_ETH_BTC_RATIO)
            } else {
                row.eth_price
            };
            FlippeningDatapointPartial {
                t,
                eth_price,
                btc_price: row.btc_price,
                bitcoin_supply: row.bitcoin_supply,
                eth_supply,
            }
        })
        .collect();
    // Forward-filling requires chronological order.
    flippening_data.sort_by_key(|row| row.t);
    forward_fill(&mut flippening_data);

    // Compute the market-cap ratio; it stays None until all four inputs have
    // been observed at least once.
    let flippening_data: Vec<FlippeningDatapoint> = flippening_data
        .into_iter()
        .map(|row| {
            let eth_marketcap = row
                .eth_price
                .zip(row.eth_supply)
                .map(|(price, supply)| price * supply);
            let btc_marketcap = row
                .btc_price
                .zip(row.bitcoin_supply)
                .map(|(price, supply)| price * supply);
            let marketcap_ratio = eth_marketcap.zip(btc_marketcap).map(|(eth, btc)| eth / btc);
            FlippeningDatapoint {
                t: row.t,
                marketcap_ratio,
            }
        })
        .collect();

    caching::update_and_publish(&db_pool, &CacheKey::FlippeningData, &flippening_data).await;

    info!("done updating flippening data");
}
3 changes: 3 additions & 0 deletions src/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub enum CacheKey {
BurnSums,
EffectiveBalanceSum,
EthPrice,
FlippeningData,
GaugeRates,
SupplyParts,
IssuanceBreakdown,
Expand Down Expand Up @@ -78,6 +79,7 @@ impl CacheKey {
BurnSums => "burn-sums",
EffectiveBalanceSum => "effective-balance-sum",
EthPrice => "eth-price",
FlippeningData => "flippening-data",
GaugeRates => "gauge-rates",
IssuanceBreakdown => "issuance-breakdown",
IssuanceEstimate => "issuance-estimate",
Expand Down Expand Up @@ -122,6 +124,7 @@ impl FromStr for CacheKey {
"burn-sums" => Ok(Self::BurnSums),
"effective-balance-sum" => Ok(Self::EffectiveBalanceSum),
"eth-price" => Ok(Self::EthPrice),
"flippening-data" => Ok(Self::FlippeningData),
"gauge-rates" => Ok(Self::GaugeRates),
"issuance-breakdown" => Ok(Self::IssuanceBreakdown),
"issuance-estimate" => Ok(Self::IssuanceEstimate),
Expand Down
26 changes: 23 additions & 3 deletions src/dune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use format_url::FormatUrl;
use serde::Deserialize;
use serde::Serialize;

const DUNE_ETH_IN_CONTRACTS_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3686915/results";
const DUNE_ETH_IN_CONTRACTS_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3751774/results";
const DUNE_FLIPPENING_DATA_QUERY_URL: &str = "https://api.dune.com/api/v1/query/3758140/results";

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DuneResponse<Row> {
Expand All @@ -32,6 +33,14 @@ pub struct EthInContractsRow {
pub cumulative_sum: f64,
}

/// One row of the Dune flippening query: ETH/BTC prices and the BTC supply.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct FlippeningDataRow {
    // Timestamp string as returned by Dune (parsed downstream with the
    // "%Y-%m-%d %H:%M:%S%.f UTC" format).
    pub time: String,
    pub eth_price: Option<f64>,
    pub btc_price: Option<f64>,
    pub bitcoin_supply: Option<f64>,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Metadata {
pub column_names: Vec<String>,
Expand All @@ -46,11 +55,22 @@ pub struct Metadata {
}

/// Fetches the "ETH in smart contracts" result rows from the Dune API.
pub async fn get_eth_in_contracts() -> Result<Vec<EthInContractsRow>> {
    get_dune_data(DUNE_ETH_IN_CONTRACTS_QUERY_URL).await
}

/// Fetches the flippening (ETH/BTC prices and BTC supply) rows from the Dune API.
pub async fn get_flippening_data() -> Result<Vec<FlippeningDataRow>> {
    get_dune_data(DUNE_FLIPPENING_DATA_QUERY_URL).await
}

async fn get_dune_data<Row>(url: &str) -> Result<Vec<Row>>
where
Row: for<'a> Deserialize<'a>,
{
let dune_api_key = ENV_CONFIG
.dune_api_key
.as_ref()
.expect("expect DUNE_API_KEY in env in order to fetch eth in smart contracts");
let url = FormatUrl::new(DUNE_ETH_IN_CONTRACTS_QUERY_URL).format_url();
let url = FormatUrl::new(url).format_url();

let client = reqwest::Client::new();
Ok(client
Expand All @@ -59,7 +79,7 @@ pub async fn get_eth_in_contracts() -> Result<Vec<EthInContractsRow>> {
.send()
.await?
.error_for_status()?
.json::<DuneResponse<EthInContractsRow>>()
.json::<DuneResponse<Row>>()
.await
.map(|body| body.result.rows)?)
}
6 changes: 3 additions & 3 deletions src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::{PgExecutor, PgPool};
use tracing::debug;
use tracing::trace;

pub async fn get_value(executor: impl PgExecutor<'_>, key: &str) -> Option<Value> {
debug!(key = key, "getting key value pair");
trace!(key = key, "getting key value pair");

sqlx::query!(
"
Expand All @@ -21,7 +21,7 @@ pub async fn get_value(executor: impl PgExecutor<'_>, key: &str) -> Option<Value
}

pub async fn set_value(executor: impl PgExecutor<'_>, key: &str, value: &Value) {
debug!("storing key: {}", &key,);
trace!("storing key: {}", &key,);

sqlx::query!(
"
Expand Down
Loading
Loading