Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

no op metadata json requests based on slot #171

Open
wants to merge 7 commits into
base: grpc-ingest
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
backon::{ExponentialBuilder, Retryable},
clap::Parser,
digital_asset_types::dao::asset_data,
digital_asset_types::dao::{asset_data, sea_orm_active_enums::LastRequestedStatusCode},
futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt},
indicatif::HumanDuration,
log::{debug, error},
Expand All @@ -17,9 +17,9 @@ use {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DownloadMetadataInfo {
asset_data_id: Vec<u8>,
uri: String,
slot: i64,
pub asset_data_id: Vec<u8>,
pub uri: String,
pub slot: i64,
}

impl DownloadMetadataInfo {
Expand Down Expand Up @@ -90,9 +90,8 @@ impl MetadataJsonDownloadWorkerArgs {
}

let pool = pool.clone();
let client = client.clone();

handlers.push(spawn_task(client, pool, download_metadata_info));
handlers.push(spawn_task(client.clone(), pool, download_metadata_info));
}

while handlers.next().await.is_some() {}
Expand Down Expand Up @@ -169,10 +168,10 @@ async fn fetch_metadata_json(
let response = client.get(url.clone()).send().await?;

match response.error_for_status() {
Ok(res) => res
Ok(res) => Ok(res
.json::<serde_json::Value>()
.await
.map_err(|source| FetchMetadataJsonError::Parse { source, url }),
.map_err(|source| FetchMetadataJsonError::Parse { source, url })?),
Err(source) => {
let status = source
.status()
Expand Down Expand Up @@ -206,22 +205,38 @@ pub async fn perform_metadata_json_task(
pool: sqlx::PgPool,
download_metadata_info: &DownloadMetadataInfo,
) -> Result<asset_data::Model, MetadataJsonTaskError> {
match fetch_metadata_json(client, &download_metadata_info.uri).await {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let start = Instant::now();
let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await;
let time_elapsed = start.elapsed().as_millis() as u64;
match fetch_res {
Ok(metadata) => {
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
metadata: Set(metadata),
reindex: Set(Some(false)),
last_requested_status_code: Set(Some(LastRequestedStatusCode::Success)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let model = active_model.update(&conn).await?;

Ok(model)
}
Err(e) => Err(MetadataJsonTaskError::Fetch(e)),
Err(e) => {
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
reindex: Set(Some(true)),
last_requested_status_code: Set(Some(LastRequestedStatusCode::Failure)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

active_model.update(&conn).await?;

Err(MetadataJsonTaskError::Fetch(e))
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions digital_asset_types/src/dao/generated/asset_data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3

use super::sea_orm_active_enums::ChainMutability;
use super::sea_orm_active_enums::LastRequestedStatusCode;
use super::sea_orm_active_enums::Mutability;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
Expand All @@ -27,6 +28,8 @@ pub struct Model {
pub raw_name: Option<Vec<u8>>,
pub raw_symbol: Option<Vec<u8>>,
pub base_info_seq: Option<i64>,
pub fetch_duration_in_ms: Option<u64>,
pub last_requested_status_code: Option<LastRequestedStatusCode>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
Expand All @@ -42,6 +45,8 @@ pub enum Column {
RawName,
RawSymbol,
BaseInfoSeq,
FetchDurationInMs,
LastRequestedStatusCode,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
Expand Down Expand Up @@ -74,6 +79,8 @@ impl ColumnTrait for Column {
Self::RawName => ColumnType::Binary.def().null(),
Self::RawSymbol => ColumnType::Binary.def().null(),
Self::BaseInfoSeq => ColumnType::BigInteger.def().null(),
Self::FetchDurationInMs => ColumnType::Unsigned.def().null(),
Self::LastRequestedStatusCode => LastRequestedStatusCode::db_type().null(),
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,16 @@ pub enum Instruction {
#[sea_orm(string_value = "verify_creator")]
VerifyCreator,
}

#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(
rs_type = "String",
db_type = "Enum",
enum_name = "last_requested_status_code"
)]
pub enum LastRequestedStatusCode {
#[sea_orm(string_value = "success")]
Success,
#[sea_orm(string_value = "failure")]
Failure,
}
Nagaprasadvr marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions digital_asset_types/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub fn create_asset_data(
raw_name: Some(metadata.name.into_bytes().to_vec().clone()),
raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()),
base_info_seq: Some(0),
fetch_duration_in_ms: None,
last_requested_status_code: None,
},
)
}
Expand Down
2 changes: 2 additions & 0 deletions digital_asset_types/tests/json_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content {
raw_name: Some(String::from("Handalf").into_bytes().to_vec()),
raw_symbol: Some(String::from("").into_bytes().to_vec()),
base_info_seq: Some(0),
fetch_duration_in_ms: None,
last_requested_status_code: None,
};

v1_content_from_json(&asset_data).unwrap()
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod m20240319_120101_add_mpl_core_enum_vals;
mod m20240320_120101_add_mpl_core_info_items;
mod m20240520_120101_add_mpl_core_external_plugins_columns;
mod m20240718_161232_change_supply_columns_to_numeric;
mod m20241104_093312_add_metadata_json_fetch_heuristics_columns;

pub mod model;

Expand Down Expand Up @@ -95,6 +96,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240320_120101_add_mpl_core_info_items::Migration),
Box::new(m20240520_120101_add_mpl_core_external_plugins_columns::Migration),
Box::new(m20240718_161232_change_supply_columns_to_numeric::Migration),
Box::new(m20241104_093312_add_metadata_json_fetch_heuristics_columns::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use enum_iterator::{all, Sequence};
use extension::postgres::Type;
use sea_orm_migration::prelude::*;

use crate::model::table::AssetData;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(AssetData::Table)
.add_column(
ColumnDef::new(AssetData::FetchDurationInMs)
.integer()
.null(),
)
.to_owned(),
)
.await?;

manager
.create_type(
Type::create()
.as_enum(AssetData::LastRequestedStatusCode)
.values(vec![
LastRequestedStatusCode::Success,
LastRequestedStatusCode::Failure,
])
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(AssetData::Table)
.add_column(
ColumnDef::new(AssetData::LastRequestedStatusCode)
.enumeration(
AssetData::LastRequestedStatusCode,
all::<LastRequestedStatusCode>().collect::<Vec<_>>(),
)
.not_null(),
)
.to_owned(),
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(AssetData::Table)
.drop_column(AssetData::FetchDurationInMs)
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(AssetData::Table)
.drop_column(AssetData::LastRequestedStatusCode)
.to_owned(),
)
.await?;

manager
.drop_type(
Type::drop()
.name(AssetData::LastRequestedStatusCode)
.to_owned(),
)
.await?;

Ok(())
}
}

#[derive(Iden, Sequence)]
pub enum LastRequestedStatusCode {
Success,
Failure,
}
2 changes: 2 additions & 0 deletions migration/src/model/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub enum AssetData {
RawName,
RawSymbol,
BaseInfoSeq,
FetchDurationInMs,
LastRequestedStatusCode,
}

#[derive(Copy, Clone, Iden)]
Expand Down
1 change: 1 addition & 0 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ where
raw_name: ActiveValue::Set(Some(raw_name)),
raw_symbol: ActiveValue::Set(Some(raw_symbol)),
base_info_seq: ActiveValue::Set(Some(seq)),
..Default::default()
};

let mut query = asset_data::Entity::insert(model)
Expand Down
8 changes: 7 additions & 1 deletion program_transformers/src/bubblegum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
error::{ProgramTransformerError, ProgramTransformerResult},
DownloadMetadataNotifier,
skip_metadata_json_download, DownloadMetadataNotifier,
},
blockbuster::{
instruction::InstructionBundle,
Expand Down Expand Up @@ -72,6 +72,9 @@ where
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? {
if skip_metadata_json_download(&info, txn).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down Expand Up @@ -99,6 +102,9 @@ where
if let Some(info) =
update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await?
{
if skip_metadata_json_download(&info, txn).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down
25 changes: 25 additions & 0 deletions program_transformers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
},
},
das_core::{DownloadMetadataInfo, DownloadMetadataNotifier},
digital_asset_types::dao::asset_data,
sea_orm::{
entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr,
SqlxPostgresConnector, TransactionTrait,
Expand Down Expand Up @@ -254,3 +255,27 @@ fn record_metric(metric_name: &str, success: bool, retries: u32) {
cadence_macros::statsd_count!(metric_name, 1, "success" => success, "retry_count" => retry_count);
}
}

pub async fn skip_metadata_json_download<T>(
download_metadata_info: &DownloadMetadataInfo,
conn: &T,
) -> bool
where
T: ConnectionTrait + TransactionTrait,
{
let DownloadMetadataInfo {
asset_data_id,
slot: incoming_slot,
uri,
} = download_metadata_info;

asset_data::Entity::find_by_id(asset_data_id.clone())
.one(conn)
.await
.unwrap_or(None)
.is_some_and(|model| {
!model.reindex.unwrap_or(false)
&& model.metadata_url.eq(uri)
&& model.slot_updated >= *incoming_slot
})
}
Nagaprasadvr marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion program_transformers/src/mpl_core_program/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
error::{ProgramTransformerError, ProgramTransformerResult},
mpl_core_program::v1_asset::{burn_v1_asset, save_v1_asset},
AccountInfo, DownloadMetadataNotifier,
skip_metadata_json_download, AccountInfo, DownloadMetadataNotifier,
},
blockbuster::programs::mpl_core_program::{MplCoreAccountData, MplCoreAccountState},
sea_orm::DatabaseConnection,
Expand Down Expand Up @@ -30,6 +30,9 @@ pub async fn handle_mpl_core_account<'a, 'b, 'c>(
)
.await?
{
if skip_metadata_json_download(&info, db).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Nagaprasadvr marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 1 addition & 0 deletions program_transformers/src/mpl_core_program/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
raw_name: ActiveValue::Set(Some(name.to_vec())),
raw_symbol: ActiveValue::Set(None),
base_info_seq: ActiveValue::Set(Some(0)),
..Default::default()
};

let mut query = asset_data::Entity::insert(asset_data_model)
Expand Down
4 changes: 4 additions & 0 deletions program_transformers/src/token_metadata/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::{
error::{ProgramTransformerError, ProgramTransformerResult},
skip_metadata_json_download,
token_metadata::{
master_edition::{save_v1_master_edition, save_v2_master_edition},
v1_asset::{burn_v1_asset, save_v1_asset},
Expand Down Expand Up @@ -33,6 +34,9 @@ pub async fn handle_token_metadata_account<'a, 'b>(
}
TokenMetadataAccountData::MetadataV1(m) => {
if let Some(info) = save_v1_asset(db, m, account_info.slot).await? {
if skip_metadata_json_download(&info, db).await {
return Ok(());
}
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down
Loading
Loading