diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index 70ddcfcea..a7f5a9279 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -23,6 +23,9 @@ where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { + // Note: We do not check whether the asset has been decompressed here becuase we know if it + // was burned then it could not have been decompressed later. + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index 1b8f5842a..ba6dd3073 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; @@ -31,6 +31,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -50,7 +56,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index e6825774a..fd88d6c35 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -1,4 +1,6 @@ -use crate::program_transformers::bubblegum::{upsert_asset_with_seq, upsert_collection_info}; +use crate::program_transformers::bubblegum::{ + asset_was_decompressed, upsert_asset_with_seq, upsert_collection_info, +}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, @@ -41,6 +43,12 @@ where let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; @@ -54,7 +62,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index 134fe89ca..85c6e9857 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, }, }; @@ -51,6 +51,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -70,7 +76,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 889d90a0c..6c15b52bd 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -67,7 +67,7 @@ where ..Default::default() }; - let mut audit_item: Option<cl_audits::ActiveModel> = if (cl_audits) { + let audit_item: Option<cl_audits::ActiveModel> = if cl_audits { let mut ai: cl_audits::ActiveModel = item.clone().into(); ai.tx = Set(txn_id.to_string()); Some(ai) @@ -144,7 +144,6 @@ pub async fn upsert_asset_with_leaf_info<T>( data_hash: [u8; 32], creator_hash: [u8; 32], seq: i64, - was_decompressed: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -169,22 +168,19 @@ where asset::Column::Nonce, asset::Column::TreeId, asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::DataHash, asset::Column::CreatorHash, + asset::Column::LeafSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); - // If we are indexing decompression we will update the leaf regardless of if we have previously - // indexed decompression and regardless of seq. - if !was_decompressed { - query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", - query.sql - ); - } + // If the asset was decompressed, don't update the leaf info since we cleared it during decompression. + query.sql = format!( + "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", + query.sql + ); txn.execute(query) .await @@ -202,26 +198,25 @@ where { let model = asset::ActiveModel { id: Set(id), - leaf: Set(None), nonce: Set(Some(0)), - leaf_seq: Set(None), + tree_id: Set(None), + leaf: Set(None), data_hash: Set(None), creator_hash: Set(None), - tree_id: Set(None), - seq: Set(Some(0)), + leaf_seq: Set(None), ..Default::default() }; + let query = asset::Entity::insert(model) .on_conflict( OnConflict::column(asset::Column::Id) .update_columns([ - asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::Nonce, + asset::Column::TreeId, + asset::Column::Leaf, asset::Column::DataHash, asset::Column::CreatorHash, - asset::Column::TreeId, - asset::Column::Seq, + asset::Column::LeafSeq, ]) .to_owned(), ) @@ -474,13 +469,6 @@ where //seq: Set(seq), }; - // First check to see if this asset has been decompressed. - if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? { - if let Some(0) = asset.seq { - return Ok(()); - } - }; - let mut query = asset_data::Entity::insert(model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) @@ -503,7 +491,6 @@ where ) .build(DbBackend::Postgres); query.sql = format!( - // New asset_data.seq. "{} WHERE excluded.seq >= asset_data.seq OR asset_data.seq IS NULL)", query.sql ); @@ -518,13 +505,13 @@ pub async fn upsert_asset_with_royalty_amount<T>( txn: &T, id: Vec<u8>, royalty_amount: i32, - seq: i64, + _seq: i64, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { let model = asset::ActiveModel { - id: Set(id), + id: Set(id.clone()), royalty_amount: Set(royalty_amount), //royalty_amount_seq: Set(Some(seq)), ..Default::default() @@ -542,8 +529,7 @@ where .build(DbBackend::Postgres); query.sql = format!( - // TODO DEAL WITH THIS - "{} WHERE (NOT asset.was_decompressed) AND (excluded.royalty_amount_seq >= asset.royalty_amount_seq OR royalty_amount_seq.seq IS NULL)", + "{} WHERE excluded.royalty_amount_seq >= asset.royalty_amount_seq OR royalty_amount_seq.seq IS NULL)", query.sql ); @@ -553,3 +539,15 @@ where Ok(()) } + +pub async fn asset_was_decompressed<T>(txn: &T, id: Vec<u8>) -> Result<bool, IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? { + if let Some(0) = asset.seq { + return Ok(true); + } + }; + Ok(false) +} diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs index 79e124ce6..6e9e0341a 100644 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs @@ -1,12 +1,13 @@ use crate::{ error::IngesterError, - program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression, + program_transformers::bubblegum::{ + asset_was_decompressed, upsert_asset_with_compression_info, + upsert_asset_with_leaf_info_for_decompression, + }, }; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; use sea_orm::{query::*, ConnectionTrait}; -use super::upsert_asset_with_compression_info; - pub async fn decompress<'c, T>( _parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, @@ -17,6 +18,11 @@ where { let id_bytes = bundle.keys.get(3).unwrap().0.as_slice(); + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index 88896de64..5f646b269 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; @@ -30,6 +30,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -48,7 +54,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index ddb038515..dbcd13447 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,9 +1,10 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_data, upsert_asset_with_compression_info, - upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_royalty_amount, upsert_asset_with_seq, upsert_collection_info, + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_royalty_amount, + upsert_asset_with_seq, upsert_collection_info, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -63,6 +64,12 @@ where } => { let (edition_attachment_address, _) = find_master_edition_account(&id); let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(None); + } + let slot_i = bundle.slot as i64; let uri = metadata.uri.replace('\0', ""); let name = metadata.name.clone().into_bytes(); @@ -128,7 +135,7 @@ where }; // Upsert asset table base info. - let mut query = asset::Entity::insert(asset_model) + let query = asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([ @@ -144,11 +151,6 @@ where ) .build(DbBackend::Postgres); - // Do not overwrite changes that happened after the asset was decompressed. - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated OR asset.slot_updated IS NULL", - query.sql - ); txn.execute(query) .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; @@ -183,7 +185,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index b9b7f2c27..3dc0bc999 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -4,7 +4,8 @@ use log::debug; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq, + asset_was_decompressed, save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, + upsert_asset_with_seq, }, }; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; @@ -32,6 +33,12 @@ where ); debug!("Indexing redeem for asset id: {:?}", asset_id); let id_bytes = asset_id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; @@ -45,7 +52,6 @@ where [0; 32], [0; 32], seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 573f33a8f..07abbe523 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -2,8 +2,8 @@ use super::save_changelog_event; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_seq, + asset_was_decompressed, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; use blockbuster::{ @@ -32,6 +32,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -51,7 +57,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs index 774acdd7e..940c0932c 100644 --- a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -1,8 +1,8 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_data, upsert_asset_with_leaf_info, - upsert_asset_with_royalty_amount, upsert_asset_with_seq, + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_leaf_info, upsert_asset_with_royalty_amount, upsert_asset_with_seq, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -48,10 +48,17 @@ where &parsing_result.payload, ) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { id, nonce, .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(None); + } + let slot_i = bundle.slot as i64; let uri = if let Some(uri) = &update_args.uri { @@ -155,7 +162,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; @@ -216,8 +222,10 @@ where .exec(txn) .await?; - // This statement will update base information for each creator. - let query = asset_creators::Entity::insert_many(db_creator_infos) + // This statement will update base information for each creator and the + // `base_info_seq` number, allows for `mintV1` and `update_metadata` to be + // processed out of order. + let mut query = asset_creators::Entity::insert_many(db_creator_infos) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -227,15 +235,21 @@ where asset_creators::Column::Position, asset_creators::Column::Share, asset_creators::Column::SlotUpdated, + //asset_creators::Column::BaseInfoSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.base_info_seq > asset_creators.base_info_seq OR asset_creators.base_info_seq IS NULL", + query.sql + ); txn.execute(query).await?; - // This statement will update whether the creator is verified and the `seq` - // number. `seq` is used to protect the `verified` field, allowing for `mint` - // and `verifyCreator` to be processed out of order. + // This statement will update whether the creator is verified and the + // `verified_seq` number, which is used to protect the `verified` field, + // allowing for `mintV1`, `update_metadata`, and `verifyCreator` to be + // processed out of order. let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) .on_conflict( OnConflict::columns([ @@ -244,13 +258,13 @@ where ]) .update_columns([ asset_creators::Column::Verified, - asset_creators::Column::Seq, + //asset_creators::Column::VerifiedSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); query.sql = format!( - "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL", + "{} WHERE excluded.verified_seq > asset_creators.verified_seq OR asset_creators.verified_seq IS NULL", query.sql ); txn.execute(query).await?; diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 215399322..c4975d575 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -23,7 +23,7 @@ use num_traits::FromPrimitive; use plerkle_serialization::Pubkey as FBPubkey; use sea_orm::{ entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, DbBackend, - DbErr, EntityTrait, FromQueryResult, JoinType, JsonValue, + DbErr, EntityTrait, JsonValue, }; use std::collections::HashSet; @@ -195,6 +195,9 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>( asset_data: Set(Some(id.to_vec())), slot_updated: Set(Some(slot_i)), burnt: Set(false), + //data_hash, + //creator_hash, + //leaf_seq, ..Default::default() }; let mut query = asset::Entity::insert(model) @@ -273,6 +276,8 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>( txn.execute(query) .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + + // TODO remove old items that are no longer in collection. if let Some(c) = &metadata.collection { let model = asset_grouping::ActiveModel { asset_id: Set(id.to_vec()), @@ -345,7 +350,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>( ) .exec(&txn) .await?; - if db_creators.len() > 0 { + if !db_creators.is_empty() { let mut query = asset_creators::Entity::insert_many(db_creators) .on_conflict( OnConflict::columns([