diff --git a/Cargo.lock b/Cargo.lock index 7e33acdb9..ba8f0af05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,9 +881,9 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" [[package]] name = "blockbuster" -version = "0.9.0-beta.1" +version = "0.9.0-beta.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e0240c1218958c0d51284d783fa055f551d769bb8b7a4abf635b17fa9620dc" +checksum = "a32a0edd58b3aaaf55684bc9ad82e012b3345cb46a25fcae507b3b9034b83d44" dependencies = [ "anchor-lang", "async-trait", @@ -2882,9 +2882,9 @@ dependencies = [ [[package]] name = "mpl-bubblegum" -version = "1.0.1-beta.3" +version = "1.0.1-beta.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29346f26192bb7f73330196fde4c8cfb35675bf1a4b026cd088f7ca8fda69f3f" +checksum = "e59d102fe6f8b063a06a226874ea815b269316390ce3bf991b29ea9c54ccc467" dependencies = [ "borsh 0.10.3", "kaigan", diff --git a/README.md b/README.md index 2784bcad9..e0b4d6d2b 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Because this is a multi component system the easiest way to develop or locally t #### Regenerating DB Types Edit the init.sql, then run `docker compose up db` Then with a local `DATABASE_URL` var exported like this `export DATABASE_URL=postgres://solana:solana@localhost/solana` you can run -` sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format` +`sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format` If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`. diff --git a/das_api/Cargo.toml b/das_api/Cargo.toml index c99d8b21a..533e6cc43 100644 --- a/das_api/Cargo.toml +++ b/das_api/Cargo.toml @@ -33,9 +33,9 @@ schemars = "0.8.6" schemars_derive = "0.8.6" open-rpc-derive = { version = "0.0.4"} open-rpc-schema = { version = "0.0.4"} -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.5" anchor-lang = "0.28.0" mpl-token-metadata = { version = "=2.0.0-beta.1", features = ["serde-feature"] } mpl-candy-machine-core = { version = "2.0.1", features = ["no-entrypoint"] } -mpl-bubblegum = "1.0.1-beta.3" +mpl-bubblegum = "=1.0.1-beta.4" mpl-candy-guard = { version = "2.0.0", features = ["no-entrypoint"] } diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml index 8e4b18bb9..3464035fa 100644 --- a/digital_asset_types/Cargo.toml +++ b/digital_asset_types/Cargo.toml @@ -18,7 +18,7 @@ solana-sdk = "~1.16.16" num-traits = "0.2.15" num-derive = "0.3.3" thiserror = "1.0.31" -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.5" jsonpath_lib = "0.3.0" mime_guess = "2.0.4" url = "2.3.1" diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs index 0ced69299..0c32be2fe 100644 --- a/digital_asset_types/src/dao/generated/asset.rs +++ b/digital_asset_types/src/dao/generated/asset.rs @@ -44,8 +44,8 @@ pub struct Model { pub data_hash: Option, pub creator_hash: Option, pub owner_delegate_seq: Option, - pub was_decompressed: bool, pub leaf_seq: Option, + pub base_info_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -76,8 +76,8 @@ pub enum Column { DataHash, CreatorHash, OwnerDelegateSeq, - WasDecompressed, LeafSeq, + BaseInfoSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -131,8 +131,8 @@ impl ColumnTrait for Column { Self::DataHash => 
ColumnType::Char(Some(50u32)).def().null(), Self::CreatorHash => ColumnType::Char(Some(50u32)).def().null(), Self::OwnerDelegateSeq => ColumnType::BigInteger.def().null(), - Self::WasDecompressed => ColumnType::Boolean.def(), Self::LeafSeq => ColumnType::BigInteger.def().null(), + Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index 374ed854a..f6bf697b2 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -24,8 +24,9 @@ pub struct Model { pub metadata: Json, pub slot_updated: i64, pub reindex: Option, - pub raw_name: Vec, - pub raw_symbol: Vec, + pub raw_name: Option>, + pub raw_symbol: Option>, + pub base_info_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -40,6 +41,7 @@ pub enum Column { Reindex, RawName, RawSymbol, + BaseInfoSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -70,9 +72,10 @@ impl ColumnTrait for Column { Self::MetadataMutability => Mutability::db_type(), Self::Metadata => ColumnType::JsonBinary.def(), Self::SlotUpdated => ColumnType::BigInteger.def(), - Self::Reindex => ColumnType::Boolean.def(), - Self::RawName => ColumnType::Binary.def(), - Self::RawSymbol => ColumnType::Binary.def(), + Self::Reindex => ColumnType::Boolean.def().null(), + Self::RawName => ColumnType::Binary.def().null(), + Self::RawSymbol => ColumnType::Binary.def().null(), + Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/asset_grouping.rs b/digital_asset_types/src/dao/generated/asset_grouping.rs index aae51d6d8..5d5c0e749 100644 --- a/digital_asset_types/src/dao/generated/asset_grouping.rs +++ b/digital_asset_types/src/dao/generated/asset_grouping.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 +//! SeaORM Entity. 
Generated by sea-orm-codegen 0.9.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ impl EntityName for Entity { } } -#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)] pub struct Model { pub id: i64, pub asset_id: Vec, @@ -20,7 +20,7 @@ pub struct Model { pub group_value: Option, pub seq: Option, pub slot_updated: Option, - pub verified: Option, + pub verified: bool, pub group_info_seq: Option, } diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs index a07714202..0d02b7769 100644 --- a/digital_asset_types/src/dao/generated/cl_audits.rs +++ b/digital_asset_types/src/dao/generated/cl_audits.rs @@ -22,7 +22,7 @@ pub struct Model { pub seq: i64, pub level: i64, pub hash: Vec, - pub created_at: Option, + pub created_at: DateTime, pub tx: String, } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 2be0283e7..11362ae1c 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -3,34 +3,6 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")] -pub enum Mutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "v1_account_attachments" -)] -pub enum V1AccountAttachments { - #[sea_orm(string_value = "edition")] - Edition, - #[sea_orm(string_value = "edition_marker")] - EditionMarker, - #[sea_orm(string_value = "master_edition_v1")] - MasterEditionV1, - #[sea_orm(string_value = "master_edition_v2")] - MasterEditionV2, - #[sea_orm(string_value = "unknown")] - Unknown, -} #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] pub enum TaskStatus { @@ -44,6 +16,22 @@ pub enum TaskStatus { Success, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "specification_versions" +)] +pub enum SpecificationVersions { + #[sea_orm(string_value = "unknown")] + Unknown, + #[sea_orm(string_value = "v0")] + V0, + #[sea_orm(string_value = "v1")] + V1, + #[sea_orm(string_value = "v2")] + V2, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -60,6 +48,36 @@ pub enum RoyaltyTargetType { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] +pub enum OwnerType { + #[sea_orm(string_value = "single")] + Single, + #[sea_orm(string_value = "token")] + Token, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", 
enum_name = "mutability")] +pub enum Mutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] +pub enum ChainMutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -88,38 +106,20 @@ pub enum SpecificationAssetClass { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] -pub enum ChainMutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", - enum_name = "specification_versions" + enum_name = "v1_account_attachments" )] -pub enum SpecificationVersions { - #[sea_orm(string_value = "unknown")] - Unknown, - #[sea_orm(string_value = "v0")] - V0, - #[sea_orm(string_value = "v1")] - V1, - #[sea_orm(string_value = "v2")] - V2, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] -pub enum OwnerType { - #[sea_orm(string_value = "single")] - Single, - #[sea_orm(string_value = "token")] - Token, +pub enum V1AccountAttachments { + #[sea_orm(string_value = "edition")] + Edition, + #[sea_orm(string_value = "edition_marker")] + EditionMarker, + #[sea_orm(string_value = "master_edition_v1")] + MasterEditionV1, + #[sea_orm(string_value = "master_edition_v2")] + MasterEditionV2, #[sea_orm(string_value = "unknown")] Unknown, } diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 6f51ea8b9..ebda56afe 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -58,7 +58,7 @@ pub async fn get_by_creator( show_unverified_collections: bool, ) -> Result, DbErr> { let mut condition = Condition::all() - .add(asset_creators::Column::Creator.eq(creator)) + .add(asset_creators::Column::Creator.eq(creator.clone())) .add(asset::Column::Supply.gt(0)); if only_verified { condition = condition.add(asset_creators::Column::Verified.eq(true)); @@ -72,6 +72,7 @@ pub async fn get_by_creator( pagination, limit, show_unverified_collections, + Some(creator), ) .await } @@ -130,6 +131,7 @@ pub async fn get_by_grouping( pagination, limit, show_unverified_collections, + None, ) .await } @@ -203,6 +205,7 @@ pub async fn get_by_authority( pagination, limit, show_unverified_collections, + None, ) .await } @@ -216,6 +219,7 @@ async fn get_by_related_condition( pagination: &Pagination, limit: u64, show_unverified_collections: bool, + required_creator: Option>, ) -> Result, DbErr> where E: RelationTrait, @@ -233,13 +237,14 @@ where let assets = paginate(pagination, limit, stmt, sort_direction, asset::Column::Id) .all(conn) .await?; - get_related_for_assets(conn, assets, show_unverified_collections).await + 
get_related_for_assets(conn, assets, show_unverified_collections, required_creator).await } pub async fn get_related_for_assets( conn: &impl ConnectionTrait, assets: Vec, show_unverified_collections: bool, + required_creator: Option>, ) -> Result, DbErr> { let asset_ids = assets.iter().map(|a| a.id.clone()).collect::>(); @@ -272,29 +277,47 @@ pub async fn get_related_for_assets( acc }); let ids = assets_map.keys().cloned().collect::>(); - let authorities = asset_authority::Entity::find() - .filter(asset_authority::Column::AssetId.is_in(ids.clone())) - .order_by_asc(asset_authority::Column::AssetId) - .all(conn) - .await?; - for a in authorities.into_iter() { - if let Some(asset) = assets_map.get_mut(&a.asset_id) { - asset.authorities.push(a); - } - } + // Get all creators for all assets in `assets_map`. let creators = asset_creators::Entity::find() .filter(asset_creators::Column::AssetId.is_in(ids.clone())) .order_by_asc(asset_creators::Column::AssetId) .order_by_asc(asset_creators::Column::Position) .all(conn) .await?; + + // Add the creators to the assets in `assets_map`. for c in creators.into_iter() { if let Some(asset) = assets_map.get_mut(&c.asset_id) { asset.creators.push(c); } } + // Filter out stale creators from each asset. + for (_id, asset) in assets_map.iter_mut() { + filter_out_stale_creators(&mut asset.creators); + } + + // If we passed in a required creator, we make sure that creator is still in the creator array + // of each asset after stale creators were filtered out above. Only retain those assets that + // have the required creator. This prevents `getAssetByCreators` from returning assets for + // which the required creator is no longer in the creator array. + if let Some(required) = required_creator { + assets_map.retain(|_id, asset| asset.creators.iter().any(|c| c.creator == required)); + } + + let ids = assets_map.keys().cloned().collect::>(); + let authorities = asset_authority::Entity::find() + .filter(asset_authority::Column::AssetId.is_in(ids.clone())) + .order_by_asc(asset_authority::Column::AssetId) + .all(conn) + .await?; + for a in authorities.into_iter() { + if let Some(asset) = assets_map.get_mut(&a.asset_id) { + asset.authorities.push(a); + } + } + let cond = if show_unverified_collections { Condition::all() } else { @@ -345,7 +368,8 @@ pub async fn get_assets_by_condition( let assets = paginate(pagination, limit, stmt, sort_direction, asset::Column::Id) .all(conn) .await?; - let full_assets = get_related_for_assets(conn, assets, show_unverified_collections).await?; + let full_assets = + get_related_for_assets(conn, assets, show_unverified_collections, None).await?; Ok(full_assets) } @@ -371,11 +395,14 @@ pub async fn get_by_id( .order_by_asc(asset_authority::Column::AssetId) .all(conn) .await?; - let creators: Vec = asset_creators::Entity::find() + let mut creators: Vec = asset_creators::Entity::find() .filter(asset_creators::Column::AssetId.eq(asset.id.clone())) .order_by_asc(asset_creators::Column::Position) .all(conn) .await?; + + filter_out_stale_creators(&mut creators); + let grouping: Vec = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.eq(asset.id.clone())) .filter(asset_grouping::Column::GroupValue.is_not_null()) @@ -397,3 +424,36 @@ pub async fn get_by_id( groups: grouping, }) } + +fn filter_out_stale_creators(creators: &mut Vec) { + // If the first creator is an empty Vec, it means the creator array is empty (which is allowed + // for compressed assets in Bubblegum). 
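+ // (Such an empty array is stored as a single placeholder row whose `creator` field is an
+ // empty byte vector; see `upsert_asset_creators` in db.rs, which inserts `creator: Set(vec![])`
+ // when the Bubblegum metadata has no creators.)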
+ if !creators.is_empty() && creators[0].creator.is_empty() { + creators.clear(); + } else { + // For both compressed and non-compressed assets, any creators that do not have the max + // `slot_updated` value are stale and should be removed. + let max_slot_updated = creators.iter().map(|creator| creator.slot_updated).max(); + if let Some(max_slot_updated) = max_slot_updated { + creators.retain(|creator| creator.slot_updated == max_slot_updated); + } + + // For compressed assets, any creators that do not have the max `seq` value are stale and + // should be removed. A `seq` value of 0 indicates a decompressed or never-compressed + // asset. So if a `seq` value of 0 is present, then all creators with nonzero `seq` values + // are stale and should be removed. + let seq = if creators + .iter() + .map(|creator| creator.seq) + .any(|seq| seq == Some(0)) + { + Some(Some(0)) + } else { + creators.iter().map(|creator| creator.seq).max() + }; + + if let Some(seq) = seq { + creators.retain(|creator| creator.seq == seq); + } + } +} diff --git a/digital_asset_types/src/dapi/common/asset.rs b/digital_asset_types/src/dapi/common/asset.rs index 2377ddf32..a67a2caaf 100644 --- a/digital_asset_types/src/dapi/common/asset.rs +++ b/digital_asset_types/src/dapi/common/asset.rs @@ -292,7 +292,7 @@ pub fn to_grouping( .filter_map(|model| { let verified = match options.show_unverified_collections { // Null verified indicates legacy data, meaning it is verified. - true => Some(model.verified.unwrap_or(true)), + true => Some(model.verified), false => None, }; // Filter out items where group_value is None. diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index bbe3cf509..e486ef289 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -83,8 +83,9 @@ pub fn create_asset_data( metadata: JsonValue::String("processing".to_string()), slot_updated: 0, reindex: None, - raw_name: metadata.name.into_bytes().to_vec().clone(), - raw_symbol: metadata.symbol.into_bytes().to_vec().clone(), + raw_name: Some(metadata.name.into_bytes().to_vec().clone()), + raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), + base_info_seq: Some(0), }, ) } @@ -155,8 +156,8 @@ pub fn create_asset( alt_id: None, creator_hash: None, owner_delegate_seq: Some(0), - was_decompressed: false, leaf_seq: Some(0), + base_info_seq: Some(0), }, ) } @@ -231,7 +232,7 @@ pub fn create_asset_grouping( id: row_num, group_key: "collection".to_string(), slot_updated: Some(0), - verified: Some(false), + verified: false, group_info_seq: Some(0), }, ) diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index 765f14bc6..b689010ec 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -34,8 +34,9 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { metadata: json, slot_updated: 0, reindex: None, - raw_name: String::from("Handalf").into_bytes().to_vec(), - raw_symbol: String::from("").into_bytes().to_vec(), + raw_name: Some(String::from("Handalf").into_bytes().to_vec()), + raw_symbol: Some(String::from("").into_bytes().to_vec()), + base_info_seq: Some(0), }; v1_content_from_json(&asset_data).unwrap() diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 7e38ac93d..c54bdc81a 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -30,6 +30,8 @@ mod m20230724_120101_add_group_info_seq; mod m20230726_013107_remove_not_null_constraint_from_group_value; mod 
m20230918_182123_add_raw_name_symbol; mod m20230919_072154_cl_audits; +mod m20231019_120101_add_seq_numbers_bgum_update_metadata; +mod m20231206_120101_remove_was_decompressed; pub struct Migrator; @@ -67,6 +69,8 @@ impl MigratorTrait for Migrator { Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration), Box::new(m20230918_182123_add_raw_name_symbol::Migration), Box::new(m20230919_072154_cl_audits::Migration), + Box::new(m20231019_120101_add_seq_numbers_bgum_update_metadata::Migration), + Box::new(m20231206_120101_remove_was_decompressed::Migration), ] } } diff --git a/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs new file mode 100644 index 000000000..78e71b21e --- /dev/null +++ b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs @@ -0,0 +1,55 @@ +use digital_asset_types::dao::{asset, asset_creators, asset_data}; +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(asset_data::Entity) + .add_column(ColumnDef::new(Alias::new("base_info_seq")).big_integer()) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .add_column(ColumnDef::new(Alias::new("base_info_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(asset_data::Entity) + .drop_column(Alias::new("base_info_seq")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("base_info_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20231206_120101_remove_was_decompressed.rs b/migration/src/m20231206_120101_remove_was_decompressed.rs new file mode 100644 index 000000000..27dac3fa9 --- /dev/null +++ b/migration/src/m20231206_120101_remove_was_decompressed.rs @@ -0,0 +1,39 @@ +use digital_asset_types::dao::asset; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("was_decompressed")) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .add_column( + ColumnDef::new(Alias::new("was_decompressed")) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 6fb9f998c..9f81313bf 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -29,19 +29,19 @@ flatbuffers = "23.1.21" lazy_static = "1.4.0" regex = "1.5.5" digital_asset_types = { path = "../digital_asset_types", features = ["json_types", "sql_types"] } -mpl-bubblegum = "1.0.1-beta.3" +mpl-bubblegum = "=1.0.1-beta.4" spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] } spl-concurrent-merkle-tree = "0.2.0" uuid = "1.0.0" 
async-trait = "0.1.53" num-traits = "0.2.15" -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.5" figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } cadence = "0.29.0" cadence-macros = "0.29.0" solana-sdk = "~1.16.16" solana-client = "~1.16.16" -spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } +spl-token = { version = "4.0.0", features = ["no-entrypoint"] } solana-transaction-status = "~1.16.16" solana-account-decoder = "~1.16.16" solana-geyser-plugin-interface = "~1.16.16" diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index 70ddcfcea..ef6c1dc7c 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -42,20 +42,25 @@ where ..Default::default() }; - // Upsert asset table `burnt` column. + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Upsert asset table `burnt` column. Note we don't check for decompression (asset.seq = 0) + // because we know if the item was burnt it could not have been decompressed later. let query = asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) - .update_columns([ - asset::Column::Burnt, - //TODO maybe handle slot updated. - ]) + .update_columns([asset::Column::Burnt]) .to_owned(), ) .build(DbBackend::Postgres); - txn.execute(query).await?; + multi_txn.execute(query).await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + multi_txn.commit().await?; return Ok(()); } diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index 1b8f5842a..d1a91ae45 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -22,8 +22,7 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { + match le.schema { LeafSchema::V1 { id, owner, @@ -40,9 +39,14 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -50,13 +54,12 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; // Partial update of asset table with just leaf owner and delegate. 
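+ // (The helper guards this write with `owner_delegate_seq`, so a replayed or out-of-order
+ // instruction carrying an older seq cannot clobber newer owner/delegate state; see
+ // `upsert_asset_with_owner_and_delegate_info` in db.rs.)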
upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -64,9 +67,13 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); } - }; + } } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 7517f1544..c0397687a 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -26,7 +26,7 @@ where let (collection, verify) = match payload { Payload::CollectionVerification { collection, verify, .. - } => (collection.clone(), verify.clone()), + } => (collection, verify), _ => { return Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), @@ -41,12 +41,18 @@ where let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; + let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -54,24 +60,25 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; upsert_collection_info( - txn, + &multi_txn, id_bytes.to_vec(), Some(Collection { - key: collection.clone(), - verified: verify, + key: *collection, + verified: *verify, }), bundle.slot as i64, seq as i64, ) .await?; + multi_txn.commit().await?; + return Ok(()); }; Err(IngesterError::ParsingError( diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index 134fe89ca..a1ea3a050 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -1,8 +1,8 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, + save_changelog_event, upsert_asset_creators, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; use blockbuster::{ @@ -10,13 +10,13 @@ use blockbuster::{ programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; use log::debug; +use mpl_bubblegum::types::Creator; use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, - value: bool, cl_audits: bool, ) -> Result<(), IngesterError> where @@ -27,10 +27,26 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let (creator, verify) = match payload { + let (updated_creators, creator, verify) = match payload { Payload::CreatorVerification { - 
creator, verify, .. - } => (creator, verify), + metadata, + creator, + verify, + } => { + let updated_creators: Vec = metadata + .creators + .iter() + .map(|c| { + let mut c = c.clone(); + if c.address == *creator { + c.verified = *verify + }; + c + }) + .collect(); + + (updated_creators, creator, verify) + } _ => { return Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), @@ -51,6 +67,7 @@ where .. } => { let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -60,9 +77,14 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf info. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -70,13 +92,12 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -84,17 +105,20 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; id_bytes.to_vec() } }; - upsert_creator_verified( + // Upsert creators to `asset_creators` table. + upsert_asset_creators( txn, asset_id_bytes, - creator.to_bytes().to_vec(), - value, + &updated_creators, + bundle.slot as i64, seq as i64, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 7e930abdc..6b12eea32 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,15 +1,19 @@ use crate::error::IngesterError; use digital_asset_types::dao::{ - asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, + asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, cl_audits, + cl_items, + sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, + }, }; use log::{debug, info}; -use mpl_bubblegum::types::Collection; +use mpl_bubblegum::types::{Collection, Creator}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, }; use spl_account_compression::events::ChangeLogEventV1; - -use std::convert::From; +use std::collections::HashSet; pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, @@ -68,7 +72,7 @@ where ..Default::default() }; - let mut audit_item: Option = if (cl_audits) { + let audit_item: Option = if cl_audits { let mut ai: cl_audits::ActiveModel = item.clone().into(); ai.tx = Set(txn_id.to_string()); Some(ai) @@ -135,6 +139,7 @@ where //TODO -> set maximum size of path and break into multiple statements } +#[allow(clippy::too_many_arguments)] pub async fn upsert_asset_with_leaf_info( txn: &T, id: Vec, @@ -144,7 +149,6 @@ pub async fn upsert_asset_with_leaf_info( data_hash: [u8; 32], creator_hash: [u8; 32], seq: i64, - was_decompressed: bool, ) -> Result<(), IngesterError> where T: 
ConnectionTrait + TransactionTrait, @@ -169,22 +173,20 @@ where asset::Column::Nonce, asset::Column::TreeId, asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::DataHash, asset::Column::CreatorHash, + asset::Column::LeafSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); - // If we are indexing decompression we will update the leaf regardless of if we have previously - // indexed decompression and regardless of seq. - if !was_decompressed { - query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", - query.sql - ); - } + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. + query.sql = format!( + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", + query.sql + ); txn.execute(query) .await @@ -193,7 +195,7 @@ where Ok(()) } -pub async fn upsert_asset_with_leaf_info_for_decompression( +pub async fn upsert_asset_with_leaf_and_compression_info_for_decompression( txn: &T, id: Vec, ) -> Result<(), IngesterError> @@ -201,31 +203,43 @@ where T: ConnectionTrait + TransactionTrait, { let model = asset::ActiveModel { - id: Set(id), - leaf: Set(None), + id: Set(id.clone()), nonce: Set(Some(0)), - leaf_seq: Set(None), + tree_id: Set(None), + leaf: Set(None), data_hash: Set(None), creator_hash: Set(None), - tree_id: Set(None), + compressed: Set(false), + compressible: Set(false), + supply: Set(1), + supply_mint: Set(Some(id)), seq: Set(Some(0)), ..Default::default() }; - let query = asset::Entity::insert(model) + + let mut query = asset::Entity::insert(model) .on_conflict( OnConflict::column(asset::Column::Id) .update_columns([ - asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::Nonce, + asset::Column::TreeId, + asset::Column::Leaf, asset::Column::DataHash, asset::Column::CreatorHash, - asset::Column::TreeId, + asset::Column::LeafSeq, + asset::Column::Compressed, + asset::Column::Compressible, + asset::Column::Supply, + asset::Column::SupplyMint, asset::Column::Seq, ]) .to_owned(), ) .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + query.sql = format!("{} WHERE asset.seq != 0 OR asset.seq IS NULL", query.sql); + txn.execute(query) .await .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; @@ -247,7 +261,7 @@ where id: Set(id), owner: Set(Some(owner)), delegate: Set(delegate), - owner_delegate_seq: Set(Some(seq)), // gummyroll seq + owner_delegate_seq: Set(Some(seq)), ..Default::default() }; @@ -262,8 +276,11 @@ where .to_owned(), ) .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. 
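+ // Roughly, the guarded upsert that results looks like this (a sketch with an abbreviated
+ // column list, not the exact generated SQL):
+ //   INSERT INTO asset (id, owner, delegate, owner_delegate_seq, ...) VALUES (...)
+ //   ON CONFLICT (id) DO UPDATE SET owner = excluded.owner, delegate = excluded.delegate, ...
+ //   WHERE (asset.seq != 0 OR asset.seq IS NULL)
+ //     AND (excluded.owner_delegate_seq >= asset.owner_delegate_seq
+ //          OR asset.owner_delegate_seq IS NULL);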
query.sql = format!( - "{} WHERE excluded.owner_delegate_seq >= asset.owner_delegate_seq OR asset.owner_delegate_seq IS NULL", + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.owner_delegate_seq >= asset.owner_delegate_seq OR asset.owner_delegate_seq IS NULL)", query.sql ); @@ -281,7 +298,6 @@ pub async fn upsert_asset_with_compression_info( compressible: bool, supply: i64, supply_mint: Option>, - was_decompressed: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -292,7 +308,6 @@ where compressible: Set(compressible), supply: Set(supply), supply_mint: Set(supply_mint), - was_decompressed: Set(was_decompressed), ..Default::default() }; @@ -304,12 +319,13 @@ where asset::Column::Compressible, asset::Column::Supply, asset::Column::SupplyMint, - asset::Column::WasDecompressed, ]) .to_owned(), ) .build(DbBackend::Postgres); - query.sql = format!("{} WHERE NOT asset.was_decompressed", query.sql); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + query.sql = format!("{} WHERE asset.seq != 0 OR asset.seq IS NULL", query.sql); txn.execute(query).await?; Ok(()) @@ -333,8 +349,10 @@ where ) .build(DbBackend::Postgres); + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.seq >= asset.seq OR asset.seq IS NULL)", + "{} WHERE (asset.seq != 0 AND excluded.seq >= asset.seq) OR asset.seq IS NULL", query.sql ); @@ -345,40 +363,50 @@ where Ok(()) } -pub async fn upsert_creator_verified( +pub async fn upsert_collection_info( txn: &T, asset_id: Vec, - creator: Vec, - verified: bool, + collection: Option, + slot_updated: i64, seq: i64, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { - let model = asset_creators::ActiveModel { + let (group_value, verified) = match collection { + Some(c) => (Some(c.key.to_string()), c.verified), + None => (None, false), + }; + + let model = asset_grouping::ActiveModel { asset_id: Set(asset_id), - creator: Set(creator), + group_key: Set("collection".to_string()), + group_value: Set(group_value), verified: Set(verified), - seq: Set(Some(seq)), + slot_updated: Set(Some(slot_updated)), + group_info_seq: Set(Some(seq)), ..Default::default() }; - let mut query = asset_creators::Entity::insert(model) + let mut query = asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, ]) .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::Seq, + asset_grouping::Column::GroupValue, + asset_grouping::Column::Verified, + asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); + // Do not overwrite changes that happened after decompression (asset_grouping.group_info_seq = 0). 
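+ // (A `group_info_seq` of 0 plays the same decompression-sentinel role for collection data
+ // that `asset.seq = 0` plays for the `asset` table above.)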
query.sql = format!( - "{} WHERE excluded.seq >= asset_creators.seq OR asset_creators.seq is NULL", + "{} WHERE (asset_grouping.group_info_seq != 0 AND excluded.group_info_seq >= asset_grouping.group_info_seq) OR asset_grouping.group_info_seq IS NULL", query.sql ); @@ -389,55 +417,248 @@ where Ok(()) } -pub async fn upsert_collection_info( +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_data( txn: &T, - asset_id: Vec, - collection: Option, + id: Vec, + chain_data_mutability: ChainMutability, + chain_data: JsonValue, + metadata_url: String, + metadata_mutability: Mutability, + metadata: JsonValue, slot_updated: i64, + reindex: Option, + raw_name: Vec, + raw_symbol: Vec, seq: i64, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { - let (group_value, verified) = match collection { - Some(c) => (Some(c.key.to_string()), c.verified), - None => (None, false), + let model = asset_data::ActiveModel { + id: Set(id.clone()), + chain_data_mutability: Set(chain_data_mutability), + chain_data: Set(chain_data), + metadata_url: Set(metadata_url), + metadata_mutability: Set(metadata_mutability), + metadata: Set(metadata), + slot_updated: Set(slot_updated), + reindex: Set(reindex), + raw_name: Set(Some(raw_name)), + raw_symbol: Set(Some(raw_symbol)), + base_info_seq: Set(Some(seq)), + ..Default::default() }; - let model = asset_grouping::ActiveModel { - asset_id: Set(asset_id), - group_key: Set("collection".to_string()), - group_value: Set(group_value), - verified: Set(Some(verified)), + let mut query = asset_data::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_data::Column::Id]) + .update_columns([ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataUrl, + asset_data::Column::MetadataMutability, + // Don't update asset_data::Column::Metadata if it already exists. Even if we + // are indexing `update_metadata` and there's a new URI, the new background + // task will overwrite it. + asset_data::Column::SlotUpdated, + asset_data::Column::Reindex, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset_data.base_info_seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. + query.sql = format!( + "{} WHERE (asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_base_info( + txn: &T, + id: Vec, + owner_type: OwnerType, + frozen: bool, + specification_version: SpecificationVersions, + specification_asset_class: SpecificationAssetClass, + royalty_target_type: RoyaltyTargetType, + royalty_target: Option>, + royalty_amount: i32, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + // Set base info for asset. 
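+ // (`base_info_seq` records the Bubblegum seq that last wrote these columns; combined with
+ // the WHERE clause appended below, it keeps replayed and out-of-order instructions from
+ // clobbering newer data.)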
+ let asset_model = asset::ActiveModel { + id: Set(id.clone()), + owner_type: Set(owner_type), + frozen: Set(frozen), + specification_version: Set(Some(specification_version)), + specification_asset_class: Set(Some(specification_asset_class)), + royalty_target_type: Set(royalty_target_type), + royalty_target: Set(royalty_target), + royalty_amount: Set(royalty_amount), + asset_data: Set(Some(id.clone())), slot_updated: Set(Some(slot_updated)), - group_info_seq: Set(Some(seq)), + base_info_seq: Set(Some(seq)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) + // Upsert asset table base info. + let mut query = asset::Entity::insert(asset_model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::OwnerType, + asset::Column::Frozen, + asset::Column::SpecificationVersion, + asset::Column::SpecificationAssetClass, + asset::Column::RoyaltyTargetType, + asset::Column::RoyaltyTarget, + asset::Column::RoyaltyAmount, + asset::Column::AssetData, + asset::Column::SlotUpdated, + asset::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.base_info_seq >= asset.base_info_seq OR asset.base_info_seq IS NULL)", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_creators( + txn: &T, + id: Vec, + creators: &Vec, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + // Vec to hold base creator information. + let mut db_creators = Vec::with_capacity(creators.len()); + + if creators.is_empty() { + // Bubblegum supports empty creator array. In this case insert an empty Vec + // for the creator. + db_creators.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + position: Set(0), + creator: Set(vec![]), + share: Set(100), + verified: Set(false), + slot_updated: Set(Some(slot_updated)), + seq: Set(Some(seq)), + ..Default::default() + }); + } else { + // Set to prevent duplicates. + let mut creators_set = HashSet::new(); + + for (i, c) in creators.iter().enumerate() { + if creators_set.contains(&c.address) { + continue; + } + + db_creators.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + position: Set(i as i16), + creator: Set(c.address.to_bytes().to_vec()), + share: Set(c.share as i32), + verified: Set(c.verified), + slot_updated: Set(Some(slot_updated)), + seq: Set(Some(seq)), + ..Default::default() + }); + + creators_set.insert(c.address); + } + } + + // This statement will update base information for each creator. 
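+ // Note the conflict target is now (asset_id, position) rather than the old (asset_id, creator),
+ // so an updated creator list replaces the previous one position by position.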
+ let mut query = asset_creators::Entity::insert_many(db_creators) .on_conflict( OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, + asset_creators::Column::AssetId, + asset_creators::Column::Position, ]) .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, + asset_creators::Column::Creator, + asset_creators::Column::Share, + asset_creators::Column::Verified, + asset_creators::Column::SlotUpdated, + asset_creators::Column::Seq, ]) .to_owned(), ) .build(DbBackend::Postgres); query.sql = format!( - "{} WHERE excluded.group_info_seq >= asset_grouping.group_info_seq OR asset_grouping.group_info_seq IS NULL", + "{} WHERE (asset_creators.seq != 0 AND excluded.seq >= asset_creators.seq) OR asset_creators.seq IS NULL", query.sql ); + txn.execute(query).await?; + + Ok(()) +} + +pub async fn upsert_asset_authority( + txn: &T, + asset_id: Vec, + authority: Vec, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_authority::ActiveModel { + asset_id: Set(asset_id), + authority: Set(authority), + seq: Set(seq), + slot_updated: Set(slot_updated), + ..Default::default() + }; + + // This value is only written during `mint_v1` or after an item is decompressed, so do not + // attempt to modify any existing values: + // `ON CONFLICT ('asset_id') DO NOTHING`. + let query = asset_authority::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_authority::Column::AssetId]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query) .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; Ok(()) } diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs index a024d5ebe..951fa7e4b 100644 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs @@ -1,12 +1,10 @@ use crate::{ error::IngesterError, - program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression, + program_transformers::bubblegum::upsert_asset_with_leaf_and_compression_info_for_decompression, }; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; use sea_orm::{query::*, ConnectionTrait}; -use super::upsert_asset_with_compression_info; - pub async fn decompress<'c, T>( _parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, @@ -17,16 +15,8 @@ where { let id_bytes = bundle.keys.get(3).unwrap().0.as_slice(); - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?; - upsert_asset_with_compression_info( - txn, - id_bytes.to_vec(), - false, - false, - 1, - Some(id_bytes.to_vec()), - true, - ) - .await + // Partial update of asset table with leaf and compression info. 
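+ // (This single guarded upsert now zeroes the leaf fields and sets compressed = false,
+ // supply = 1, and supply_mint in one statement, replacing the separate
+ // `upsert_asset_with_compression_info` call that was removed above.)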
+ upsert_asset_with_leaf_and_compression_info_for_decompression(txn, id_bytes.to_vec()).await?; + + Ok(()) } diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index 88896de64..4cb4aadc2 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -22,7 +22,7 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - return match le.schema { + match le.schema { LeafSchema::V1 { id, owner, @@ -38,9 +38,14 @@ where }; let tree_id = cl.id.to_bytes(); + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), cl.index as i64, tree_id.to_vec(), @@ -48,13 +53,12 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -62,9 +66,13 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); } - }; + } } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 752ed6a3c..643b25bf1 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,39 +1,29 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_compression_info, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + save_changelog_event, upsert_asset_authority, upsert_asset_base_info, + upsert_asset_creators, upsert_asset_data, upsert_asset_with_compression_info, + upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, + upsert_asset_with_seq, upsert_collection_info, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, - token_metadata::{ - pda::find_master_edition_account, - state::{TokenStandard, UseMethod, Uses}, - }, + token_metadata::state::{TokenStandard, UseMethod, Uses}, }; use chrono::Utc; use digital_asset_types::{ - dao::{ - asset, asset_authority, asset_creators, asset_data, asset_v1_account_attachments, - sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType}, + dao::sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, }, json::ChainDataV1, }; use log::warn; use num_traits::FromPrimitive; -use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue, -}; -use std::collections::HashSet; - -use 
digital_asset_types::dao::sea_orm_active_enums::{ - SpecificationAssetClass, SpecificationVersions, V1AccountAttachments, -}; - -// TODO -> consider moving structs into these functions to avoid clone +use sea_orm::{query::*, ConnectionTrait, JsonValue}; pub async fn mint_v1<'c, T>( parsing_result: &BubblegumInstruction, @@ -44,7 +34,15 @@ pub async fn mint_v1<'c, T>( where T: ConnectionTrait + TransactionTrait, { - if let (Some(le), Some(cl), Some(Payload::MintV1 { args })) = ( + if let ( + Some(le), + Some(cl), + Some(Payload::MintV1 { + args, + authority, + tree_id, + }), + ) = ( &parsing_result.leaf_update, &parsing_result.tree_update, &parsing_result.payload, @@ -60,7 +58,6 @@ where nonce, .. } => { - let (edition_attachment_address, _) = find_master_edition_account(&id); let id_bytes = id.to_bytes(); let slot_i = bundle.slot as i64; let uri = metadata.uri.replace('\0', ""); @@ -86,124 +83,77 @@ where false => ChainMutability::Immutable, }; - let data = asset_data::ActiveModel { - id: Set(id_bytes.to_vec()), - chain_data_mutability: Set(chain_mutability), - chain_data: Set(chain_data_json), - metadata_url: Set(uri.clone()), - metadata: Set(JsonValue::String("processing".to_string())), - metadata_mutability: Set(Mutability::Mutable), - slot_updated: Set(slot_i), - reindex: Set(Some(true)), - raw_name: Set(name.to_vec()), - raw_symbol: Set(symbol.to_vec()), - ..Default::default() - }; + upsert_asset_data( + txn, + id_bytes.to_vec(), + chain_mutability, + chain_data_json, + uri.clone(), + Mutability::Mutable, + JsonValue::String("processing".to_string()), + slot_i, + Some(true), + name.to_vec(), + symbol.to_vec(), + seq as i64, + ) + .await?; - let mut query = asset_data::Entity::insert(data) - .on_conflict( - OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - // Insert into `asset` table. + // Upsert `asset` table base info. let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None } else { Some(delegate.to_bytes().to_vec()) }; - let tree_id = bundle.keys.get(3).unwrap().0.to_vec(); - - // Set initial mint info. - let asset_model = asset::ActiveModel { - id: Set(id_bytes.to_vec()), - owner_type: Set(OwnerType::Single), - frozen: Set(false), - tree_id: Set(Some(tree_id.clone())), - specification_version: Set(Some(SpecificationVersions::V1)), - specification_asset_class: Set(Some(SpecificationAssetClass::Nft)), - nonce: Set(Some(nonce as i64)), - royalty_target_type: Set(RoyaltyTargetType::Creators), - royalty_target: Set(None), - royalty_amount: Set(metadata.seller_fee_basis_points as i32), //basis points - asset_data: Set(Some(id_bytes.to_vec())), - slot_updated: Set(Some(slot_i)), - ..Default::default() - }; - // Upsert asset table base info. 
- let mut query = asset::Entity::insert(asset_model) - .on_conflict( - OnConflict::columns([asset::Column::Id]) - .update_columns([ - asset::Column::OwnerType, - asset::Column::Frozen, - asset::Column::SpecificationVersion, - asset::Column::SpecificationAssetClass, - asset::Column::RoyaltyTargetType, - asset::Column::RoyaltyTarget, - asset::Column::RoyaltyAmount, - asset::Column::AssetData, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; - // Do not overwrite changes that happened after the asset was decompressed. - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated OR asset.slot_updated IS NULL", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + // Upsert `asset` table base info and `asset_creators` table. + upsert_asset_base_info( + txn, + id_bytes.to_vec(), + OwnerType::Single, + false, + SpecificationVersions::V1, + SpecificationAssetClass::Nft, + RoyaltyTargetType::Creators, + None, + metadata.seller_fee_basis_points as i32, + slot_i, + seq as i64, + ) + .await?; // Partial update of asset table with just compression info elements. upsert_asset_with_compression_info( - txn, + &multi_txn, id_bytes.to_vec(), true, false, 1, None, - false, ) .await?; // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce as i64, - tree_id, + tree_id.to_vec(), le.leaf_hash.to_vec(), le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner.to_bytes().to_vec(), delegate, @@ -211,123 +161,30 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - - let attachment = asset_v1_account_attachments::ActiveModel { - id: Set(edition_attachment_address.to_bytes().to_vec()), - slot_updated: Set(slot_i), - attachment_type: Set(V1AccountAttachments::MasterEditionV2), - ..Default::default() - }; - - let query = asset_v1_account_attachments::Entity::insert(attachment) - .on_conflict( - OnConflict::columns([asset_v1_account_attachments::Column::Id]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - - // Insert into `asset_creators` table. - let creators = &metadata.creators; - if !creators.is_empty() { - // Vec to hold base creator information. - let mut db_creator_infos = Vec::with_capacity(creators.len()); - - // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. - let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); - - // Set to prevent duplicates. 
- let mut creators_set = HashSet::new(); + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; - for (i, c) in creators.iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - db_creator_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - position: Set(i as i16), - share: Set(c.share as i32), - slot_updated: Set(Some(slot_i)), - ..Default::default() - }); + multi_txn.commit().await?; - db_creator_verified_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - verified: Set(c.verified), - seq: Set(Some(seq as i64)), - ..Default::default() - }); - - creators_set.insert(c.address); - } - - // This statement will update base information for each creator. - let query = asset_creators::Entity::insert_many(db_creator_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Position, - asset_creators::Column::Share, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - - // This statement will update whether the creator is verified and the `seq` - // number. `seq` is used to protect the `verified` field, allowing for `mint` - // and `verifyCreator` to be processed out of order. - let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::Seq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL", - query.sql - ); - txn.execute(query).await?; - } + // Upsert creators to `asset_creators` table. + upsert_asset_creators( + txn, + id_bytes.to_vec(), + &metadata.creators, + slot_i, + seq as i64, + ) + .await?; // Insert into `asset_authority` table. - let model = asset_authority::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - authority: Set(bundle.keys.get(0).unwrap().0.to_vec()), //TODO - we need to rem,ove the optional bubblegum signer logic - seq: Set(seq as i64), - slot_updated: Set(slot_i), - ..Default::default() - }; - - // Do not attempt to modify any existing values: - // `ON CONFLICT ('asset_id') DO NOTHING`. - let query = asset_authority::Entity::insert(model) - .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + //TODO - we need to remove the optional bubblegum signer logic + upsert_asset_authority( + txn, + id_bytes.to_vec(), + authority.to_vec(), + seq as i64, + slot_i, + ) + .await?; // Upsert into `asset_grouping` table with base collection info. 
upsert_collection_info( @@ -349,7 +206,7 @@ where let mut task = DownloadMetadata { asset_data_id: id_bytes.to_vec(), - uri: metadata.uri.clone(), + uri, created_at: Some(Utc::now().naive_utc()), }; task.sanitize(); diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index bcc102c0b..a034080ff 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -17,6 +17,7 @@ mod delegate; mod mint_v1; mod redeem; mod transfer; +mod update_metadata; pub use db::*; @@ -53,7 +54,8 @@ where InstructionName::VerifyCollection => "VerifyCollection", InstructionName::UnverifyCollection => "UnverifyCollection", InstructionName::SetAndVerifyCollection => "SetAndVerifyCollection", - InstructionName::SetDecompressibleState | InstructionName::UpdateMetadata => todo!(), + InstructionName::SetDecompressibleState => "SetDecompressibleState", + InstructionName::UpdateMetadata => "UpdateMetadata", }; info!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id); @@ -83,17 +85,23 @@ where InstructionName::DecompressV1 => { decompress::decompress(parsing_result, bundle, txn).await?; } - InstructionName::VerifyCreator => { - creator_verification::process(parsing_result, bundle, txn, true, cl_audits).await?; - } - InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, false, cl_audits).await?; + InstructionName::VerifyCreator | InstructionName::UnverifyCreator => { + creator_verification::process(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::VerifyCollection | InstructionName::UnverifyCollection | InstructionName::SetAndVerifyCollection => { collection_verification::process(parsing_result, bundle, txn, cl_audits).await?; } + InstructionName::SetDecompressibleState => (), // Nothing to index. + InstructionName::UpdateMetadata => { + let task = + update_metadata::update_metadata(parsing_result, bundle, txn, cl_audits).await?; + + if let Some(t) = task { + task_manager.send(t)?; + } + } _ => debug!("Bubblegum: Not Implemented Instruction"), } Ok(()) diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index b9b7f2c27..484f5cdc9 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -35,9 +35,14 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. 
upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -45,11 +50,12 @@ where [0; 32], [0; 32], seq as i64, - false, ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; return Ok(()); } diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 573f33a8f..230167991 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -23,8 +23,8 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { + + match le.schema { LeafSchema::V1 { id, owner, @@ -41,9 +41,14 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -51,13 +56,12 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -65,9 +69,13 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); } - }; + } } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs new file mode 100644 index 000000000..393cdbbd2 --- /dev/null +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -0,0 +1,207 @@ +use crate::{ + error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, upsert_asset_base_info, upsert_asset_creators, upsert_asset_data, + upsert_asset_with_leaf_info, upsert_asset_with_seq, + }, + tasks::{DownloadMetadata, IntoTaskData, TaskData}, +}; +use blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + token_metadata::state::{TokenStandard, UseMethod, Uses}, +}; +use chrono::Utc; +use digital_asset_types::{ + dao::sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, + }, + json::ChainDataV1, +}; +use log::warn; +use num_traits::FromPrimitive; +use sea_orm::{query::*, ConnectionTrait, JsonValue}; + +pub async fn update_metadata<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + cl_audits: bool, +) -> Result, IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + if let ( + Some(le), + Some(cl), + Some(Payload::UpdateMetadata { + current_metadata, + update_args, + tree_id, + }), + ) = ( + &parsing_result.leaf_update, + 
&parsing_result.tree_update,
+        &parsing_result.payload,
+    ) {
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+
+        #[allow(unreachable_patterns)]
+        return match le.schema {
+            LeafSchema::V1 { id, nonce, .. } => {
+                let id_bytes = id.to_bytes();
+                let slot_i = bundle.slot as i64;
+
+                let uri = if let Some(uri) = &update_args.uri {
+                    uri.replace('\0', "")
+                } else {
+                    current_metadata.uri.replace('\0', "")
+                };
+                if uri.is_empty() {
+                    return Err(IngesterError::DeserializationError(
+                        "URI is empty".to_string(),
+                    ));
+                }
+
+                let name = if let Some(name) = update_args.name.clone() {
+                    name
+                } else {
+                    current_metadata.name.clone()
+                };
+
+                let symbol = if let Some(symbol) = update_args.symbol.clone() {
+                    symbol
+                } else {
+                    current_metadata.symbol.clone()
+                };
+
+                let primary_sale_happened =
+                    if let Some(primary_sale_happened) = update_args.primary_sale_happened {
+                        primary_sale_happened
+                    } else {
+                        current_metadata.primary_sale_happened
+                    };
+
+                let mut chain_data = ChainDataV1 {
+                    name: name.clone(),
+                    symbol: symbol.clone(),
+                    edition_nonce: current_metadata.edition_nonce,
+                    primary_sale_happened,
+                    token_standard: Some(TokenStandard::NonFungible),
+                    uses: current_metadata.uses.clone().map(|u| Uses {
+                        use_method: UseMethod::from_u8(u.use_method as u8).unwrap(),
+                        remaining: u.remaining,
+                        total: u.total,
+                    }),
+                };
+                chain_data.sanitize();
+                let chain_data_json = serde_json::to_value(chain_data)
+                    .map_err(|e| IngesterError::DeserializationError(e.to_string()))?;
+
+                let is_mutable = if let Some(is_mutable) = update_args.is_mutable {
+                    is_mutable
+                } else {
+                    current_metadata.is_mutable
+                };
+
+                let chain_mutability = if is_mutable {
+                    ChainMutability::Mutable
+                } else {
+                    ChainMutability::Immutable
+                };
+
+                upsert_asset_data(
+                    txn,
+                    id_bytes.to_vec(),
+                    chain_mutability,
+                    chain_data_json,
+                    uri.clone(),
+                    Mutability::Mutable,
+                    JsonValue::String("processing".to_string()),
+                    slot_i,
+                    Some(true),
+                    name.into_bytes().to_vec(),
+                    symbol.into_bytes().to_vec(),
+                    seq as i64,
+                )
+                .await?;
+
+                // Upsert `asset` table base info.
+                let seller_fee_basis_points =
+                    if let Some(seller_fee_basis_points) = update_args.seller_fee_basis_points {
+                        seller_fee_basis_points
+                    } else {
+                        current_metadata.seller_fee_basis_points
+                    };
+
+                let creators = if let Some(creators) = &update_args.creators {
+                    creators
+                } else {
+                    &current_metadata.creators
+                };
+
+                // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has
+                // an error and this function returns it using the `?` operator), then the transaction is
+                // automatically rolled back.
+                let multi_txn = txn.begin().await?;
+
+                upsert_asset_base_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    OwnerType::Single,
+                    false,
+                    SpecificationVersions::V1,
+                    SpecificationAssetClass::Nft,
+                    RoyaltyTargetType::Creators,
+                    None,
+                    seller_fee_basis_points as i32,
+                    slot_i,
+                    seq as i64,
+                )
+                .await?;
+
+                // Partial update of asset table with just leaf.
+                upsert_asset_with_leaf_info(
+                    &multi_txn,
+                    id_bytes.to_vec(),
+                    nonce as i64,
+                    tree_id.to_vec(),
+                    le.leaf_hash.to_vec(),
+                    le.schema.data_hash(),
+                    le.schema.creator_hash(),
+                    seq as i64,
+                )
+                .await?;
+
+                upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;
+
+                multi_txn.commit().await?;
+
+                // Upsert creators to `asset_creators` table.
+                upsert_asset_creators(txn, id_bytes.to_vec(), creators, slot_i, seq as i64).await?;
+
+                if uri.is_empty() {
+                    warn!(
+                        "URI is empty for mint {}. 
Skipping background task.", + bs58::encode(id).into_string() + ); + return Ok(None); + } + + let mut task = DownloadMetadata { + asset_data_id: id_bytes.to_vec(), + uri, + created_at: Some(Utc::now().naive_utc()), + }; + task.sanitize(); + let t = task.into_task_data()?; + Ok(Some(t)) + } + _ => Err(IngesterError::NotImplemented), + }; + } + Err(IngesterError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 061a2ac9b..74a52f29f 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -23,7 +23,7 @@ use num_traits::FromPrimitive; use plerkle_serialization::Pubkey as FBPubkey; use sea_orm::{ entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, DbBackend, - DbErr, EntityTrait, FromQueryResult, JoinType, JsonValue, + DbErr, EntityTrait, JsonValue, }; use std::collections::HashSet; @@ -147,8 +147,9 @@ pub async fn save_v1_asset( slot_updated: Set(slot_i), reindex: Set(Some(true)), id: Set(id.to_vec()), - raw_name: Set(name.to_vec()), - raw_symbol: Set(symbol.to_vec()), + raw_name: Set(Some(name.to_vec())), + raw_symbol: Set(Some(symbol.to_vec())), + base_info_seq: Set(Some(0)), }; let txn = conn.begin().await?; let mut query = asset_data::Entity::insert(asset_data_model) @@ -163,6 +164,7 @@ pub async fn save_v1_asset( asset_data::Column::Reindex, asset_data::Column::RawName, asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, ]) .to_owned(), ) @@ -188,6 +190,8 @@ pub async fn save_v1_asset( nonce: Set(Some(0)), seq: Set(Some(0)), leaf: Set(None), + data_hash: Set(None), + creator_hash: Set(None), compressed: Set(false), compressible: Set(false), royalty_target_type: Set(RoyaltyTargetType::Creators), @@ -274,13 +278,14 @@ pub async fn save_v1_asset( txn.execute(query) .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + if let Some(c) = &metadata.collection { let model = asset_grouping::ActiveModel { asset_id: Set(id.to_vec()), group_key: Set("collection".to_string()), group_value: Set(Some(c.key.to_string())), - verified: Set(Some(c.verified)), - seq: Set(None), + verified: Set(c.verified), + group_info_seq: Set(Some(0)), slot_updated: Set(Some(slot_i)), ..Default::default() }; @@ -291,10 +296,10 @@ pub async fn save_v1_asset( asset_grouping::Column::GroupKey, ]) .update_columns([ - asset_grouping::Column::GroupKey, asset_grouping::Column::GroupValue, - asset_grouping::Column::Seq, + asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, ]) .to_owned(), ) @@ -309,70 +314,54 @@ pub async fn save_v1_asset( } txn.commit().await?; let creators = data.creators.unwrap_or_default(); + if !creators.is_empty() { let mut creators_set = HashSet::new(); - let existing_creators: Vec = asset_creators::Entity::find() - .filter( - Condition::all() - .add(asset_creators::Column::AssetId.eq(id.to_vec())) - .add(asset_creators::Column::SlotUpdated.lt(slot_i)), - ) - .all(conn) - .await?; - if !existing_creators.is_empty() { - let mut db_creators = Vec::with_capacity(creators.len()); - for (i, c) in creators.into_iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - db_creators.push(asset_creators::ActiveModel { - asset_id: Set(id.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - share: Set(c.share as 
i32), - verified: Set(c.verified), - seq: Set(Some(0)), - slot_updated: Set(Some(slot_i)), - position: Set(i as i16), - ..Default::default() - }); - creators_set.insert(c.address); + let mut db_creators = Vec::with_capacity(creators.len()); + for (i, c) in creators.into_iter().enumerate() { + if creators_set.contains(&c.address) { + continue; } - let txn = conn.begin().await?; - asset_creators::Entity::delete_many() - .filter( - Condition::all() - .add(asset_creators::Column::AssetId.eq(id.to_vec())) - .add(asset_creators::Column::SlotUpdated.lt(slot_i)), + db_creators.push(asset_creators::ActiveModel { + asset_id: Set(id.to_vec()), + position: Set(i as i16), + creator: Set(c.address.to_bytes().to_vec()), + share: Set(c.share as i32), + verified: Set(c.verified), + slot_updated: Set(Some(slot_i)), + seq: Set(Some(0)), + ..Default::default() + }); + creators_set.insert(c.address); + } + + let txn = conn.begin().await?; + if !db_creators.is_empty() { + let mut query = asset_creators::Entity::insert_many(db_creators) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Position, + ]) + .update_columns([ + asset_creators::Column::Creator, + asset_creators::Column::Share, + asset_creators::Column::Verified, + asset_creators::Column::SlotUpdated, + asset_creators::Column::Seq, + ]) + .to_owned(), ) - .exec(&txn) - .await?; - if db_creators.len() > 0 { - let mut query = asset_creators::Entity::insert_many(db_creators) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Position, - ]) - .update_columns([ - asset_creators::Column::Creator, - asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_creators.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - } - txn.commit().await?; + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_creators.slot_updated", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; } + txn.commit().await?; } if uri.is_empty() { warn!( diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs index bf39e455d..c10fd9ee3 100644 --- a/nft_ingester/src/tasks/common/mod.rs +++ b/nft_ingester/src/tasks/common/mod.rs @@ -75,6 +75,11 @@ impl DownloadMetadataTask { } } +#[derive(FromQueryResult, Debug, Default, Clone, Eq, PartialEq)] +struct MetadataUrl { + pub metadata_url: String, +} + #[async_trait] impl BgTask for DownloadMetadataTask { fn name(&self) -> &'static str { @@ -106,6 +111,30 @@ impl BgTask for DownloadMetadataTask { } _ => serde_json::Value::String("Invalid Uri".to_string()), //TODO -> enumize this. }; + + let query = asset_data::Entity::find_by_id(download_metadata.asset_data_id.clone()) + .select_only() + .column(asset_data::Column::MetadataUrl) + .build(DbBackend::Postgres); + + match MetadataUrl::find_by_statement(query).one(db).await? 
{ + Some(asset) => { + if asset.metadata_url != download_metadata.uri { + debug!( + "skipping download metadata of old URI for {:?}", + bs58::encode(download_metadata.asset_data_id.clone()).into_string() + ); + return Ok(()); + } + } + None => { + return Err(IngesterError::UnrecoverableTaskError(format!( + "failed to find URI in database for {:?}", + bs58::encode(download_metadata.asset_data_id.clone()).into_string() + ))); + } + } + let model = asset_data::ActiveModel { id: Unchanged(download_metadata.asset_data_id.clone()), metadata: Set(body), @@ -118,6 +147,10 @@ impl BgTask for DownloadMetadataTask { ); asset_data::Entity::update(model) .filter(asset_data::Column::Id.eq(download_metadata.asset_data_id.clone())) + .filter( + Condition::all() + .add(asset_data::Column::MetadataUrl.eq(download_metadata.uri.clone())), + ) .exec(db) .await .map(|_| ()) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 469626eac..8142c3012 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.70.0" \ No newline at end of file +channel = "1.73.0" diff --git a/tools/acc_forwarder/Cargo.toml b/tools/acc_forwarder/Cargo.toml index 04e0ee842..96d5d4eec 100644 --- a/tools/acc_forwarder/Cargo.toml +++ b/tools/acc_forwarder/Cargo.toml @@ -24,6 +24,6 @@ solana-account-decoder = "~1.16.16" solana-client = "~1.16.16" solana-sdk = "~1.16.16" solana-transaction-status = "~1.16.16" -spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } +spl-token = { version = "4.0.0", features = ["no-entrypoint"] } tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread", "time"] } txn_forwarder = { path = "../txn_forwarder" } diff --git a/tools/fetch_trees/Cargo.toml b/tools/fetch_trees/Cargo.toml index 4035014ca..9461e85b5 100644 --- a/tools/fetch_trees/Cargo.toml +++ b/tools/fetch_trees/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1.0.70" async-trait = "0.1.53" borsh = "~0.10.3" clap = { version = "4.2.2", features = ["derive", "cargo"] } -mpl-bubblegum = "1.0.1-beta.3" +mpl-bubblegum = "=1.0.1-beta.4" solana-account-decoder = "~1.16.16" solana-client = "~1.16.16" solana-sdk = "~1.16.16" diff --git a/tools/load_generation/Cargo.toml b/tools/load_generation/Cargo.toml index 2f9452300..e0fb82ace 100644 --- a/tools/load_generation/Cargo.toml +++ b/tools/load_generation/Cargo.toml @@ -12,5 +12,5 @@ solana-client = "~1.16.16" solana-program = "~1.16.16" solana-sdk = "~1.16.16" spl-associated-token-account = { version = ">= 1.1.3, < 3.0", features = ["no-entrypoint"] } -spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } +spl-token = { version = "4.0.0", features = ["no-entrypoint"] } tokio = { version ="1.25.0", features = ["macros", "rt-multi-thread"] }
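
A note on two patterns this patch leans on throughout. Every Bubblegum transformer above now routes its writes through seq-guarded upserts: the generated INSERT ... ON CONFLICT statement gets a trailing WHERE clause comparing sequence numbers, so an instruction that is indexed or replayed out of order can never overwrite state written by a later (higher-seq) instruction. The sketch below shows the shape of that guard against the generated `asset` entity. It is illustrative only, not code from this patch: the function name `upsert_asset_seq_guarded` is hypothetical (the real helpers are the `upsert_asset_*` functions re-exported via `pub use db::*;`), and it assumes the same SeaORM imports the patch itself uses.

use digital_asset_types::dao::generated::asset;
use sea_orm::{
    entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, DbBackend,
    DbErr, EntityTrait,
};

/// Sketch of the seq-guarded upsert pattern: update an asset's `seq`,
/// refusing to move it backwards. Name and scope are illustrative.
pub async fn upsert_asset_seq_guarded<T: ConnectionTrait>(
    txn: &T,
    id: Vec<u8>,
    seq: i64,
) -> Result<(), DbErr> {
    // Only the columns being upserted are set; everything else stays NotSet
    // so the generated INSERT touches nothing it should not.
    let model = asset::ActiveModel {
        id: Set(id),
        seq: Set(Some(seq)),
        ..Default::default()
    };

    let mut query = asset::Entity::insert(model)
        .on_conflict(
            OnConflict::columns([asset::Column::Id])
                .update_columns([asset::Column::Seq])
                .to_owned(),
        )
        .build(DbBackend::Postgres);

    // Reject stale writes; a NULL seq means the row predates sequence tracking.
    query.sql = format!(
        "{} WHERE excluded.seq > asset.seq OR asset.seq IS NULL",
        query.sql
    );

    txn.execute(query).await.map(|_| ())
}

The `multi_txn` blocks introduced in mint_v1, redeem, transfer, and update_metadata pair with this guard: a SeaORM `DatabaseTransaction` rolls back on drop if `commit` is never reached, so a failure partway through the leaf/owner/seq updates leaves no half-applied asset row behind.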