From 092b385c612bc57211574cd7a6ec5b563aa8094f Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 08:20:08 +0500 Subject: [PATCH] use an unbounded channel to process transactions --- Cargo.lock | 15 ++++++++ core/src/db.rs | 2 +- indexer/Cargo.toml | 2 +- indexer/src/handler.rs | 75 ++++++++++++++++++++++++++++++++++++---- indexer/src/lib.rs | 3 ++ indexer/src/processor.rs | 45 +++--------------------- 6 files changed, 93 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63769e3..cd49ec5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,20 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5406,6 +5420,7 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", + "crossbeam", "dashmap", "futures", "hex", diff --git a/core/src/db.rs b/core/src/db.rs index 2cc9c7f..18ce10b 100644 --- a/core/src/db.rs +++ b/core/src/db.rs @@ -12,7 +12,7 @@ pub struct DbArgs { pub connection_timeout: u64, #[arg(long, env, default_value_t = 10)] pub acquire_timeout: u64, - #[arg(long, env, default_value_t = 60)] + #[arg(long, env, default_value_t = 10)] pub idle_timeout: u64, #[arg(long, env)] pub database_url: String, diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index c9f3173..7b97ec1 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,7 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } - +crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index f70c3ee..223dd7e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,16 +1,23 @@ use std::sync::Arc; +use crossbeam::channel::{self, Receiver, Sender}; +use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; use hub_core::{ backon::{ExponentialBuilder, Retryable}, prelude::*, producer::Producer, - tokio, + tokio::{ + self, + task::{self, JoinSet}, + }, }; use solana_client::rpc_client::RpcClient; use yellowstone_grpc_client::GeyserGrpcClientError; -use yellowstone_grpc_proto::prelude::SubscribeRequest; +use yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction, +}; use crate::{processor::Processor, Args, GeyserGrpcConnector}; @@ -18,6 +25,10 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, + dashmap: DashMap>, + tx: Sender, + rx: Receiver, + parallelism: usize, } impl MessageHandler { @@ -26,6 +37,7 @@ impl MessageHandler { dragon_mouth_endpoint, dragon_mouth_x_token, solana_endpoint, + parallelism, db, } = args; @@ -35,12 +47,16 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - + let (tx, rx) = channel::unbounded(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, + dashmap: DashMap::new(), + tx, + rx, + parallelism, }) } @@ -54,9 +70,23 @@ impl MessageHandler { .map_err(GeyserGrpcClientError::SubscribeSendError)?; while let Some(message) = stream.next().await { - self.processor.process(message).await?; + match message { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Transaction(tx)) => { + self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + }, + Some(UpdateOneof::Slot(slot)) => { + if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + for tx in transactions { + self.tx.send(tx)?; + } + } + }, + _ => {}, + }, + Err(error) => bail!("stream error: {:?}", error), + }; } - Ok(()) }) .retry( @@ -81,12 +111,42 @@ impl MessageHandler { }); let mpl_bubblegum_stream = tokio::spawn({ + let handler = self.clone(); async move { - self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) + handler + .connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) .await } }); + let rx = self.rx.clone(); + let processor = self.processor; + + let process_task = task::spawn(async move { + let mut set = JoinSet::new(); + + loop { + let processor = processor.clone(); + + while set.len() >= self.parallelism { + match set.join_next().await { + Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Ok(_)) | None => (), + } + } + + match rx.recv() { + Ok(tx) => { + set.spawn(processor.process_transaction(tx)); + }, + Err(e) => { + error!("{:?}", e); + return Result::<(), Error>::Err(e.into()); + }, + } + } + }); + tokio::select! { Err(e) = spl_token_stream => { bail!("spl token stream error: {:?}", e) @@ -94,6 +154,9 @@ impl MessageHandler { Err(e) = mpl_bubblegum_stream => { bail!("mpl bumblegum stream error: {:?}", e) } + Err(e) = process_task => { + bail!("Receiver err: {:?}", e) + } } } } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 5b66883..fbee2fe 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -18,6 +18,9 @@ pub struct Args { #[arg(long, env)] pub solana_endpoint: String, + #[arg(long, short = 'p', env, default_value_t = 8)] + pub parallelism: usize, + #[command(flatten)] pub db: db::DbArgs, } diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs index 71bc0f2..6847de8 100644 --- a/indexer/src/processor.rs +++ b/indexer/src/processor.rs @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc}; use anchor_lang::AnchorDeserialize; use backoff::ExponentialBackoff; -use dashmap::DashMap; use holaplex_hub_nfts_solana_core::{ db::Connection, proto::{ @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{ CollectionMint, CompressionLeaf, }; use holaplex_hub_nfts_solana_entity::compression_leafs; -use hub_core::{prelude::*, producer::Producer, tokio::task}; +use hub_core::{prelude::*, producer::Producer}; use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; use solana_program::program_pack::Pack; use solana_sdk::{pubkey::Pubkey, signature::Signature}; use spl_token::{instruction::TokenInstruction, state::Account}; -use yellowstone_grpc_proto::{ - prelude::{ - subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, - }, - tonic::Status, -}; +use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction}; #[derive(Clone)] pub struct Processor { db: Connection, rpc: Arc, producer: Producer, - dashmap: DashMap>, } impl Processor { @@ -40,38 +33,7 @@ impl Processor { rpc: Arc, producer: Producer, ) -> Self { - Self { - db, - rpc, - producer, - dashmap: DashMap::new(), - } - } - - pub(crate) async fn process(&self, message: Result) -> Result<()> { - match message { - Ok(msg) => match msg.update_oneof { - Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); - }, - Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { - let handles: Vec<_> = transactions - .into_iter() - .map(|tx| task::spawn(self.clone().process_transaction(tx))) - .collect(); - - for handle in handles { - handle.await?? - } - } - }, - _ => {}, - }, - Err(error) => bail!("stream error: {:?}", error), - }; - - Ok(()) + Self { db, rpc, producer } } pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { @@ -140,6 +102,7 @@ impl Processor { if compression_leaf.is_none() { return Ok(()); } + let compression_leaf = compression_leaf.context("Compression leaf not found")?; let collection_mint_id = compression_leaf.id;