Skip to content

Commit

Permalink
use an unbounded channel to process transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdul Basit committed Oct 4, 2023
1 parent 696b78e commit 092b385
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 49 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct DbArgs {
pub connection_timeout: u64,
#[arg(long, env, default_value_t = 10)]
pub acquire_timeout: u64,
#[arg(long, env, default_value_t = 60)]
#[arg(long, env, default_value_t = 10)]
pub idle_timeout: u64,
#[arg(long, env)]
pub database_url: String,
Expand Down
2 changes: 1 addition & 1 deletion indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dashmap = "5.4.0"
spl-token = "=3.5.0"
solana-client = "1.14"
backoff = { version = "0.4.0", features = ["tokio"] }

crossbeam = "0.8.2"

[dependencies.hub-core]
package = "holaplex-hub-core"
Expand Down
75 changes: 69 additions & 6 deletions indexer/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
use std::sync::Arc;

use crossbeam::channel::{self, Receiver, Sender};
use dashmap::DashMap;
use futures::{sink::SinkExt, stream::StreamExt};
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents};
use hub_core::{
backon::{ExponentialBuilder, Retryable},
prelude::*,
producer::Producer,
tokio,
tokio::{
self,
task::{self, JoinSet},
},
};
use solana_client::rpc_client::RpcClient;
use yellowstone_grpc_client::GeyserGrpcClientError;
use yellowstone_grpc_proto::prelude::SubscribeRequest;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction,
};

use crate::{processor::Processor, Args, GeyserGrpcConnector};

#[derive(Clone)]
pub struct MessageHandler {
connector: GeyserGrpcConnector,
processor: Processor,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
tx: Sender<SubscribeUpdateTransaction>,
rx: Receiver<SubscribeUpdateTransaction>,
parallelism: usize,
}

impl MessageHandler {
Expand All @@ -26,6 +37,7 @@ impl MessageHandler {
dragon_mouth_endpoint,
dragon_mouth_x_token,
solana_endpoint,
parallelism,
db,
} = args;

Expand All @@ -35,12 +47,16 @@ impl MessageHandler {

let rpc = Arc::new(RpcClient::new(solana_endpoint));
let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token);

let (tx, rx) = channel::unbounded();
let processor = Processor::new(db, rpc, producer);

Ok(Self {
connector,
processor,
dashmap: DashMap::new(),
tx,
rx,
parallelism,
})
}

Expand All @@ -54,9 +70,23 @@ impl MessageHandler {
.map_err(GeyserGrpcClientError::SubscribeSendError)?;

while let Some(message) = stream.next().await {
self.processor.process(message).await?;
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
for tx in transactions {
self.tx.send(tx)?;
}
}
},
_ => {},
},
Err(error) => bail!("stream error: {:?}", error),
};
}

Ok(())
})
.retry(
Expand All @@ -81,19 +111,52 @@ impl MessageHandler {
});

let mpl_bubblegum_stream = tokio::spawn({
let handler = self.clone();
async move {
self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
handler
.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
.await
}
});

let rx = self.rx.clone();
let processor = self.processor;

let process_task = task::spawn(async move {
let mut set = JoinSet::new();

loop {
let processor = processor.clone();

while set.len() >= self.parallelism {
match set.join_next().await {
Some(Err(e)) => bail!("failed to join task {:?}", e),
Some(Ok(_)) | None => (),
}
}

match rx.recv() {
Ok(tx) => {
set.spawn(processor.process_transaction(tx));
},
Err(e) => {
error!("{:?}", e);
return Result::<(), Error>::Err(e.into());
},
}
}
});

tokio::select! {
Err(e) = spl_token_stream => {
bail!("spl token stream error: {:?}", e)
},
Err(e) = mpl_bubblegum_stream => {
bail!("mpl bumblegum stream error: {:?}", e)
}
Err(e) = process_task => {
bail!("Receiver err: {:?}", e)
}
}
}
}
3 changes: 3 additions & 0 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub struct Args {
#[arg(long, env)]
pub solana_endpoint: String,

#[arg(long, short = 'p', env, default_value_t = 8)]
pub parallelism: usize,

#[command(flatten)]
pub db: db::DbArgs,
}
45 changes: 4 additions & 41 deletions indexer/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc};

use anchor_lang::AnchorDeserialize;
use backoff::ExponentialBackoff;
use dashmap::DashMap;
use holaplex_hub_nfts_solana_core::{
db::Connection,
proto::{
Expand All @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{
CollectionMint, CompressionLeaf,
};
use holaplex_hub_nfts_solana_entity::compression_leafs;
use hub_core::{prelude::*, producer::Producer, tokio::task};
use hub_core::{prelude::*, producer::Producer};
use mpl_bubblegum::utils::get_asset_id;
use solana_client::rpc_client::RpcClient;
use solana_program::program_pack::Pack;
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use spl_token::{instruction::TokenInstruction, state::Account};
use yellowstone_grpc_proto::{
prelude::{
subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction,
},
tonic::Status,
};
use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction};

#[derive(Clone)]
pub struct Processor {
db: Connection,
rpc: Arc<RpcClient>,
producer: Producer<SolanaNftEvents>,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
}

impl Processor {
Expand All @@ -40,38 +33,7 @@ impl Processor {
rpc: Arc<RpcClient>,
producer: Producer<SolanaNftEvents>,
) -> Self {
Self {
db,
rpc,
producer,
dashmap: DashMap::new(),
}
}

pub(crate) async fn process(&self, message: Result<SubscribeUpdate, Status>) -> Result<()> {
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
let handles: Vec<_> = transactions
.into_iter()
.map(|tx| task::spawn(self.clone().process_transaction(tx)))
.collect();

for handle in handles {
handle.await??
}
}
},
_ => {},
},
Err(error) => bail!("stream error: {:?}", error),
};

Ok(())
Self { db, rpc, producer }
}

pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> {
Expand Down Expand Up @@ -140,6 +102,7 @@ impl Processor {
if compression_leaf.is_none() {
return Ok(());
}

let compression_leaf = compression_leaf.context("Compression leaf not found")?;

let collection_mint_id = compression_leaf.id;
Expand Down

0 comments on commit 092b385

Please sign in to comment.