Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate streams for spl token and mpl bumblegum #58

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions indexer/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::HashMap, vec};

use futures::{channel::mpsc::SendError, Sink, Stream};
use hub_core::prelude::*;
use solana_program::pubkey::Pubkey;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::{prelude::*, tonic::Status};

Expand Down Expand Up @@ -29,7 +30,7 @@ impl GeyserGrpcConnector {
Ok((subscribe_tx, stream))
}

pub fn build_request() -> SubscribeRequest {
pub fn build_request(program_id: Pubkey) -> SubscribeRequest {
let mut slots = HashMap::new();
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});

Expand All @@ -38,20 +39,16 @@ impl GeyserGrpcConnector {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()],
account_include: vec![program_id.to_string()],
account_exclude: Vec::new(),
account_required: Vec::new(),
});

SubscribeRequest {
accounts: HashMap::new(),
slots,
transactions,
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
commitment: Some(CommitmentLevel::Finalized as i32),
accounts_data_slice: vec![],
entry: HashMap::new(),
..Default::default()
}
}
}
290 changes: 49 additions & 241 deletions indexer/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,23 @@
use std::{convert::TryInto, sync::Arc};
use std::sync::Arc;

use anchor_lang::AnchorDeserialize;
use backoff::ExponentialBackoff;
use dashmap::DashMap;
use futures::{sink::SinkExt, stream::StreamExt};
use holaplex_hub_nfts_solana_core::{
db::Connection,
proto::{
solana_nft_events::Event::UpdateMintOwner, MintOwnershipUpdate, SolanaNftEventKey,
SolanaNftEvents,
},
sea_orm::Set,
CollectionMint, CompressionLeaf,
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents};
use hub_core::{
backon::{ExponentialBuilder, Retryable},
prelude::*,
producer::Producer,
tokio,
};
use holaplex_hub_nfts_solana_entity::compression_leafs;
use hub_core::{prelude::*, producer::Producer, tokio::task};
use mpl_bubblegum::utils::get_asset_id;
use solana_client::rpc_client::RpcClient;
use solana_program::program_pack::Pack;
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use spl_token::{instruction::TokenInstruction, state::Account};
use yellowstone_grpc_client::GeyserGrpcClientError;
use yellowstone_grpc_proto::{
prelude::{
subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction,
},
tonic::Status,
};
use yellowstone_grpc_proto::prelude::SubscribeRequest;

use crate::{Args, GeyserGrpcConnector};
use crate::{processor::Processor, Args, GeyserGrpcConnector};

#[derive(Clone)]
pub struct MessageHandler {
db: Connection,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
rpc: Arc<RpcClient>,
connector: GeyserGrpcConnector,
producer: Producer<SolanaNftEvents>,
processor: Processor,
}

impl MessageHandler {
Expand All @@ -51,241 +32,68 @@ impl MessageHandler {
let db = Connection::new(db)
.await
.context("failed to get database connection")?;
let dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>> = DashMap::new();

let rpc = Arc::new(RpcClient::new(solana_endpoint));
let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token);

let processor = Processor::new(db, rpc, producer);

Ok(Self {
db,
dashmap,
rpc,
connector,
producer,
processor,
})
}

pub async fn run(&self) -> Result<()> {
let request = GeyserGrpcConnector::build_request();
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

loop {
let request = request.clone();
async fn connect(&self, request: SubscribeRequest) -> Result<()> {
(|| async {
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

subscribe_tx
.send(request)
.send(request.clone())
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;

while let Some(message) = stream.next().await {
self.handle_message(message).await?;
self.processor.process(message).await?;
}
}
}

async fn handle_message(&self, message: Result<SubscribeUpdate, Status>) -> Result<()> {
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
for tx in transactions {
task::spawn(self.clone().process_transaction(tx));
}
}
},
_ => {},
},
Err(error) => return Err(anyhow!("stream error: {:?}", error)),
};

Ok(())
Ok(())
})
.retry(
&ExponentialBuilder::default()
.with_max_times(10)
.with_jitter(),
)
.notify(|err: &Error, dur: Duration| {
error!("stream error: {:?} retrying in {:?}", err, dur);
})
.await
}

async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> {
let message = tx
.transaction
.clone()
.context("SubscribeTransactionInfo not found")?
.transaction
.context("Transaction not found")?
.message
.context("Message not found")?;
let sig = tx
.transaction
.as_ref()
.ok_or_else(|| anyhow!("failed to get transaction"))?
.signature
.clone();

let keys = message.clone().account_keys;

for (idx, key) in message.clone().account_keys.iter().enumerate() {
let key: &[u8] = key;
let k = Pubkey::try_from(key)?;
if k == spl_token::ID {
self.process_spl_token_transaction(idx, &keys, &sig, &message)
.await?;
} else if k == mpl_bubblegum::ID {
self.process_mpl_bubblegum_transaction(idx, &keys, &sig, &message)
.await?;
pub async fn run(self) -> Result<()> {
let spl_token_stream = tokio::spawn({
let handler = self.clone();
async move {
handler
.connect(GeyserGrpcConnector::build_request(spl_token::ID))
.await
}
}

Ok(())
}

async fn process_mpl_bubblegum_transaction(
&self,
program_account_index: usize,
keys: &[Vec<u8>],
sig: &Vec<u8>,
message: &Message,
) -> Result<()> {
for ins in message.instructions.iter() {
let account_indices = ins.accounts.clone();
let program_idx: usize = ins.program_id_index.try_into()?;

if program_idx == program_account_index {
let conn = self.db.get();
let data = ins.data.clone();
let data = data.as_slice();

let tkn_instruction =
mpl_bubblegum::instruction::Transfer::try_from_slice(&data[8..])?;
let new_leaf_owner_account_index = account_indices[3];
let merkle_tree_account_index = account_indices[4];
let new_leaf_owner_bytes: &[u8] = &keys[new_leaf_owner_account_index as usize];
let merkle_tree_bytes: &[u8] = &keys[merkle_tree_account_index as usize];
let new_leaf_owner = Pubkey::try_from(new_leaf_owner_bytes)?;
let merkle_tree = Pubkey::try_from(merkle_tree_bytes)?;

let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce);

let compression_leaf =
CompressionLeaf::find_by_asset_id(conn, asset_id.to_string())
.await?
.context("compression leaf not found")?;
});

let collection_mint_id = compression_leaf.id;
let leaf_owner = compression_leaf.leaf_owner.clone();
let mut compression_leaf: compression_leafs::ActiveModel = compression_leaf.into();

compression_leaf.leaf_owner = Set(new_leaf_owner.to_string());

CompressionLeaf::update(conn, compression_leaf).await?;

self.producer
.send(
Some(&SolanaNftEvents {
event: Some(UpdateMintOwner(MintOwnershipUpdate {
mint_address: asset_id.to_string(),
sender: leaf_owner,
recipient: new_leaf_owner.to_string(),
tx_signature: Signature::new(sig.as_slice()).to_string(),
})),
}),
Some(&SolanaNftEventKey {
id: collection_mint_id.to_string(),
..Default::default()
}),
)
.await?;
let mpl_bubblegum_stream = tokio::spawn({
async move {
self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
.await
}
}

Ok(())
}

async fn process_spl_token_transaction(
&self,
program_account_index: usize,
keys: &[Vec<u8>],
sig: &Vec<u8>,
message: &Message,
) -> Result<()> {
let conn = self.db.get();
for ins in message.instructions.iter() {
let account_indices = ins.accounts.clone();
let program_idx: usize = ins.program_id_index.try_into()?;

if program_idx == program_account_index {
let data = ins.data.clone();
let data = data.as_slice();
let tkn_instruction = spl_token::instruction::TokenInstruction::unpack(data)?;

let transfer_info = match tkn_instruction {
TokenInstruction::TransferChecked { amount, .. } => Some((amount, 2)),
TokenInstruction::Transfer { amount } => Some((amount, 1)),
_ => None,
};

if transfer_info.is_none() {
continue;
}

if let Some((1, destination_ata_index)) = transfer_info {
let source_account_index = account_indices[0];
let source_bytes: &[u8] = &keys[source_account_index as usize];
let source = Pubkey::try_from(source_bytes)?;
});

let collection_mint =
CollectionMint::find_by_ata(conn, source.to_string()).await?;

if collection_mint.is_none() {
return Ok(());
}

let destination_account_index = account_indices[destination_ata_index];
let destination_bytes: &[u8] = &keys[destination_account_index as usize];
let destination = Pubkey::try_from(destination_bytes)?;

let acct = fetch_account(&self.rpc, &destination).await?;
let destination_tkn_act = Account::unpack(&acct.data)?;
let new_owner = destination_tkn_act.owner.to_string();
let mint = collection_mint.context("No mint found")?;

CollectionMint::update_owner_and_ata(
conn,
&mint,
new_owner.clone(),
destination.to_string(),
)
.await?;

self.producer
.send(
Some(&SolanaNftEvents {
event: Some(UpdateMintOwner(MintOwnershipUpdate {
mint_address: destination_tkn_act.mint.to_string(),
sender: mint.owner.to_string(),
recipient: new_owner,
tx_signature: Signature::new(sig.as_slice()).to_string(),
})),
}),
Some(&SolanaNftEventKey {
id: mint.id.to_string(),
..Default::default()
}),
)
.await?;
}
tokio::select! {
Err(e) = spl_token_stream => {
bail!("spl token stream error: {:?}", e)
},
Err(e) = mpl_bubblegum_stream => {
bail!("mpl bumblegum stream error: {:?}", e)
}
}

Ok(())
}
}

async fn fetch_account(
rpc: &Arc<RpcClient>,
address: &Pubkey,
) -> Result<solana_sdk::account::Account, solana_client::client_error::ClientError> {
backoff::future::retry(ExponentialBackoff::default(), || async {
let acct = rpc.get_account(address)?;

Ok(acct)
})
.await
}
1 change: 1 addition & 0 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod connector;
mod handler;
mod processor;
use clap::{arg, command};
pub use connector::GeyserGrpcConnector;
pub use handler::MessageHandler;
Expand Down
Loading