Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

no op metadata json requests based on slot #171

Open
wants to merge 12 commits into
base: triton-build
Choose a base branch
from
37 changes: 28 additions & 9 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use {
backon::{ExponentialBuilder, Retryable},
clap::Parser,
digital_asset_types::dao::asset_data,
digital_asset_types::dao::asset_data::{self},
futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt},
indicatif::HumanDuration,
log::{debug, error},
reqwest::{Client, Url as ReqwestUrl},
sea_orm::{entity::*, SqlxPostgresConnector},
sea_orm::{entity::*, ConnectionTrait, SqlxPostgresConnector, TransactionTrait},
serde::{Deserialize, Serialize},
std::{sync::Arc, time::Duration},
tokio::{
Expand Down Expand Up @@ -119,10 +119,9 @@ impl MetadataJsonDownloadWorkerArgs {
}

let pool = pool.clone();
let client = client.clone();

handlers.push(spawn_task(
client,
client.clone(),
pool,
download_metadata_info,
Arc::clone(&download_config),
Expand Down Expand Up @@ -207,10 +206,10 @@ async fn fetch_metadata_json(
let response = client.get(url.clone()).send().await?;

match response.error_for_status() {
Ok(res) => res
Ok(res) => Ok(res
.json::<serde_json::Value>()
.await
.map_err(|source| FetchMetadataJsonError::Parse { source, url }),
.map_err(|source| FetchMetadataJsonError::Parse { source, url })?),
Err(source) => {
let status = source
.status()
Expand Down Expand Up @@ -250,6 +249,7 @@ pub async fn perform_metadata_json_task(
download_metadata_info: &DownloadMetadataInfo,
config: Arc<DownloadMetadataJsonRetryConfig>,
) -> Result<(), MetadataJsonTaskError> {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
match fetch_metadata_json(client, &download_metadata_info.uri, config).await {
Ok(metadata) => {
let active_model = asset_data::ActiveModel {
Expand All @@ -259,13 +259,21 @@ pub async fn perform_metadata_json_task(
..Default::default()
};

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

active_model.update(&conn).await?;

Ok(())
}
Err(e) => Err(MetadataJsonTaskError::Fetch(e)),
Err(e) => {
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
reindex: Set(Some(true)),
..Default::default()
};

active_model.update(&conn).await?;

Err(MetadataJsonTaskError::Fetch(e))
}
}
}

Expand Down Expand Up @@ -293,3 +301,14 @@ impl DownloadMetadata {
.await
}
}

pub async fn skip_metadata_json_download<T>(asset_data_id: &[u8], uri: &str, conn: &T) -> bool
where
T: ConnectionTrait + TransactionTrait,
{
asset_data::Entity::find_by_id(asset_data_id.to_vec())
.one(conn)
.await
.unwrap_or(None)
.is_some_and(|model| model.metadata_url.eq(uri))
}
64 changes: 37 additions & 27 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::error::{ProgramTransformerError, ProgramTransformerResult},
das_core::DownloadMetadataInfo,
das_core::{skip_metadata_json_download, DownloadMetadataInfo},
digital_asset_types::dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items,
sea_orm_active_enums::{
Expand Down Expand Up @@ -386,35 +386,45 @@ pub async fn upsert_asset_data<T>(
where
T: ConnectionTrait + TransactionTrait,
{
let model = asset_data::ActiveModel {
let skip_metadata_json_download = skip_metadata_json_download(&id, &metadata_url, txn).await;

let mut model = asset_data::ActiveModel {
id: ActiveValue::Set(id.clone()),
chain_data_mutability: ActiveValue::Set(chain_data_mutability),
chain_data: ActiveValue::Set(chain_data),
metadata_url: ActiveValue::Set(metadata_url.clone()),
metadata_mutability: ActiveValue::Set(metadata_mutability),
metadata: ActiveValue::Set(JsonValue::String("processing".to_string())),
slot_updated: ActiveValue::Set(slot_updated),
reindex: ActiveValue::Set(Some(true)),
raw_name: ActiveValue::Set(Some(raw_name)),
raw_symbol: ActiveValue::Set(Some(raw_symbol)),
base_info_seq: ActiveValue::Set(Some(seq)),
..Default::default()
};

let mut columns_to_update = vec![
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
];
if !skip_metadata_json_download {
model.metadata_url = ActiveValue::Set(metadata_url.clone());
model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string()));
model.reindex = ActiveValue::Set(Some(true));

columns_to_update.extend_from_slice(&[
asset_data::Column::MetadataUrl,
asset_data::Column::Metadata,
asset_data::Column::Reindex,
]);
}

let mut query = asset_data::Entity::insert(model)
.on_conflict(
OnConflict::columns([asset_data::Column::Id])
.update_columns([
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataUrl,
asset_data::Column::MetadataMutability,
asset_data::Column::Metadata,
asset_data::Column::SlotUpdated,
asset_data::Column::Reindex,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
])
.update_columns(columns_to_update)
.to_owned(),
)
.build(DbBackend::Postgres);
Expand All @@ -428,20 +438,20 @@ where
query.sql
);

let result = txn
.execute(query)
txn.execute(query)
.await
.map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?;

if result.rows_affected() > 0 {
Ok(Some(DownloadMetadataInfo::new(
id,
metadata_url,
slot_updated,
)))
} else {
Ok(None)
// If the metadata JSON already exists, skip the download.
if skip_metadata_json_download {
return Ok(None);
}

Ok(Some(DownloadMetadataInfo::new(
id,
metadata_url,
slot_updated,
)))
}

#[allow(clippy::too_many_arguments)]
Expand Down
48 changes: 33 additions & 15 deletions program_transformers/src/mpl_core_program/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
mpl_core::types::{Plugin, PluginAuthority, PluginType, UpdateAuthority},
programs::mpl_core_program::MplCoreAccountData,
},
das_core::skip_metadata_json_download,
digital_asset_types::{
dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping,
Expand Down Expand Up @@ -193,33 +194,45 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
_ => ChainMutability::Mutable,
};

let asset_data_model = asset_data::ActiveModel {
let skip_metadata_json_download = skip_metadata_json_download(&id_vec, &uri, &txn).await;

let mut asset_data_model = asset_data::ActiveModel {
chain_data_mutability: ActiveValue::Set(chain_mutability),
chain_data: ActiveValue::Set(chain_data_json),
metadata_url: ActiveValue::Set(uri.clone()),
metadata: ActiveValue::Set(JsonValue::String("processing".to_string())),
metadata_mutability: ActiveValue::Set(Mutability::Mutable),
slot_updated: ActiveValue::Set(slot_i),
reindex: ActiveValue::Set(Some(true)),
id: ActiveValue::Set(id_vec.clone()),
raw_name: ActiveValue::Set(Some(name.to_vec())),
raw_symbol: ActiveValue::Set(None),
base_info_seq: ActiveValue::Set(Some(0)),
..Default::default()
};

let mut columns_to_update = vec![
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
];

if !skip_metadata_json_download {
asset_data_model.metadata_url = ActiveValue::Set(uri.clone());
asset_data_model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string()));
asset_data_model.reindex = ActiveValue::Set(Some(true));

columns_to_update.extend_from_slice(&[
asset_data::Column::MetadataUrl,
asset_data::Column::Metadata,
asset_data::Column::Reindex,
]);
}
asset_data::Entity::insert(asset_data_model)
.on_conflict(
OnConflict::columns([asset_data::Column::Id])
.update_columns([
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataUrl,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::Reindex,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
])
.update_columns(columns_to_update)
.action_cond_where(
Condition::all()
.add(
Expand Down Expand Up @@ -811,6 +824,11 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
return Ok(None);
}

// If the metadata JSON exists, skip downloading it.
if skip_metadata_json_download {
return Ok(None);
}

// Otherwise return with info for background downloading.
Ok(Some(DownloadMetadataInfo::new(id_vec.clone(), uri, slot_i)))
}
Expand Down
51 changes: 36 additions & 15 deletions program_transformers/src/token_metadata/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
accounts::{MasterEdition, Metadata},
types::TokenStandard,
},
das_core::skip_metadata_json_download,
digital_asset_types::{
dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping,
Expand Down Expand Up @@ -116,19 +117,44 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
true => ChainMutability::Mutable,
false => ChainMutability::Immutable,
};
let asset_data_model = asset_data::ActiveModel {

let skip_metadata_json_download =
skip_metadata_json_download(&mint_pubkey_vec, &uri, conn).await;

let mut asset_data_model = asset_data::ActiveModel {
chain_data_mutability: ActiveValue::Set(chain_mutability),
chain_data: ActiveValue::Set(chain_data_json),
metadata_url: ActiveValue::Set(uri.clone()),
metadata: ActiveValue::Set(JsonValue::String("processing".to_string())),
metadata_mutability: ActiveValue::Set(Mutability::Mutable),
slot_updated: ActiveValue::Set(slot_i),
reindex: ActiveValue::Set(Some(true)),
id: ActiveValue::Set(mint_pubkey_vec.clone()),
raw_name: ActiveValue::Set(Some(name.to_vec())),
raw_symbol: ActiveValue::Set(Some(symbol.to_vec())),
base_info_seq: ActiveValue::Set(Some(0)),
..Default::default()
};

let mut columns_to_update = vec![
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
];

if !skip_metadata_json_download {
asset_data_model.reindex = ActiveValue::Set(Some(true));
asset_data_model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string()));
asset_data_model.metadata_url = ActiveValue::Set(uri.clone());

columns_to_update.extend_from_slice(&[
asset_data::Column::Reindex,
asset_data::Column::Metadata,
asset_data::Column::MetadataUrl,
]);
}

let txn = conn.begin().await?;

let set_lock_timeout = "SET LOCAL lock_timeout = '1s';";
Expand All @@ -144,17 +170,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
asset_data::Entity::insert(asset_data_model)
.on_conflict(
OnConflict::columns([asset_data::Column::Id])
.update_columns([
asset_data::Column::ChainDataMutability,
asset_data::Column::ChainData,
asset_data::Column::MetadataUrl,
asset_data::Column::MetadataMutability,
asset_data::Column::SlotUpdated,
asset_data::Column::Reindex,
asset_data::Column::RawName,
asset_data::Column::RawSymbol,
asset_data::Column::BaseInfoSeq,
])
.update_columns(columns_to_update)
.action_cond_where(
Condition::all()
.add(
Expand Down Expand Up @@ -496,6 +512,11 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
return Ok(None);
}

// If the metadata JSON exists, skip downloading it.
if skip_metadata_json_download {
return Ok(None);
}

Ok(Some(DownloadMetadataInfo::new(
mint_pubkey_vec,
uri,
Expand Down