Skip to content

Commit

Permalink
Add simple check for whether asset was decompressed to Bubblegum tran…
Browse files Browse the repository at this point in the history
…sformers
  • Loading branch information
danenbm committed Oct 19, 2023
1 parent bdf8e3c commit c826583
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 69 deletions.
3 changes: 3 additions & 0 deletions nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
// Note: We do not check whether the asset has been decompressed here becuase we know if it
// was burned then it could not have been decompressed later.

let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
save_changelog_event, upsert_asset_with_leaf_info,
asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info,
upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
},
};
Expand Down Expand Up @@ -31,6 +31,12 @@ where
..
} => {
let id_bytes = id.to_bytes();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

let owner_bytes = owner.to_bytes().to_vec();
let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
None
Expand All @@ -50,7 +56,6 @@ where
le.schema.data_hash(),
le.schema.creator_hash(),
seq as i64,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::program_transformers::bubblegum::{upsert_asset_with_seq, upsert_collection_info};
use crate::program_transformers::bubblegum::{
asset_was_decompressed, upsert_asset_with_seq, upsert_collection_info,
};
use blockbuster::{
instruction::InstructionBundle,
programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload},
Expand Down Expand Up @@ -41,6 +43,12 @@ where
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

Expand All @@ -54,7 +62,6 @@ where
le.schema.data_hash(),
le.schema.creator_hash(),
seq as i64,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
save_changelog_event, upsert_asset_with_leaf_info,
asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info,
upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified,
},
};
Expand Down Expand Up @@ -51,6 +51,12 @@ where
..
} => {
let id_bytes = id.to_bytes();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

let owner_bytes = owner.to_bytes().to_vec();
let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
None
Expand All @@ -70,7 +76,6 @@ where
le.schema.data_hash(),
le.schema.creator_hash(),
seq as i64,
false,
)
.await?;

Expand Down
60 changes: 29 additions & 31 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
..Default::default()
};

let mut audit_item: Option<cl_audits::ActiveModel> = if (cl_audits) {
let audit_item: Option<cl_audits::ActiveModel> = if cl_audits {
let mut ai: cl_audits::ActiveModel = item.clone().into();
ai.tx = Set(txn_id.to_string());
Some(ai)
Expand Down Expand Up @@ -144,7 +144,6 @@ pub async fn upsert_asset_with_leaf_info<T>(
data_hash: [u8; 32],
creator_hash: [u8; 32],
seq: i64,
was_decompressed: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -169,22 +168,19 @@ where
asset::Column::Nonce,
asset::Column::TreeId,
asset::Column::Leaf,
asset::Column::LeafSeq,
asset::Column::DataHash,
asset::Column::CreatorHash,
asset::Column::LeafSeq,
])
.to_owned(),
)
.build(DbBackend::Postgres);

// If we are indexing decompression we will update the leaf regardless of if we have previously
// indexed decompression and regardless of seq.
if !was_decompressed {
query.sql = format!(
"{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)",
query.sql
);
}
// If the asset was decompressed, don't update the leaf info since we cleared it during decompression.
query.sql = format!(
"{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)",
query.sql
);

txn.execute(query)
.await
Expand All @@ -202,26 +198,25 @@ where
{
let model = asset::ActiveModel {
id: Set(id),
leaf: Set(None),
nonce: Set(Some(0)),
leaf_seq: Set(None),
tree_id: Set(None),
leaf: Set(None),
data_hash: Set(None),
creator_hash: Set(None),
tree_id: Set(None),
seq: Set(Some(0)),
leaf_seq: Set(None),
..Default::default()
};

let query = asset::Entity::insert(model)
.on_conflict(
OnConflict::column(asset::Column::Id)
.update_columns([
asset::Column::Leaf,
asset::Column::LeafSeq,
asset::Column::Nonce,
asset::Column::TreeId,
asset::Column::Leaf,
asset::Column::DataHash,
asset::Column::CreatorHash,
asset::Column::TreeId,
asset::Column::Seq,
asset::Column::LeafSeq,
])
.to_owned(),
)
Expand Down Expand Up @@ -474,13 +469,6 @@ where
//seq: Set(seq),
};

// First check to see if this asset has been decompressed.
if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? {
if let Some(0) = asset.seq {
return Ok(());
}
};

let mut query = asset_data::Entity::insert(model)
.on_conflict(
OnConflict::columns([asset_data::Column::Id])
Expand All @@ -503,7 +491,6 @@ where
)
.build(DbBackend::Postgres);
query.sql = format!(
// New asset_data.seq.
"{} WHERE excluded.seq >= asset_data.seq OR asset_data.seq IS NULL)",
query.sql
);
Expand All @@ -518,13 +505,13 @@ pub async fn upsert_asset_with_royalty_amount<T>(
txn: &T,
id: Vec<u8>,
royalty_amount: i32,
seq: i64,
_seq: i64,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
let model = asset::ActiveModel {
id: Set(id),
id: Set(id.clone()),
royalty_amount: Set(royalty_amount),
//royalty_amount_seq: Set(Some(seq)),
..Default::default()
Expand All @@ -542,8 +529,7 @@ where
.build(DbBackend::Postgres);

query.sql = format!(
// TODO DEAL WITH THIS
"{} WHERE (NOT asset.was_decompressed) AND (excluded.royalty_amount_seq >= asset.royalty_amount_seq OR royalty_amount_seq.seq IS NULL)",
"{} WHERE excluded.royalty_amount_seq >= asset.royalty_amount_seq OR royalty_amount_seq.seq IS NULL)",
query.sql
);

Expand All @@ -553,3 +539,15 @@ where

Ok(())
}

pub async fn asset_was_decompressed<T>(txn: &T, id: Vec<u8>) -> Result<bool, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? {
if let Some(0) = asset.seq {
return Ok(true);
}
};
Ok(false)
}
12 changes: 9 additions & 3 deletions nft_ingester/src/program_transformers/bubblegum/decompress.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression,
program_transformers::bubblegum::{
asset_was_decompressed, upsert_asset_with_compression_info,
upsert_asset_with_leaf_info_for_decompression,
},
};
use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
use sea_orm::{query::*, ConnectionTrait};

use super::upsert_asset_with_compression_info;

pub async fn decompress<'c, T>(
_parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
Expand All @@ -17,6 +18,11 @@ where
{
let id_bytes = bundle.keys.get(3).unwrap().0.as_slice();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?;

Expand Down
9 changes: 7 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
save_changelog_event, upsert_asset_with_leaf_info,
asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info,
upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
},
};
Expand Down Expand Up @@ -30,6 +30,12 @@ where
..
} => {
let id_bytes = id.to_bytes();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

let owner_bytes = owner.to_bytes().to_vec();
let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
None
Expand All @@ -48,7 +54,6 @@ where
le.schema.data_hash(),
le.schema.creator_hash(),
seq as i64,
false,
)
.await?;

Expand Down
21 changes: 11 additions & 10 deletions nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
save_changelog_event, upsert_asset_data, upsert_asset_with_compression_info,
upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info,
upsert_asset_with_royalty_amount, upsert_asset_with_seq, upsert_collection_info,
asset_was_decompressed, save_changelog_event, upsert_asset_data,
upsert_asset_with_compression_info, upsert_asset_with_leaf_info,
upsert_asset_with_owner_and_delegate_info, upsert_asset_with_royalty_amount,
upsert_asset_with_seq, upsert_collection_info,
},
tasks::{DownloadMetadata, IntoTaskData, TaskData},
};
Expand Down Expand Up @@ -63,6 +64,12 @@ where
} => {
let (edition_attachment_address, _) = find_master_edition_account(&id);
let id_bytes = id.to_bytes();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(None);
}

let slot_i = bundle.slot as i64;
let uri = metadata.uri.replace('\0', "");
let name = metadata.name.clone().into_bytes();
Expand Down Expand Up @@ -128,7 +135,7 @@ where
};

// Upsert asset table base info.
let mut query = asset::Entity::insert(asset_model)
let query = asset::Entity::insert(asset_model)
.on_conflict(
OnConflict::columns([asset::Column::Id])
.update_columns([
Expand All @@ -144,11 +151,6 @@ where
)
.build(DbBackend::Postgres);

// Do not overwrite changes that happened after the asset was decompressed.
query.sql = format!(
"{} WHERE excluded.slot_updated > asset.slot_updated OR asset.slot_updated IS NULL",
query.sql
);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
Expand Down Expand Up @@ -183,7 +185,6 @@ where
le.schema.data_hash(),
le.schema.creator_hash(),
seq as i64,
false,
)
.await?;

Expand Down
10 changes: 8 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use log::debug;
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq,
asset_was_decompressed, save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info,
upsert_asset_with_seq,
},
};
use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
Expand Down Expand Up @@ -32,6 +33,12 @@ where
);
debug!("Indexing redeem for asset id: {:?}", asset_id);
let id_bytes = asset_id.to_bytes();

// First check to see if this asset has been decompressed and if so do not update.
if asset_was_decompressed(txn, id_bytes.to_vec()).await? {
return Ok(());
}

let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

Expand All @@ -45,7 +52,6 @@ where
[0; 32],
[0; 32],
seq as i64,
false,
)
.await?;

Expand Down
Loading

0 comments on commit c826583

Please sign in to comment.