diff --git a/Cargo.lock b/Cargo.lock
index 7e33acdb9..8ea778e95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -881,9 +881,9 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
 
 [[package]]
 name = "blockbuster"
-version = "0.9.0-beta.1"
+version = "0.9.0-beta.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "56e0240c1218958c0d51284d783fa055f551d769bb8b7a4abf635b17fa9620dc"
+checksum = "3148abe1072ebeb76c9802c0ea3b8af5556dce6219c7cb2b0a156a0a892c613a"
 dependencies = [
  "anchor-lang",
  "async-trait",
diff --git a/README.md b/README.md
index 2784bcad9..e0b4d6d2b 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ Because this is a multi component system the easiest way to develop or locally t
 #### Regenerating DB Types
 Edit the init.sql, then run `docker compose up db`
 Then with a local `DATABASE_URL` var exported like this `export DATABASE_URL=postgres://solana:solana@localhost/solana` you can run
-` sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format`
+`sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format`
 
 If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`.
 
diff --git a/das_api/Cargo.toml b/das_api/Cargo.toml
index c99d8b21a..efbe2e722 100644
--- a/das_api/Cargo.toml
+++ b/das_api/Cargo.toml
@@ -33,7 +33,7 @@ schemars = "0.8.6"
 schemars_derive = "0.8.6"
 open-rpc-derive = { version = "0.0.4"}
 open-rpc-schema = { version = "0.0.4"}
-blockbuster = "0.9.0-beta.1"
+blockbuster = "0.9.0-beta.2"
 anchor-lang = "0.28.0"
 mpl-token-metadata = { version = "=2.0.0-beta.1",  features = ["serde-feature"] }
 mpl-candy-machine-core = { version = "2.0.1", features = ["no-entrypoint"] }
diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml
index 8e4b18bb9..aa9ccc199 100644
--- a/digital_asset_types/Cargo.toml
+++ b/digital_asset_types/Cargo.toml
@@ -18,7 +18,7 @@ solana-sdk = "~1.16.16"
 num-traits = "0.2.15"
 num-derive = "0.3.3"
 thiserror = "1.0.31"
-blockbuster = "0.9.0-beta.1"
+blockbuster = "0.9.0-beta.2"
 jsonpath_lib = "0.3.0"
 mime_guess = "2.0.4"
 url = "2.3.1"
diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs
index 0ced69299..e6721f428 100644
--- a/digital_asset_types/src/dao/generated/asset.rs
+++ b/digital_asset_types/src/dao/generated/asset.rs
@@ -46,6 +46,7 @@ pub struct Model {
     pub owner_delegate_seq: Option<i64>,
     pub was_decompressed: bool,
     pub leaf_seq: Option<i64>,
+    pub update_metadata_seq: Option<i64>,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
@@ -78,6 +79,7 @@ pub enum Column {
     OwnerDelegateSeq,
     WasDecompressed,
     LeafSeq,
+    UpdateMetadataSeq,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
@@ -133,6 +135,7 @@ impl ColumnTrait for Column {
             Self::OwnerDelegateSeq => ColumnType::BigInteger.def().null(),
             Self::WasDecompressed => ColumnType::Boolean.def(),
             Self::LeafSeq => ColumnType::BigInteger.def().null(),
+            Self::UpdateMetadataSeq => ColumnType::BigInteger.def().null(),
         }
     }
 }
diff --git a/digital_asset_types/src/dao/generated/asset_creators.rs b/digital_asset_types/src/dao/generated/asset_creators.rs
index 21f34dcf7..346ed3b2e 100644
--- a/digital_asset_types/src/dao/generated/asset_creators.rs
+++ b/digital_asset_types/src/dao/generated/asset_creators.rs
@@ -19,7 +19,7 @@ pub struct Model {
     pub creator: Vec<u8>,
     pub share: i32,
     pub verified: bool,
-    pub seq: Option<i64>,
+    pub verified_seq: Option<i64>,
     pub slot_updated: Option<i64>,
     pub position: i16,
 }
@@ -31,7 +31,7 @@ pub enum Column {
     Creator,
     Share,
     Verified,
-    Seq,
+    VerifiedSeq,
     SlotUpdated,
     Position,
 }
@@ -62,7 +62,7 @@ impl ColumnTrait for Column {
             Self::Creator => ColumnType::Binary.def(),
             Self::Share => ColumnType::Integer.def(),
             Self::Verified => ColumnType::Boolean.def(),
-            Self::Seq => ColumnType::BigInteger.def().null(),
+            Self::VerifiedSeq => ColumnType::BigInteger.def().null(),
             Self::SlotUpdated => ColumnType::BigInteger.def().null(),
             Self::Position => ColumnType::SmallInteger.def(),
         }
diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs
index 374ed854a..cdfc254ea 100644
--- a/digital_asset_types/src/dao/generated/asset_data.rs
+++ b/digital_asset_types/src/dao/generated/asset_data.rs
@@ -24,8 +24,9 @@ pub struct Model {
     pub metadata: Json,
     pub slot_updated: i64,
     pub reindex: Option<bool>,
-    pub raw_name: Vec<u8>,
-    pub raw_symbol: Vec<u8>,
+    pub raw_name: Option<Vec<u8>>,
+    pub raw_symbol: Option<Vec<u8>>,
+    pub download_metadata_seq: Option<i64>,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
@@ -40,6 +41,7 @@ pub enum Column {
     Reindex,
     RawName,
     RawSymbol,
+    DownloadMetadataSeq,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
@@ -70,9 +72,10 @@ impl ColumnTrait for Column {
             Self::MetadataMutability => Mutability::db_type(),
             Self::Metadata => ColumnType::JsonBinary.def(),
             Self::SlotUpdated => ColumnType::BigInteger.def(),
-            Self::Reindex => ColumnType::Boolean.def(),
-            Self::RawName => ColumnType::Binary.def(),
-            Self::RawSymbol => ColumnType::Binary.def(),
+            Self::Reindex => ColumnType::Boolean.def().null(),
+            Self::RawName => ColumnType::Binary.def().null(),
+            Self::RawSymbol => ColumnType::Binary.def().null(),
+            Self::DownloadMetadataSeq => ColumnType::BigInteger.def().null(),
         }
     }
 }
diff --git a/digital_asset_types/src/dao/generated/asset_grouping.rs b/digital_asset_types/src/dao/generated/asset_grouping.rs
index aae51d6d8..5d5c0e749 100644
--- a/digital_asset_types/src/dao/generated/asset_grouping.rs
+++ b/digital_asset_types/src/dao/generated/asset_grouping.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3
+//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3
 
 use sea_orm::entity::prelude::*;
 use serde::{Deserialize, Serialize};
@@ -12,7 +12,7 @@ impl EntityName for Entity {
     }
 }
 
-#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)]
 pub struct Model {
     pub id: i64,
     pub asset_id: Vec<u8>,
@@ -20,7 +20,7 @@ pub struct Model {
     pub group_value: Option<String>,
     pub seq: Option<i64>,
     pub slot_updated: Option<i64>,
-    pub verified: Option<bool>,
+    pub verified: bool,
     pub group_info_seq: Option<i64>,
 }
 
diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs
index a07714202..0d02b7769 100644
--- a/digital_asset_types/src/dao/generated/cl_audits.rs
+++ b/digital_asset_types/src/dao/generated/cl_audits.rs
@@ -22,7 +22,7 @@ pub struct Model {
     pub seq: i64,
     pub level: i64,
     pub hash: Vec<u8>,
-    pub created_at: Option<DateTime>,
+    pub created_at: DateTime,
     pub tx: String,
 }
 
diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
index 2be0283e7..eadaa037b 100644
--- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
+++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
@@ -3,62 +3,6 @@
 use sea_orm::entity::prelude::*;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
-#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")]
-pub enum Mutability {
-    #[sea_orm(string_value = "immutable")]
-    Immutable,
-    #[sea_orm(string_value = "mutable")]
-    Mutable,
-    #[sea_orm(string_value = "unknown")]
-    Unknown,
-}
-#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
-#[sea_orm(
-    rs_type = "String",
-    db_type = "Enum",
-    enum_name = "v1_account_attachments"
-)]
-pub enum V1AccountAttachments {
-    #[sea_orm(string_value = "edition")]
-    Edition,
-    #[sea_orm(string_value = "edition_marker")]
-    EditionMarker,
-    #[sea_orm(string_value = "master_edition_v1")]
-    MasterEditionV1,
-    #[sea_orm(string_value = "master_edition_v2")]
-    MasterEditionV2,
-    #[sea_orm(string_value = "unknown")]
-    Unknown,
-}
-#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
-#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
-pub enum TaskStatus {
-    #[sea_orm(string_value = "failed")]
-    Failed,
-    #[sea_orm(string_value = "pending")]
-    Pending,
-    #[sea_orm(string_value = "running")]
-    Running,
-    #[sea_orm(string_value = "success")]
-    Success,
-}
-#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
-#[sea_orm(
-    rs_type = "String",
-    db_type = "Enum",
-    enum_name = "royalty_target_type"
-)]
-pub enum RoyaltyTargetType {
-    #[sea_orm(string_value = "creators")]
-    Creators,
-    #[sea_orm(string_value = "fanout")]
-    Fanout,
-    #[sea_orm(string_value = "single")]
-    Single,
-    #[sea_orm(string_value = "unknown")]
-    Unknown,
-}
 #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
 #[sea_orm(
     rs_type = "String",
@@ -98,6 +42,24 @@ pub enum ChainMutability {
     Unknown,
 }
 #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
+#[sea_orm(
+    rs_type = "String",
+    db_type = "Enum",
+    enum_name = "v1_account_attachments"
+)]
+pub enum V1AccountAttachments {
+    #[sea_orm(string_value = "edition")]
+    Edition,
+    #[sea_orm(string_value = "edition_marker")]
+    EditionMarker,
+    #[sea_orm(string_value = "master_edition_v1")]
+    MasterEditionV1,
+    #[sea_orm(string_value = "master_edition_v2")]
+    MasterEditionV2,
+    #[sea_orm(string_value = "unknown")]
+    Unknown,
+}
+#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
 #[sea_orm(
     rs_type = "String",
     db_type = "Enum",
@@ -114,6 +76,32 @@ pub enum SpecificationVersions {
     V2,
 }
 #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
+#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")]
+pub enum Mutability {
+    #[sea_orm(string_value = "immutable")]
+    Immutable,
+    #[sea_orm(string_value = "mutable")]
+    Mutable,
+    #[sea_orm(string_value = "unknown")]
+    Unknown,
+}
+#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
+#[sea_orm(
+    rs_type = "String",
+    db_type = "Enum",
+    enum_name = "royalty_target_type"
+)]
+pub enum RoyaltyTargetType {
+    #[sea_orm(string_value = "creators")]
+    Creators,
+    #[sea_orm(string_value = "fanout")]
+    Fanout,
+    #[sea_orm(string_value = "single")]
+    Single,
+    #[sea_orm(string_value = "unknown")]
+    Unknown,
+}
+#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
 #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")]
 pub enum OwnerType {
     #[sea_orm(string_value = "single")]
@@ -123,3 +111,15 @@ pub enum OwnerType {
     #[sea_orm(string_value = "unknown")]
     Unknown,
 }
+#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
+#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
+pub enum TaskStatus {
+    #[sea_orm(string_value = "failed")]
+    Failed,
+    #[sea_orm(string_value = "pending")]
+    Pending,
+    #[sea_orm(string_value = "running")]
+    Running,
+    #[sea_orm(string_value = "success")]
+    Success,
+}
diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs
index bbe3cf509..95a85f518 100644
--- a/digital_asset_types/tests/common.rs
+++ b/digital_asset_types/tests/common.rs
@@ -83,8 +83,9 @@ pub fn create_asset_data(
             metadata: JsonValue::String("processing".to_string()),
             slot_updated: 0,
             reindex: None,
-            raw_name: metadata.name.into_bytes().to_vec().clone(),
-            raw_symbol: metadata.symbol.into_bytes().to_vec().clone(),
+            raw_name: Some(metadata.name.into_bytes().to_vec().clone()),
+            raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()),
+            download_metadata_seq: Some(0),
         },
     )
 }
@@ -157,6 +158,7 @@ pub fn create_asset(
             owner_delegate_seq: Some(0),
             was_decompressed: false,
             leaf_seq: Some(0),
+            update_metadata_seq: Some(0),
         },
     )
 }
@@ -182,7 +184,7 @@ pub fn create_asset_creator(
             creator,
             share,
             verified,
-            seq: Some(0),
+            verified_seq: Some(0),
             slot_updated: Some(0),
             position: 0,
         },
@@ -231,7 +233,7 @@ pub fn create_asset_grouping(
             id: row_num,
             group_key: "collection".to_string(),
             slot_updated: Some(0),
-            verified: Some(false),
+            verified: false,
             group_info_seq: Some(0),
         },
     )
diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs
index 765f14bc6..77d5d30c1 100644
--- a/digital_asset_types/tests/json_parsing.rs
+++ b/digital_asset_types/tests/json_parsing.rs
@@ -34,8 +34,9 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content {
         metadata: json,
         slot_updated: 0,
         reindex: None,
-        raw_name: String::from("Handalf").into_bytes().to_vec(),
-        raw_symbol: String::from("").into_bytes().to_vec(),
+        raw_name: Some(String::from("Handalf").into_bytes().to_vec()),
+        raw_symbol: Some(String::from("").into_bytes().to_vec()),
+        download_metadata_seq: Some(0),
     };
 
     v1_content_from_json(&asset_data).unwrap()
diff --git a/migration/src/lib.rs b/migration/src/lib.rs
index 7e38ac93d..7b4be4523 100644
--- a/migration/src/lib.rs
+++ b/migration/src/lib.rs
@@ -30,6 +30,7 @@ mod m20230724_120101_add_group_info_seq;
 mod m20230726_013107_remove_not_null_constraint_from_group_value;
 mod m20230918_182123_add_raw_name_symbol;
 mod m20230919_072154_cl_audits;
+mod m20231019_120101_add_seq_numbers_bgum_update_metadata;
 
 pub struct Migrator;
 
@@ -67,6 +68,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration),
             Box::new(m20230918_182123_add_raw_name_symbol::Migration),
             Box::new(m20230919_072154_cl_audits::Migration),
+            Box::new(m20231019_120101_add_seq_numbers_bgum_update_metadata::Migration),
         ]
     }
 }
diff --git a/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs
new file mode 100644
index 000000000..bce6b5c9d
--- /dev/null
+++ b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs
@@ -0,0 +1,79 @@
+use digital_asset_types::dao::{asset, asset_creators, asset_data};
+use sea_orm_migration::{
+    prelude::*,
+    sea_orm::{ConnectionTrait, DatabaseBackend, Statement},
+};
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute(Statement::from_string(
+                DatabaseBackend::Postgres,
+                "
+                ALTER TABLE asset_creators
+                RENAME COLUMN seq to verified_seq;
+                "
+                .to_string(),
+            ))
+            .await?;
+
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(asset_data::Entity)
+                    .add_column(ColumnDef::new(Alias::new("download_metadata_seq")).big_integer())
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(asset::Entity)
+                    .add_column(ColumnDef::new(Alias::new("update_metadata_seq")).big_integer())
+                    .to_owned(),
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute(Statement::from_string(
+                DatabaseBackend::Postgres,
+                "
+                ALTER TABLE asset_creators
+                RENAME COLUMN verified_seq to seq;
+                "
+                .to_string(),
+            ))
+            .await?;
+
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(asset_data::Entity)
+                    .drop_column(Alias::new("download_metadata_seq"))
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(asset::Entity)
+                    .drop_column(Alias::new("update_metadata_seq"))
+                    .to_owned(),
+            )
+            .await?;
+
+        Ok(())
+    }
+}
diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml
index 6fb9f998c..c9030e68e 100644
--- a/nft_ingester/Cargo.toml
+++ b/nft_ingester/Cargo.toml
@@ -35,13 +35,13 @@ spl-concurrent-merkle-tree = "0.2.0"
 uuid = "1.0.0"
 async-trait = "0.1.53"
 num-traits = "0.2.15"
-blockbuster = "0.9.0-beta.1"
+blockbuster = "0.9.0-beta.2"
 figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
 cadence = "0.29.0"
 cadence-macros = "0.29.0"
 solana-sdk = "~1.16.16"
 solana-client = "~1.16.16"
-spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
+spl-token = { version = "4.0.0", features = ["no-entrypoint"] }
 solana-transaction-status = "~1.16.16"
 solana-account-decoder = "~1.16.16"
 solana-geyser-plugin-interface = "~1.16.16"
diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs
index 70ddcfcea..adca1324f 100644
--- a/nft_ingester/src/program_transformers/bubblegum/burn.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs
@@ -23,6 +23,9 @@ where
     T: ConnectionTrait + TransactionTrait,
 {
     if let Some(cl) = &parsing_result.tree_update {
+        // Note: We do not check whether the asset has been decompressed here because we know if it
+        // was burned then it could not have been decompressed later.
+
         let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
         let leaf_index = cl.index;
         let (asset_id, _) = Pubkey::find_program_address(
@@ -46,10 +49,7 @@ where
         let query = asset::Entity::insert(asset_model)
             .on_conflict(
                 OnConflict::columns([asset::Column::Id])
-                    .update_columns([
-                        asset::Column::Burnt,
-                        //TODO maybe handle slot updated.
-                    ])
+                    .update_columns([asset::Column::Burnt])
                     .to_owned(),
             )
             .build(DbBackend::Postgres);
diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
index 1b8f5842a..9b9d83525 100644
--- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
@@ -1,7 +1,7 @@
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        save_changelog_event, upsert_asset_with_leaf_info,
+        asset_should_be_updated, save_changelog_event, upsert_asset_with_leaf_info,
         upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
     },
 };
@@ -31,6 +31,13 @@ where
                 ..
             } => {
                 let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(());
+                }
+
                 let owner_bytes = owner.to_bytes().to_vec();
                 let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
                     None
@@ -50,7 +57,6 @@ where
                     le.schema.data_hash(),
                     le.schema.creator_hash(),
                     seq as i64,
-                    false,
                 )
                 .await?;
 
diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
index 7517f1544..0d6dc828d 100644
--- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
@@ -1,4 +1,6 @@
-use crate::program_transformers::bubblegum::{upsert_asset_with_seq, upsert_collection_info};
+use crate::program_transformers::bubblegum::{
+    asset_should_be_updated, upsert_asset_with_seq, upsert_collection_info,
+};
 use blockbuster::{
     instruction::InstructionBundle,
     programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload},
@@ -26,7 +28,7 @@ where
         let (collection, verify) = match payload {
             Payload::CollectionVerification {
                 collection, verify, ..
-            } => (collection.clone(), verify.clone()),
+            } => (collection, verify),
             _ => {
                 return Err(IngesterError::ParsingError(
                     "Ix not parsed correctly".to_string(),
@@ -41,6 +43,13 @@ where
         let id_bytes = match le.schema {
             LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
         };
+
+        // First check to see if this asset has been decompressed or updated by
+        // `update_metadata`.
+        if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+            return Ok(());
+        }
+
         let tree_id = cl.id.to_bytes();
         let nonce = cl.index as i64;
 
@@ -54,7 +63,6 @@ where
             le.schema.data_hash(),
             le.schema.creator_hash(),
             seq as i64,
-            false,
         )
         .await?;
 
@@ -64,8 +72,8 @@ where
             txn,
             id_bytes.to_vec(),
             Some(Collection {
-                key: collection.clone(),
-                verified: verify,
+                key: *collection,
+                verified: *verify,
             }),
             bundle.slot as i64,
             seq as i64,
diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
index 134fe89ca..85dece67e 100644
--- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
@@ -1,7 +1,7 @@
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        save_changelog_event, upsert_asset_with_leaf_info,
+        asset_should_be_updated, save_changelog_event, upsert_asset_with_leaf_info,
         upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified,
     },
 };
@@ -51,6 +51,13 @@ where
                 ..
             } => {
                 let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(());
+                }
+
                 let owner_bytes = owner.to_bytes().to_vec();
                 let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
                     None
@@ -70,7 +77,6 @@ where
                     le.schema.data_hash(),
                     le.schema.creator_hash(),
                     seq as i64,
-                    false,
                 )
                 .await?;
 
diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs
index 7e930abdc..a7e95afdc 100644
--- a/nft_ingester/src/program_transformers/bubblegum/db.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,15 +1,19 @@
 use crate::error::IngesterError;
 use digital_asset_types::dao::{
-    asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
+    asset, asset_authority, asset_creators, asset_data, asset_grouping,
+    asset_v1_account_attachments, backfill_items, cl_audits, cl_items,
+    sea_orm_active_enums::{
+        ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass,
+        SpecificationVersions, V1AccountAttachments,
+    },
 };
 use log::{debug, info};
-use mpl_bubblegum::types::Collection;
+use mpl_bubblegum::types::{Collection, Creator};
 use sea_orm::{
     query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
 };
 use spl_account_compression::events::ChangeLogEventV1;
-
-use std::convert::From;
+use std::collections::HashSet;
 
 pub async fn save_changelog_event<'c, T>(
     change_log_event: &ChangeLogEventV1,
@@ -68,7 +72,7 @@ where
             ..Default::default()
         };
 
-        let mut audit_item: Option<cl_audits::ActiveModel> = if (cl_audits) {
+        let audit_item: Option<cl_audits::ActiveModel> = if cl_audits {
             let mut ai: cl_audits::ActiveModel = item.clone().into();
             ai.tx = Set(txn_id.to_string());
             Some(ai)
@@ -135,6 +139,7 @@ where
     //TODO -> set maximum size of path and break into multiple statements
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn upsert_asset_with_leaf_info<T>(
     txn: &T,
     id: Vec<u8>,
@@ -144,7 +149,6 @@ pub async fn upsert_asset_with_leaf_info<T>(
     data_hash: [u8; 32],
     creator_hash: [u8; 32],
     seq: i64,
-    was_decompressed: bool,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
@@ -169,22 +173,19 @@ where
                     asset::Column::Nonce,
                     asset::Column::TreeId,
                     asset::Column::Leaf,
-                    asset::Column::LeafSeq,
                     asset::Column::DataHash,
                     asset::Column::CreatorHash,
+                    asset::Column::LeafSeq,
                 ])
                 .to_owned(),
         )
         .build(DbBackend::Postgres);
 
-    // If we are indexing decompression we will update the leaf regardless of if we have previously
-    // indexed decompression and regardless of seq.
-    if !was_decompressed {
-        query.sql = format!(
-            "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)",
-            query.sql
-        );
-    }
+    // If the asset was decompressed, don't update the leaf info since we cleared it during decompression.
+    query.sql = format!(
+        "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)",
+        query.sql
+    );
 
     txn.execute(query)
         .await
@@ -202,26 +203,25 @@ where
 {
     let model = asset::ActiveModel {
         id: Set(id),
-        leaf: Set(None),
         nonce: Set(Some(0)),
-        leaf_seq: Set(None),
+        tree_id: Set(None),
+        leaf: Set(None),
         data_hash: Set(None),
         creator_hash: Set(None),
-        tree_id: Set(None),
-        seq: Set(Some(0)),
+        leaf_seq: Set(None),
         ..Default::default()
     };
+
     let query = asset::Entity::insert(model)
         .on_conflict(
             OnConflict::column(asset::Column::Id)
                 .update_columns([
-                    asset::Column::Leaf,
-                    asset::Column::LeafSeq,
                     asset::Column::Nonce,
+                    asset::Column::TreeId,
+                    asset::Column::Leaf,
                     asset::Column::DataHash,
                     asset::Column::CreatorHash,
-                    asset::Column::TreeId,
-                    asset::Column::Seq,
+                    asset::Column::LeafSeq,
                 ])
                 .to_owned(),
         )
@@ -334,7 +334,7 @@ where
         .build(DbBackend::Postgres);
 
     query.sql = format!(
-        "{} WHERE (NOT asset.was_decompressed) AND (excluded.seq >= asset.seq OR asset.seq IS NULL)",
+        "{} WHERE excluded.seq >= asset.seq OR asset.seq IS NULL",
         query.sql
     );
 
@@ -356,10 +356,10 @@ where
     T: ConnectionTrait + TransactionTrait,
 {
     let model = asset_creators::ActiveModel {
-        asset_id: Set(asset_id),
+        asset_id: Set(asset_id.clone()),
         creator: Set(creator),
         verified: Set(verified),
-        seq: Set(Some(seq)),
+        verified_seq: Set(Some(seq)),
         ..Default::default()
     };
 
@@ -371,16 +371,16 @@ where
             ])
             .update_columns([
                 asset_creators::Column::Verified,
-                asset_creators::Column::Seq,
+                asset_creators::Column::VerifiedSeq,
             ])
             .to_owned(),
         )
         .build(DbBackend::Postgres);
 
     query.sql = format!(
-        "{} WHERE excluded.seq >= asset_creators.seq OR asset_creators.seq is NULL",
-        query.sql
-    );
+    "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq is NULL",
+    query.sql,
+);
 
     txn.execute(query)
         .await
@@ -408,7 +408,7 @@ where
         asset_id: Set(asset_id),
         group_key: Set("collection".to_string()),
         group_value: Set(group_value),
-        verified: Set(Some(verified)),
+        verified: Set(verified),
         slot_updated: Set(Some(slot_updated)),
         group_info_seq: Set(Some(seq)),
         ..Default::default()
@@ -441,3 +441,327 @@ where
 
     Ok(())
 }
+
+#[allow(clippy::too_many_arguments)]
+pub async fn unprotected_upsert_asset_data<T>(
+    txn: &T,
+    id: Vec<u8>,
+    chain_data_mutability: ChainMutability,
+    chain_data: JsonValue,
+    metadata_url: String,
+    metadata_mutability: Mutability,
+    metadata: JsonValue,
+    slot_updated: i64,
+    reindex: Option<bool>,
+    raw_name: Vec<u8>,
+    raw_symbol: Vec<u8>,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let model = asset_data::ActiveModel {
+        id: Set(id.clone()),
+        chain_data_mutability: Set(chain_data_mutability),
+        chain_data: Set(chain_data),
+        metadata_url: Set(metadata_url),
+        metadata_mutability: Set(metadata_mutability),
+        metadata: Set(metadata),
+        slot_updated: Set(slot_updated),
+        reindex: Set(reindex),
+        raw_name: Set(Some(raw_name)),
+        raw_symbol: Set(Some(raw_symbol)),
+        ..Default::default()
+    };
+
+    let query = asset_data::Entity::insert(model)
+        .on_conflict(
+            OnConflict::columns([asset_data::Column::Id])
+                .update_columns([
+                    asset_data::Column::ChainDataMutability,
+                    asset_data::Column::ChainData,
+                    asset_data::Column::MetadataUrl,
+                    asset_data::Column::MetadataMutability,
+                    // Don't update asset_data::Column::Metadata if it already exists.  Even if we
+                    // are indexing `update_metadata`` and there's a new URI, the new background
+                    // task will overwrite it.
+                    asset_data::Column::SlotUpdated,
+                    asset_data::Column::Reindex,
+                    asset_data::Column::RawName,
+                    asset_data::Column::RawSymbol,
+                ])
+                .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;
+
+    Ok(())
+}
+
+#[allow(clippy::too_many_arguments)]
+pub async fn unprotected_upsert_asset_base_info<T>(
+    txn: &T,
+    id: Vec<u8>,
+    owner_type: OwnerType,
+    frozen: bool,
+    specification_version: SpecificationVersions,
+    specification_asset_class: SpecificationAssetClass,
+    royalty_target_type: RoyaltyTargetType,
+    royalty_target: Option<Vec<u8>>,
+    royalty_amount: i32,
+    slot_updated: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    // Set initial mint info.
+    let asset_model = asset::ActiveModel {
+        id: Set(id.clone()),
+        owner_type: Set(owner_type),
+        frozen: Set(frozen),
+        specification_version: Set(Some(specification_version)),
+        specification_asset_class: Set(Some(specification_asset_class)),
+        royalty_target_type: Set(royalty_target_type),
+        royalty_target: Set(royalty_target),
+        royalty_amount: Set(royalty_amount),
+        asset_data: Set(Some(id)),
+        slot_updated: Set(Some(slot_updated)),
+        ..Default::default()
+    };
+
+    // Upsert asset table base info.
+    let query = asset::Entity::insert(asset_model)
+        .on_conflict(
+            OnConflict::columns([asset::Column::Id])
+                .update_columns([
+                    asset::Column::OwnerType,
+                    asset::Column::Frozen,
+                    asset::Column::SpecificationVersion,
+                    asset::Column::SpecificationAssetClass,
+                    asset::Column::RoyaltyTargetType,
+                    asset::Column::RoyaltyTarget,
+                    asset::Column::RoyaltyAmount,
+                    asset::Column::AssetData,
+                    asset::Column::SlotUpdated,
+                ])
+                .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+
+    Ok(())
+}
+
+pub async fn asset_should_be_updated<T>(
+    txn: &T,
+    id: Vec<u8>,
+    seq: Option<i64>,
+) -> Result<bool, IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? {
+        if let Some(update_metadata_seq) = asset.update_metadata_seq {
+            if update_metadata_seq == 0 {
+                // Asset was decompressed and should no longer be updated by Bubblegum program
+                // transformers.
+                return Ok(false);
+            } else if let Some(seq) = seq {
+                if seq < update_metadata_seq {
+                    // Asset was updated by an `update_metadata` so should not be modified by this
+                    // sequence number.
+                    return Ok(false);
+                }
+            }
+        }
+    };
+    Ok(true)
+}
+
+pub async fn upsert_asset_with_update_metadata_seq<T>(
+    txn: &T,
+    id: Vec<u8>,
+    seq: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let model = asset::ActiveModel {
+        id: Set(id),
+        update_metadata_seq: Set(Some(seq)),
+        ..Default::default()
+    };
+
+    let mut query = asset::Entity::insert(model)
+        .on_conflict(
+            OnConflict::column(asset::Column::Id)
+                .update_columns([asset::Column::UpdateMetadataSeq])
+                .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+
+    query.sql = format!(
+        "{} WHERE excluded.update_metadata_seq >= asset.update_metadata_seq OR asset.update_metadata_seq IS NULL",
+        query.sql
+    );
+
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;
+
+    Ok(())
+}
+
+pub async fn unprotected_upsert_creators<T>(
+    txn: &T,
+    id: Vec<u8>,
+    creators: &Vec<Creator>,
+    slot_updated: i64,
+    seq: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    if !creators.is_empty() {
+        // Vec to hold base creator information.
+        let mut db_creator_infos = Vec::with_capacity(creators.len());
+
+        // Vec to hold info on whether a creator is verified.  This info is protected by `seq` number.
+        let mut db_creator_verified_infos = Vec::with_capacity(creators.len());
+
+        // Set to prevent duplicates.
+        let mut creators_set = HashSet::new();
+
+        for (i, c) in creators.iter().enumerate() {
+            if creators_set.contains(&c.address) {
+                continue;
+            }
+
+            db_creator_infos.push(asset_creators::ActiveModel {
+                asset_id: Set(id.clone()),
+                creator: Set(c.address.to_bytes().to_vec()),
+                position: Set(i as i16),
+                share: Set(c.share as i32),
+                slot_updated: Set(Some(slot_updated)),
+                ..Default::default()
+            });
+
+            db_creator_verified_infos.push(asset_creators::ActiveModel {
+                asset_id: Set(id.clone()),
+                creator: Set(c.address.to_bytes().to_vec()),
+                verified: Set(c.verified),
+                verified_seq: Set(Some(seq)),
+                ..Default::default()
+            });
+
+            creators_set.insert(c.address);
+        }
+
+        // This statement will update base information for each creator.
+        let query = asset_creators::Entity::insert_many(db_creator_infos)
+            .on_conflict(
+                OnConflict::columns([
+                    asset_creators::Column::AssetId,
+                    asset_creators::Column::Creator,
+                ])
+                .update_columns([
+                    asset_creators::Column::Position,
+                    asset_creators::Column::Share,
+                    asset_creators::Column::SlotUpdated,
+                ])
+                .to_owned(),
+            )
+            .build(DbBackend::Postgres);
+        txn.execute(query).await?;
+
+        // This statement will update whether the creator is verified and the
+        // `verified_seq` number.
+        let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos)
+            .on_conflict(
+                OnConflict::columns([
+                    asset_creators::Column::AssetId,
+                    asset_creators::Column::Creator,
+                ])
+                .update_columns([
+                    asset_creators::Column::Verified,
+                    asset_creators::Column::VerifiedSeq,
+                ])
+                .to_owned(),
+            )
+            .build(DbBackend::Postgres);
+        query.sql = format!(
+            "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL",
+            query.sql
+        );
+        txn.execute(query).await?;
+    }
+
+    Ok(())
+}
+
+pub async fn unprotected_upsert_asset_v1_account_attachments<T>(
+    txn: &T,
+    edition_attachment_address: Vec<u8>,
+    slot_updated: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let attachment = asset_v1_account_attachments::ActiveModel {
+        id: Set(edition_attachment_address),
+        slot_updated: Set(slot_updated),
+        attachment_type: Set(V1AccountAttachments::MasterEditionV2),
+        ..Default::default()
+    };
+
+    let query = asset_v1_account_attachments::Entity::insert(attachment)
+        .on_conflict(
+            OnConflict::columns([asset_v1_account_attachments::Column::Id])
+                .do_nothing()
+                .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+
+    Ok(())
+}
+
+pub async fn unprotected_upsert_asset_authority<T>(
+    txn: &T,
+    asset_id: Vec<u8>,
+    authority: Vec<u8>,
+    slot_updated: i64,
+    seq: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let model = asset_authority::ActiveModel {
+        asset_id: Set(asset_id),
+        authority: Set(authority),
+        seq: Set(seq),
+        slot_updated: Set(slot_updated),
+        ..Default::default()
+    };
+
+    // Do not attempt to modify any existing values:
+    // `ON CONFLICT ('asset_id') DO NOTHING`.
+    let query = asset_authority::Entity::insert(model)
+        .on_conflict(
+            OnConflict::columns([asset_authority::Column::AssetId])
+                .do_nothing()
+                .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+
+    Ok(())
+}
diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs
index a024d5ebe..4bee5cc0c 100644
--- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs
@@ -1,12 +1,13 @@
 use crate::{
     error::IngesterError,
-    program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression,
+    program_transformers::bubblegum::{
+        asset_should_be_updated, upsert_asset_with_compression_info,
+        upsert_asset_with_leaf_info_for_decompression,
+    },
 };
 use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
 use sea_orm::{query::*, ConnectionTrait};
 
-use super::upsert_asset_with_compression_info;
-
 pub async fn decompress<'c, T>(
     _parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
@@ -17,8 +18,14 @@ where
 {
     let id_bytes = bundle.keys.get(3).unwrap().0.as_slice();
 
+    // First check to see if this asset has been decompressed.
+    if !asset_should_be_updated(txn, id_bytes.to_vec(), None).await? {
+        return Ok(());
+    }
+
     // Partial update of asset table with just leaf.
     upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?;
+
     upsert_asset_with_compression_info(
         txn,
         id_bytes.to_vec(),
diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs
index 88896de64..4f0b97e72 100644
--- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs
@@ -1,7 +1,7 @@
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        save_changelog_event, upsert_asset_with_leaf_info,
+        asset_should_be_updated, save_changelog_event, upsert_asset_with_leaf_info,
         upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
     },
 };
@@ -30,6 +30,13 @@ where
                 ..
             } => {
                 let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(());
+                }
+
                 let owner_bytes = owner.to_bytes().to_vec();
                 let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
                     None
@@ -48,7 +55,6 @@ where
                     le.schema.data_hash(),
                     le.schema.creator_hash(),
                     seq as i64,
-                    false,
                 )
                 .await?;
 
diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
index 323c1e35c..6874cb5ef 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
@@ -1,8 +1,12 @@
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        save_changelog_event, upsert_asset_with_compression_info, upsert_asset_with_leaf_info,
-        upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info,
+        asset_should_be_updated, save_changelog_event, unprotected_upsert_asset_authority,
+        unprotected_upsert_asset_base_info, unprotected_upsert_asset_data,
+        unprotected_upsert_asset_v1_account_attachments, unprotected_upsert_creators,
+        upsert_asset_with_compression_info, upsert_asset_with_leaf_info,
+        upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
+        upsert_asset_with_update_metadata_seq, upsert_collection_info,
     },
     tasks::{DownloadMetadata, IntoTaskData, TaskData},
 };
@@ -15,23 +19,16 @@ use blockbuster::{
     },
 };
 use chrono::Utc;
+use digital_asset_types::dao::sea_orm_active_enums::{
+    SpecificationAssetClass, SpecificationVersions,
+};
 use digital_asset_types::{
-    dao::{
-        asset, asset_authority, asset_creators, asset_data, asset_v1_account_attachments,
-        sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType},
-    },
+    dao::sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType},
     json::ChainDataV1,
 };
 use log::warn;
 use num_traits::FromPrimitive;
-use sea_orm::{
-    entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue,
-};
-use std::collections::HashSet;
-
-use digital_asset_types::dao::sea_orm_active_enums::{
-    SpecificationAssetClass, SpecificationVersions, V1AccountAttachments,
-};
+use sea_orm::{query::*, ConnectionTrait, JsonValue};
 
 // TODO -> consider moving structs into these functions to avoid clone
 
@@ -60,8 +57,14 @@ where
                 nonce,
                 ..
             } => {
-                let (edition_attachment_address, _) = find_master_edition_account(&id);
                 let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(None);
+                }
+
                 let slot_i = bundle.slot as i64;
                 let uri = metadata.uri.replace('\0', "");
                 let name = metadata.name.clone().into_bytes();
@@ -86,92 +89,38 @@ where
                     false => ChainMutability::Immutable,
                 };
 
-                let data = asset_data::ActiveModel {
-                    id: Set(id_bytes.to_vec()),
-                    chain_data_mutability: Set(chain_mutability),
-                    chain_data: Set(chain_data_json),
-                    metadata_url: Set(uri.clone()),
-                    metadata: Set(JsonValue::String("processing".to_string())),
-                    metadata_mutability: Set(Mutability::Mutable),
-                    slot_updated: Set(slot_i),
-                    reindex: Set(Some(true)),
-                    raw_name: Set(name.to_vec()),
-                    raw_symbol: Set(symbol.to_vec()),
-                    ..Default::default()
-                };
-
-                let mut query = asset_data::Entity::insert(data)
-                    .on_conflict(
-                        OnConflict::columns([asset_data::Column::Id])
-                            .update_columns([
-                                asset_data::Column::ChainDataMutability,
-                                asset_data::Column::ChainData,
-                                asset_data::Column::MetadataUrl,
-                                asset_data::Column::MetadataMutability,
-                                asset_data::Column::SlotUpdated,
-                                asset_data::Column::Reindex,
-                            ])
-                            .to_owned(),
-                    )
-                    .build(DbBackend::Postgres);
-                query.sql = format!(
-                    "{} WHERE excluded.slot_updated > asset_data.slot_updated",
-                    query.sql
-                );
-                txn.execute(query)
-                    .await
-                    .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
-                // Insert into `asset` table.
-                let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
-                    None
-                } else {
-                    Some(delegate.to_bytes().to_vec())
-                };
-                let tree_id = bundle.keys.get(3).unwrap().0.to_vec();
-
-                // Set initial mint info.
-                let asset_model = asset::ActiveModel {
-                    id: Set(id_bytes.to_vec()),
-                    owner_type: Set(OwnerType::Single),
-                    frozen: Set(false),
-                    tree_id: Set(Some(tree_id.clone())),
-                    specification_version: Set(Some(SpecificationVersions::V1)),
-                    specification_asset_class: Set(Some(SpecificationAssetClass::Nft)),
-                    nonce: Set(Some(nonce as i64)),
-                    royalty_target_type: Set(RoyaltyTargetType::Creators),
-                    royalty_target: Set(None),
-                    royalty_amount: Set(metadata.seller_fee_basis_points as i32), //basis points
-                    asset_data: Set(Some(id_bytes.to_vec())),
-                    slot_updated: Set(Some(slot_i)),
-                    ..Default::default()
-                };
+                unprotected_upsert_asset_data(
+                    txn,
+                    id_bytes.to_vec(),
+                    chain_mutability,
+                    chain_data_json,
+                    uri.clone(),
+                    Mutability::Mutable,
+                    JsonValue::String("processing".to_string()),
+                    slot_i,
+                    Some(true),
+                    name.to_vec(),
+                    symbol.to_vec(),
+                )
+                .await?;
 
-                // Upsert asset table base info.
-                let mut query = asset::Entity::insert(asset_model)
-                    .on_conflict(
-                        OnConflict::columns([asset::Column::Id])
-                            .update_columns([
-                                asset::Column::OwnerType,
-                                asset::Column::Frozen,
-                                asset::Column::SpecificationVersion,
-                                asset::Column::SpecificationAssetClass,
-                                asset::Column::RoyaltyTargetType,
-                                asset::Column::RoyaltyTarget,
-                                asset::Column::RoyaltyAmount,
-                                asset::Column::AssetData,
-                            ])
-                            .to_owned(),
-                    )
-                    .build(DbBackend::Postgres);
+                // Upsert into `asset` table.
 
-                // Do not overwrite changes that happened after the asset was decompressed.
-                query.sql = format!(
-                    "{} WHERE excluded.slot_updated > asset.slot_updated OR asset.slot_updated IS NULL",
-                    query.sql
-                );
-                txn.execute(query)
-                    .await
-                    .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+                // Set base mint info.
+                let tree_id = bundle.keys.get(3).unwrap().0.to_vec();
+                unprotected_upsert_asset_base_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    OwnerType::Single,
+                    false,
+                    SpecificationVersions::V1,
+                    SpecificationAssetClass::Nft,
+                    RoyaltyTargetType::Creators,
+                    None,
+                    metadata.seller_fee_basis_points as i32,
+                    slot_i,
+                )
+                .await?;
 
                 // Partial update of asset table with just compression info elements.
                 upsert_asset_with_compression_info(
@@ -195,11 +144,15 @@ where
                     le.schema.data_hash(),
                     le.schema.creator_hash(),
                     seq as i64,
-                    false,
                 )
                 .await?;
 
                 // Partial update of asset table with just leaf owner and delegate.
+                let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
+                    None
+                } else {
+                    Some(delegate.to_bytes().to_vec())
+                };
                 upsert_asset_with_owner_and_delegate_info(
                     txn,
                     id_bytes.to_vec(),
@@ -211,121 +164,36 @@ where
 
                 upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
 
-                let attachment = asset_v1_account_attachments::ActiveModel {
-                    id: Set(edition_attachment_address.to_bytes().to_vec()),
-                    slot_updated: Set(slot_i),
-                    attachment_type: Set(V1AccountAttachments::MasterEditionV2),
-                    ..Default::default()
-                };
-
-                let query = asset_v1_account_attachments::Entity::insert(attachment)
-                    .on_conflict(
-                        OnConflict::columns([asset_v1_account_attachments::Column::Id])
-                            .do_nothing()
-                            .to_owned(),
-                    )
-                    .build(DbBackend::Postgres);
-                txn.execute(query)
-                    .await
-                    .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
-
-                // Insert into `asset_creators` table.
-                let creators = &metadata.creators;
-                if !creators.is_empty() {
-                    // Vec to hold base creator information.
-                    let mut db_creator_infos = Vec::with_capacity(creators.len());
-
-                    // Vec to hold info on whether a creator is verified.  This info is protected by `seq` number.
-                    let mut db_creator_verified_infos = Vec::with_capacity(creators.len());
-
-                    // Set to prevent duplicates.
-                    let mut creators_set = HashSet::new();
-
-                    for (i, c) in creators.iter().enumerate() {
-                        if creators_set.contains(&c.address) {
-                            continue;
-                        }
-                        db_creator_infos.push(asset_creators::ActiveModel {
-                            asset_id: Set(id_bytes.to_vec()),
-                            creator: Set(c.address.to_bytes().to_vec()),
-                            position: Set(i as i16),
-                            share: Set(c.share as i32),
-                            slot_updated: Set(Some(slot_i)),
-                            ..Default::default()
-                        });
-
-                        db_creator_verified_infos.push(asset_creators::ActiveModel {
-                            asset_id: Set(id_bytes.to_vec()),
-                            creator: Set(c.address.to_bytes().to_vec()),
-                            verified: Set(c.verified),
-                            seq: Set(Some(seq as i64)),
-                            ..Default::default()
-                        });
-
-                        creators_set.insert(c.address);
-                    }
-
-                    // This statement will update base information for each creator.
-                    let query = asset_creators::Entity::insert_many(db_creator_infos)
-                        .on_conflict(
-                            OnConflict::columns([
-                                asset_creators::Column::AssetId,
-                                asset_creators::Column::Creator,
-                            ])
-                            .update_columns([
-                                asset_creators::Column::Position,
-                                asset_creators::Column::Share,
-                                asset_creators::Column::SlotUpdated,
-                            ])
-                            .to_owned(),
-                        )
-                        .build(DbBackend::Postgres);
-                    txn.execute(query).await?;
+                // Upsert into `asset_v1_account_attachments` table.
+                let (edition_attachment_address, _) = find_master_edition_account(&id);
+                unprotected_upsert_asset_v1_account_attachments(
+                    txn,
+                    edition_attachment_address.to_bytes().to_vec(),
+                    slot_i,
+                )
+                .await?;
 
-                    // This statement will update whether the creator is verified and the `seq`
-                    // number.  `seq` is used to protect the `verified` field, allowing for `mint`
-                    // and `verifyCreator` to be processed out of order.
-                    let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos)
-                        .on_conflict(
-                            OnConflict::columns([
-                                asset_creators::Column::AssetId,
-                                asset_creators::Column::Creator,
-                            ])
-                            .update_columns([
-                                asset_creators::Column::Verified,
-                                asset_creators::Column::Seq,
-                            ])
-                            .to_owned(),
-                        )
-                        .build(DbBackend::Postgres);
-                    query.sql = format!(
-                        "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL",
-                        query.sql
-                    );
-                    txn.execute(query).await?;
-                }
+                // Upsert into `asset_creators` table.
+                unprotected_upsert_creators(
+                    txn,
+                    id_bytes.to_vec(),
+                    &metadata.creators,
+                    slot_i,
+                    seq as i64,
+                )
+                .await?;
 
                 // Insert into `asset_authority` table.
-                let model = asset_authority::ActiveModel {
-                    asset_id: Set(id_bytes.to_vec()),
-                    authority: Set(bundle.keys.get(0).unwrap().0.to_vec()), //TODO - we need to rem,ove the optional bubblegum signer logic
-                    seq: Set(seq as i64),
-                    slot_updated: Set(slot_i),
-                    ..Default::default()
-                };
-
-                // Do not attempt to modify any existing values:
-                // `ON CONFLICT ('asset_id') DO NOTHING`.
-                let query = asset_authority::Entity::insert(model)
-                    .on_conflict(
-                        OnConflict::columns([asset_authority::Column::AssetId])
-                            .do_nothing()
-                            .to_owned(),
-                    )
-                    .build(DbBackend::Postgres);
-                txn.execute(query)
-                    .await
-                    .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+                //TODO - we need to remove the optional bubblegum signer logic
+                let authority = bundle.keys.get(0).unwrap().0.to_vec();
+                unprotected_upsert_asset_authority(
+                    txn,
+                    id_bytes.to_vec(),
+                    authority,
+                    seq as i64,
+                    slot_i,
+                )
+                .await?;
 
                 // Upsert into `asset_grouping` table with base collection info.
                 upsert_collection_info(
@@ -337,6 +205,8 @@ where
                 )
                 .await?;
 
+                upsert_asset_with_update_metadata_seq(txn, id_bytes.to_vec(), seq as i64).await?;
+
                 if uri.is_empty() {
                     warn!(
                         "URI is empty for mint {}. Skipping background task.",
@@ -348,6 +218,7 @@ where
                 let mut task = DownloadMetadata {
                     asset_data_id: id_bytes.to_vec(),
                     uri: metadata.uri.clone(),
+                    seq: seq as i64,
                     created_at: Some(Utc::now().naive_utc()),
                 };
                 task.sanitize();
diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs
index bcc102c0b..5ed21dbf7 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mod.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs
@@ -17,6 +17,7 @@ mod delegate;
 mod mint_v1;
 mod redeem;
 mod transfer;
+mod update_metadata;
 
 pub use db::*;
 
@@ -53,7 +54,8 @@ where
         InstructionName::VerifyCollection => "VerifyCollection",
         InstructionName::UnverifyCollection => "UnverifyCollection",
         InstructionName::SetAndVerifyCollection => "SetAndVerifyCollection",
-        InstructionName::SetDecompressibleState | InstructionName::UpdateMetadata => todo!(),
+        InstructionName::SetDecompressibleState => "SetDecompressibleState",
+        InstructionName::UpdateMetadata => "UpdateMetadata",
     };
     info!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id);
 
@@ -94,6 +96,15 @@ where
         | InstructionName::SetAndVerifyCollection => {
             collection_verification::process(parsing_result, bundle, txn, cl_audits).await?;
         }
+        InstructionName::SetDecompressibleState => (), // Nothing to index.
+        InstructionName::UpdateMetadata => {
+            let task =
+                update_metadata::update_metadata(parsing_result, bundle, txn, cl_audits).await?;
+
+            if let Some(t) = task {
+                task_manager.send(t)?;
+            }
+        }
         _ => debug!("Bubblegum: Not Implemented Instruction"),
     }
     Ok(())
diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs
index b9b7f2c27..ce45620e5 100644
--- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs
@@ -4,7 +4,8 @@ use log::debug;
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq,
+        asset_should_be_updated, save_changelog_event, u32_to_u8_array,
+        upsert_asset_with_leaf_info, upsert_asset_with_seq,
     },
 };
 use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
@@ -32,6 +33,13 @@ where
         );
         debug!("Indexing redeem for asset id: {:?}", asset_id);
         let id_bytes = asset_id.to_bytes();
+
+        // First check to see if this asset has been decompressed or updated by
+        // `update_metadata`.
+        if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+            return Ok(());
+        }
+
         let tree_id = cl.id.to_bytes();
         let nonce = cl.index as i64;
 
@@ -45,7 +53,6 @@ where
             [0; 32],
             [0; 32],
             seq as i64,
-            false,
         )
         .await?;
 
diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs
index 573f33a8f..2d74707f1 100644
--- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs
@@ -2,8 +2,8 @@ use super::save_changelog_event;
 use crate::{
     error::IngesterError,
     program_transformers::bubblegum::{
-        upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info,
-        upsert_asset_with_seq,
+        asset_should_be_updated, upsert_asset_with_leaf_info,
+        upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
     },
 };
 use blockbuster::{
@@ -32,6 +32,13 @@ where
                 ..
             } => {
                 let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(());
+                }
+
                 let owner_bytes = owner.to_bytes().to_vec();
                 let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
                     None
@@ -51,7 +58,6 @@ where
                     le.schema.data_hash(),
                     le.schema.creator_hash(),
                     seq as i64,
-                    false,
                 )
                 .await?;
 
diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs
new file mode 100644
index 000000000..2dd04c384
--- /dev/null
+++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs
@@ -0,0 +1,292 @@
+use crate::{
+    error::IngesterError,
+    program_transformers::bubblegum::{
+        asset_should_be_updated, save_changelog_event, unprotected_upsert_asset_authority,
+        unprotected_upsert_asset_base_info, unprotected_upsert_asset_data,
+        unprotected_upsert_asset_v1_account_attachments, unprotected_upsert_creators,
+        upsert_asset_with_compression_info, upsert_asset_with_leaf_info,
+        upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq,
+        upsert_asset_with_update_metadata_seq, upsert_collection_info,
+    },
+    tasks::{DownloadMetadata, IntoTaskData, TaskData},
+};
+use blockbuster::{
+    instruction::InstructionBundle,
+    programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload},
+    token_metadata::{
+        pda::find_master_edition_account,
+        state::{TokenStandard, UseMethod, Uses},
+    },
+};
+use chrono::Utc;
+use digital_asset_types::dao::sea_orm_active_enums::{
+    SpecificationAssetClass, SpecificationVersions,
+};
+use digital_asset_types::{
+    dao::{
+        asset_creators,
+        sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType},
+    },
+    json::ChainDataV1,
+};
+use log::warn;
+use num_traits::FromPrimitive;
+use sea_orm::{entity::*, query::*, ConnectionTrait, EntityTrait, JsonValue};
+
+pub async fn update_metadata<'c, T>(
+    parsing_result: &BubblegumInstruction,
+    bundle: &InstructionBundle<'c>,
+    txn: &'c T,
+    cl_audits: bool,
+) -> Result<Option<TaskData>, IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    if let (
+        Some(le),
+        Some(cl),
+        Some(Payload::UpdateMetadata {
+            current_metadata,
+            update_args,
+        }),
+    ) = (
+        &parsing_result.leaf_update,
+        &parsing_result.tree_update,
+        &parsing_result.payload,
+    ) {
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+
+        #[allow(unreachable_patterns)]
+        return match le.schema {
+            LeafSchema::V1 {
+                id,
+                delegate,
+                owner,
+                nonce,
+                ..
+            } => {
+                let id_bytes = id.to_bytes();
+
+                // First check to see if this asset has been decompressed or updated by
+                // `update_metadata`.
+                if !asset_should_be_updated(txn, id_bytes.to_vec(), Some(seq as i64)).await? {
+                    return Ok(None);
+                }
+
+                // Upsert into `asset_data` table.
+
+                let slot_i = bundle.slot as i64;
+
+                let uri = if let Some(uri) = &update_args.uri {
+                    uri.replace('\0', "")
+                } else {
+                    current_metadata.uri.replace('\0', "")
+                };
+
+                let name = if let Some(name) = update_args.name.clone() {
+                    name
+                } else {
+                    current_metadata.name.clone()
+                };
+
+                let symbol = if let Some(symbol) = update_args.symbol.clone() {
+                    symbol
+                } else {
+                    current_metadata.symbol.clone()
+                };
+
+                let primary_sale_happened =
+                    if let Some(primary_sale_happened) = update_args.primary_sale_happened {
+                        primary_sale_happened
+                    } else {
+                        current_metadata.primary_sale_happened
+                    };
+
+                let mut chain_data = ChainDataV1 {
+                    name: name.clone(),
+                    symbol: symbol.clone(),
+                    edition_nonce: current_metadata.edition_nonce,
+                    primary_sale_happened,
+                    token_standard: Some(TokenStandard::NonFungible),
+                    uses: current_metadata.uses.clone().map(|u| Uses {
+                        use_method: UseMethod::from_u8(u.use_method as u8).unwrap(),
+                        remaining: u.remaining,
+                        total: u.total,
+                    }),
+                };
+                chain_data.sanitize();
+                let chain_data_json = serde_json::to_value(chain_data)
+                    .map_err(|e| IngesterError::DeserializationError(e.to_string()))?;
+
+                let is_mutable = if let Some(is_mutable) = update_args.is_mutable {
+                    is_mutable
+                } else {
+                    current_metadata.is_mutable
+                };
+
+                let chain_mutability = if is_mutable {
+                    ChainMutability::Mutable
+                } else {
+                    ChainMutability::Immutable
+                };
+
+                unprotected_upsert_asset_data(
+                    txn,
+                    id_bytes.to_vec(),
+                    chain_mutability,
+                    chain_data_json,
+                    uri.clone(),
+                    Mutability::Mutable,
+                    JsonValue::String("processing".to_string()),
+                    slot_i,
+                    Some(true),
+                    name.into_bytes().to_vec(),
+                    symbol.into_bytes().to_vec(),
+                )
+                .await?;
+
+                // Upsert into `asset` table.
+
+                // Set base mint info.
+                let tree_id = bundle.keys.get(5).unwrap().0.to_vec();
+                let seller_fee_basis_points =
+                    if let Some(seller_fee_basis_points) = update_args.seller_fee_basis_points {
+                        seller_fee_basis_points
+                    } else {
+                        current_metadata.seller_fee_basis_points
+                    };
+
+                unprotected_upsert_asset_base_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    OwnerType::Single,
+                    false,
+                    SpecificationVersions::V1,
+                    SpecificationAssetClass::Nft,
+                    RoyaltyTargetType::Creators,
+                    None,
+                    seller_fee_basis_points as i32,
+                    slot_i,
+                )
+                .await?;
+
+                // Partial update of asset table with just compression info elements.
+                upsert_asset_with_compression_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    true,
+                    false,
+                    1,
+                    None,
+                    false,
+                )
+                .await?;
+
+                // Partial update of asset table with just leaf.
+                upsert_asset_with_leaf_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    nonce as i64,
+                    tree_id,
+                    le.leaf_hash.to_vec(),
+                    le.schema.data_hash(),
+                    le.schema.creator_hash(),
+                    seq as i64,
+                )
+                .await?;
+
+                // Partial update of asset table with just leaf owner and delegate.
+                let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] {
+                    None
+                } else {
+                    Some(delegate.to_bytes().to_vec())
+                };
+                upsert_asset_with_owner_and_delegate_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    owner.to_bytes().to_vec(),
+                    delegate,
+                    seq as i64,
+                )
+                .await?;
+
+                upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
+
+                // Upsert into `asset_v1_account_attachments` table.
+                let (edition_attachment_address, _) = find_master_edition_account(&id);
+                unprotected_upsert_asset_v1_account_attachments(
+                    txn,
+                    edition_attachment_address.to_bytes().to_vec(),
+                    slot_i,
+                )
+                .await?;
+
+                // Update `asset_creators` table.
+
+                // Delete any existing creators.
+                asset_creators::Entity::delete_many()
+                    .filter(
+                        Condition::all().add(asset_creators::Column::AssetId.eq(id_bytes.to_vec())),
+                    )
+                    .exec(txn)
+                    .await?;
+
+                let creators = if let Some(creators) = &update_args.creators {
+                    creators
+                } else {
+                    &current_metadata.creators
+                };
+
+                // Upsert into `asset_creators` table.
+                unprotected_upsert_creators(txn, id_bytes.to_vec(), creators, slot_i, seq as i64)
+                    .await?;
+
+                // Insert into `asset_authority` table.
+                //TODO - we need to remove the optional bubblegum signer logic
+                let authority = bundle.keys.get(0).unwrap().0.to_vec();
+                unprotected_upsert_asset_authority(
+                    txn,
+                    id_bytes.to_vec(),
+                    authority,
+                    seq as i64,
+                    slot_i,
+                )
+                .await?;
+
+                // Upsert into `asset_grouping` table with base collection info.
+                upsert_collection_info(
+                    txn,
+                    id_bytes.to_vec(),
+                    current_metadata.collection.clone(),
+                    slot_i,
+                    seq as i64,
+                )
+                .await?;
+
+                upsert_asset_with_update_metadata_seq(txn, id_bytes.to_vec(), seq as i64).await?;
+
+                if uri.is_empty() {
+                    warn!(
+                        "URI is empty for mint {}. Skipping background task.",
+                        bs58::encode(id).into_string()
+                    );
+                    return Ok(None);
+                }
+
+                let mut task = DownloadMetadata {
+                    asset_data_id: id_bytes.to_vec(),
+                    uri,
+                    seq: seq as i64,
+                    created_at: Some(Utc::now().naive_utc()),
+                };
+                task.sanitize();
+                let t = task.into_task_data()?;
+                Ok(Some(t))
+            }
+            _ => Err(IngesterError::NotImplemented),
+        };
+    }
+    Err(IngesterError::ParsingError(
+        "Ix not parsed correctly".to_string(),
+    ))
+}
diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
index 879581341..70fdaee63 100644
--- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
+++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
@@ -23,7 +23,7 @@ use num_traits::FromPrimitive;
 use plerkle_serialization::Pubkey as FBPubkey;
 use sea_orm::{
     entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, DbBackend,
-    DbErr, EntityTrait, FromQueryResult, JoinType, JsonValue,
+    DbErr, EntityTrait, JsonValue,
 };
 use std::collections::HashSet;
 
@@ -147,8 +147,10 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
         slot_updated: Set(slot_i),
         reindex: Set(Some(true)),
         id: Set(id.to_vec()),
-        raw_name: Set(name.to_vec()),
-        raw_symbol: Set(symbol.to_vec()),
+        raw_name: Set(Some(name.to_vec())),
+        raw_symbol: Set(Some(symbol.to_vec())),
+        download_metadata_seq: Set(Some(0)),
+        ..Default::default()
     };
     let txn = conn.begin().await?;
     let mut query = asset_data::Entity::insert(asset_data_model)
@@ -161,6 +163,8 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
                     asset_data::Column::MetadataMutability,
                     asset_data::Column::SlotUpdated,
                     asset_data::Column::Reindex,
+                    asset_data::Column::RawName,
+                    asset_data::Column::RawSymbol,
                 ])
                 .to_owned(),
         )
@@ -272,13 +276,14 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
     txn.execute(query)
         .await
         .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
+
     if let Some(c) = &metadata.collection {
         let model = asset_grouping::ActiveModel {
             asset_id: Set(id.to_vec()),
             group_key: Set("collection".to_string()),
             group_value: Set(Some(c.key.to_string())),
-            verified: Set(Some(c.verified)),
-            seq: Set(None),
+            verified: Set(c.verified),
+            group_info_seq: Set(None),
             slot_updated: Set(Some(slot_i)),
             ..Default::default()
         };
@@ -289,10 +294,10 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
                     asset_grouping::Column::GroupKey,
                 ])
                 .update_columns([
-                    asset_grouping::Column::GroupKey,
                     asset_grouping::Column::GroupValue,
-                    asset_grouping::Column::Seq,
+                    asset_grouping::Column::Verified,
                     asset_grouping::Column::SlotUpdated,
+                    asset_grouping::Column::GroupInfoSeq,
                 ])
                 .to_owned(),
             )
@@ -307,6 +312,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
     }
     txn.commit().await?;
     let creators = data.creators.unwrap_or_default();
+
     if !creators.is_empty() {
         let mut creators_set = HashSet::new();
         let existing_creators: Vec<asset_creators::Model> = asset_creators::Entity::find()
@@ -317,6 +323,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
             )
             .all(conn)
             .await?;
+
         if !existing_creators.is_empty() {
             let mut db_creators = Vec::with_capacity(creators.len());
             for (i, c) in creators.into_iter().enumerate() {
@@ -328,7 +335,6 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
                     creator: Set(c.address.to_bytes().to_vec()),
                     share: Set(c.share as i32),
                     verified: Set(c.verified),
-                    seq: Set(Some(0)),
                     slot_updated: Set(Some(slot_i)),
                     position: Set(i as i16),
                     ..Default::default()
@@ -344,7 +350,8 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
                 )
                 .exec(&txn)
                 .await?;
-            if db_creators.len() > 0 {
+
+            if !db_creators.is_empty() {
                 let mut query = asset_creators::Entity::insert_many(db_creators)
                     .on_conflict(
                         OnConflict::columns([
@@ -355,7 +362,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
                             asset_creators::Column::Creator,
                             asset_creators::Column::Share,
                             asset_creators::Column::Verified,
-                            asset_creators::Column::Seq,
+                            asset_creators::Column::VerifiedSeq,
                             asset_creators::Column::SlotUpdated,
                         ])
                         .to_owned(),
@@ -383,6 +390,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
     let mut task = DownloadMetadata {
         asset_data_id: id.to_vec(),
         uri,
+        seq: 0,
         created_at: Some(Utc::now().naive_utc()),
     };
     task.sanitize();
diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs
index bf39e455d..3b533c2f7 100644
--- a/nft_ingester/src/tasks/common/mod.rs
+++ b/nft_ingester/src/tasks/common/mod.rs
@@ -18,6 +18,7 @@ const TASK_NAME: &str = "DownloadMetadata";
 pub struct DownloadMetadata {
     pub asset_data_id: Vec<u8>,
     pub uri: String,
+    pub seq: i64,
     #[serde(skip_serializing)]
     pub created_at: Option<NaiveDateTime>,
 }
@@ -110,24 +111,29 @@ impl BgTask for DownloadMetadataTask {
             id: Unchanged(download_metadata.asset_data_id.clone()),
             metadata: Set(body),
             reindex: Set(Some(false)),
+            download_metadata_seq: Set(Some(download_metadata.seq)),
             ..Default::default()
         };
         debug!(
             "download metadata for {:?}",
             bs58::encode(download_metadata.asset_data_id.clone()).into_string()
         );
-        asset_data::Entity::update(model)
-            .filter(asset_data::Column::Id.eq(download_metadata.asset_data_id.clone()))
-            .exec(db)
-            .await
-            .map(|_| ())
-            .map_err(|db| {
-                IngesterError::TaskManagerError(format!(
-                    "Database error with {}, error: {}",
-                    self.name(),
-                    db
-                ))
-            })?;
+        let mut query = asset_data::Entity::update(model)
+            .filter(asset_data::Column::Id.eq(download_metadata.asset_data_id.clone()));
+        if download_metadata.seq != 0 {
+            query = query.filter(
+                Condition::any()
+                    .add(asset_data::Column::DownloadMetadataSeq.lte(download_metadata.seq))
+                    .add(asset_data::Column::DownloadMetadataSeq.is_null()),
+            );
+        }
+        query.exec(db).await.map(|_| ()).map_err(|db| {
+            IngesterError::TaskManagerError(format!(
+                "Database error with {}, error: {}",
+                self.name(),
+                db
+            ))
+        })?;
 
         if meta_url.is_err() {
             return Err(IngesterError::UnrecoverableTaskError(format!(
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 469626eac..8142c3012 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.70.0"
\ No newline at end of file
+channel = "1.73.0"
diff --git a/tools/acc_forwarder/Cargo.toml b/tools/acc_forwarder/Cargo.toml
index 04e0ee842..96d5d4eec 100644
--- a/tools/acc_forwarder/Cargo.toml
+++ b/tools/acc_forwarder/Cargo.toml
@@ -24,6 +24,6 @@ solana-account-decoder = "~1.16.16"
 solana-client = "~1.16.16"
 solana-sdk = "~1.16.16"
 solana-transaction-status = "~1.16.16"
-spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
+spl-token = { version = "4.0.0", features = ["no-entrypoint"] }
 tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread", "time"] }
 txn_forwarder = { path = "../txn_forwarder" }
diff --git a/tools/bgtask_creator/src/main.rs b/tools/bgtask_creator/src/main.rs
index a08dd87d1..17d1bba0c 100644
--- a/tools/bgtask_creator/src/main.rs
+++ b/tools/bgtask_creator/src/main.rs
@@ -322,6 +322,7 @@ WHERE
                     let mut task = DownloadMetadata {
                         asset_data_id: asset.id,
                         uri: asset.metadata_url,
+                        seq: 0,
                         created_at: Some(Utc::now().naive_utc()),
                     };
 
diff --git a/tools/load_generation/Cargo.toml b/tools/load_generation/Cargo.toml
index 2f9452300..e0fb82ace 100644
--- a/tools/load_generation/Cargo.toml
+++ b/tools/load_generation/Cargo.toml
@@ -12,5 +12,5 @@ solana-client = "~1.16.16"
 solana-program = "~1.16.16"
 solana-sdk = "~1.16.16"
 spl-associated-token-account = { version = ">= 1.1.3, < 3.0", features = ["no-entrypoint"] }
-spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
+spl-token = { version = "4.0.0", features = ["no-entrypoint"] }
 tokio = { version ="1.25.0", features = ["macros", "rt-multi-thread"] }