feat(hubble): record token source in database (#3528)
qlp authored Jan 18, 2025
2 parents 2ba0e29 + b7bc6af commit 58771c0
Showing 6 changed files with 51 additions and 72 deletions.

Three files were deleted; some generated files are not rendered by default.

19 changes: 12 additions & 7 deletions hubble/src/postgres.rs
@@ -151,56 +151,61 @@ pub async fn get_chain_ids_and_ids<'a, A: Acquire<'a, Database = Postgres>>(
 #[allow(clippy::type_complexity)] // it's just kind of a mess
 pub async fn insert_or_update_tokens<'a, A: Acquire<'a, Database = Postgres>>(
     db: A,
-    tokens: &[(i64, String, String, i64, Option<String>, String)],
+    tokens: &[(i64, String, String, i64, Option<String>, String, String)],
 ) -> sqlx::Result<()> {
     let mut conn = db.acquire().await?;
 
-    let (chain_ids, denoms, display_symbols, decimals, logo_uris, display_names): (
+    let (chain_ids, denoms, display_symbols, decimals, logo_uris, display_names, sources): (
         Vec<i64>,
         Vec<String>,
         Vec<String>,
         Vec<i64>,
         Vec<Option<String>>,
         Vec<String>,
+        Vec<String>,
     ) = tokens
         .iter()
         .map(
-            |(chain_id, denom, display_symbol, decimals, logo_uri, display_name)| {
+            |(chain_id, denom, display_symbol, decimals, logo_uri, display_name, source)| {
                 (
                     *chain_id,
                     denom.clone(),
                     display_symbol.clone(),
                     *decimals,
                     logo_uri.clone(),
                     display_name.clone(),
+                    source.clone(),
                 )
             },
         )
         .multiunzip();
 
     sqlx::query!(
         r#"
-        INSERT INTO hubble.assets (chain_id, denom, display_symbol, decimals, logo_uri, display_name, gas_token)
+        INSERT INTO hubble.assets (chain_id, denom, display_symbol, decimals, logo_uri, display_name, gas_token, source)
         SELECT
             unnest($1::bigint[]),
             unnest($2::text[]),
             unnest($3::text[]),
             unnest($4::bigint[]),
             unnest($5::text[]),
             unnest($6::text[]),
-            false
+            false,
+            unnest($7::text[])
         ON CONFLICT (chain_id, denom) DO UPDATE SET
            display_symbol = EXCLUDED.display_symbol,
            decimals = EXCLUDED.decimals,
            logo_uri = EXCLUDED.logo_uri,
-           display_name = EXCLUDED.display_name
+           display_name = EXCLUDED.display_name,
+           source = EXCLUDED.source
         "#,
         &chain_ids,
         &denoms,
         &display_symbols,
         &decimals,
         &logo_uris as _,
-        &display_names
+        &display_names,
+        &sources,
     )
     .execute(&mut *conn)
    .await?;
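The upsert now expects seven-field tuples, with the token's origin as the final element; a matching migration adding a text `source` column to `hubble.assets` is presupposed but not shown in this excerpt. A minimal sketch of a call site, where all values (chain id, denom, URLs) are hypothetical:

    // Hypothetical call site; all values are illustrative.
    // The seventh tuple field is the new `source`, which token_list.rs
    // populates with the URL the token list was fetched from.
    async fn seed_example(pool: sqlx::PgPool) -> sqlx::Result<()> {
        let tokens = vec![(
            1_i64,                                            // chain_id (internal hubble chain id)
            "uatom".to_string(),                              // denom
            "ATOM".to_string(),                               // display_symbol
            6_i64,                                            // decimals
            Some("https://example.com/atom.png".to_string()), // logo_uri
            "Cosmos Hub Atom".to_string(),                    // display_name
            "https://example.com/tokens.json".to_string(),    // source (new)
        )];
        crate::postgres::insert_or_update_tokens(&pool, &tokens).await
    }

Feeding `source` through as one more parallel `unnest($7::text[])` array keeps the whole batch a single statement, and the extra `ON CONFLICT` assignment means refreshing an existing row also refreshes where it came from.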
27 changes: 19 additions & 8 deletions hubble/src/token_list.rs
@@ -1,3 +1,4 @@
+#![allow(clippy::type_complexity)]
 use color_eyre::Result;
 use serde::{Deserialize, Serialize};
 use tracing::{debug, info};
@@ -6,11 +7,11 @@ use crate::cli::TokensUrls;
 
 pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
     info!("Starting token update process.");
-    if let Some(token_list) = get_tokens(urls).await? {
+    if let Some(fetched_token_list_vec) = get_tokens(urls).await? {
         let mut token_inserts = Vec::new();
 
-        for token_value in token_list {
-            for token in token_value.tokens {
+        for fetched_token_list in fetched_token_list_vec {
+            for token in fetched_token_list.token_list.tokens {
                 let token_insert: TokenInsert = token.into();
                 token_inserts.push((
                     token_insert.chain_id,
@@ -19,6 +20,7 @@ pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
                     token_insert.decimals,
                     token_insert.logo_uri.clone(),
                     token_insert.display_name.clone(),
+                    fetched_token_list.source.clone(),
                 ));
             }
         }
@@ -35,7 +37,7 @@ pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
     if !token_inserts.is_empty() {
         let chain_ids_and_ids = crate::postgres::get_chain_ids_and_ids(&db).await?;
         // Filter tokens based on valid chain_id
-        let filtered_tokens: Vec<(i64, String, String, i64, Option<String>, String)> =
+        let filtered_tokens: Vec<(i64, String, String, i64, Option<String>, String, String)> =
             token_inserts
                 .iter()
                 .filter_map(|token| {
@@ -47,6 +49,7 @@ pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
                             token.3,
                             token.4.clone(),
                             token.5.clone(),
+                            token.6.clone(),
                         )
                     })
                 })
@@ -65,7 +68,12 @@ pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
     Ok(())
 }
 
-pub async fn get_tokens(urls: TokensUrls) -> Result<Option<Vec<TokenList>>> {
+struct FetchedTokenList {
+    source: String,
+    token_list: TokenList,
+}
+
+async fn get_tokens(urls: TokensUrls) -> Result<Option<Vec<FetchedTokenList>>> {
     let client = reqwest::Client::new();
 
     let requests = urls.into_iter().map(|url| {
@@ -76,7 +84,10 @@ pub async fn get_tokens(urls: TokensUrls) -> Result<Option<Vec<TokenList>>> {
 
             if val.get("statusCode").is_none() {
                 debug!("Token list successfully retrieved from: {}", url);
-                Ok(Some(serde_json::from_value(val).unwrap()))
+                Ok(Some(FetchedTokenList {
+                    source: url,
+                    token_list: serde_json::from_value(val).unwrap(),
+                }))
             } else {
                 debug!("No valid token list found at: {}", url);
                 Ok(None)
@@ -85,7 +96,7 @@ pub async fn get_tokens(urls: TokensUrls) -> Result<Option<Vec<TokenList>>> {
     });
 
     // Execute all requests simultaneously and collect the results
-    let results: Vec<Result<Option<TokenList>>> = futures::future::join_all(requests).await;
+    let results: Vec<Result<Option<FetchedTokenList>>> = futures::future::join_all(requests).await;
 
     let tokens = results.into_iter().flatten().flatten().collect::<Vec<_>>();
 
@@ -105,7 +116,7 @@ pub struct TokenList {
     pub version: Version,
     pub tokens: Vec<Token>,
     #[serde(rename = "logoURI")]
-    pub logo_uri: String,
+    pub logo_uri: Option<String>,
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
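The last hunk also loosens `TokenList`: the top-level `logoURI` is now optional, so lists that omit it deserialize instead of failing. A self-contained sketch of that behavior, using a hypothetical pared-down struct (the real `TokenList` above has additional fields such as `version` and `tokens`):

    use serde::Deserialize;

    // Pared-down stand-in for TokenList, for illustration only.
    #[derive(Debug, Deserialize)]
    struct MiniList {
        #[serde(rename = "logoURI")]
        logo_uri: Option<String>,
    }

    fn main() -> serde_json::Result<()> {
        // A payload without a top-level logoURI now maps to None
        // instead of producing a deserialization error.
        let list: MiniList = serde_json::from_str("{}")?;
        assert_eq!(list.logo_uri, None);
        Ok(())
    }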
