From 6f7d4c298c91ccdc53986bb8d6310a4af7e29b10 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 09:30:51 +0500 Subject: [PATCH] use tokio unbounded channel --- Cargo.lock | 15 --------------- indexer/Cargo.toml | 1 - indexer/src/handler.rs | 40 ++++++++++++++++++++-------------------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd49ec5..63769e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,20 +1458,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5420,7 +5406,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "crossbeam", "dashmap", "futures", "hex", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 7b97ec1..23ff826 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,6 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } -crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index 223dd7e..2e6b21e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crossbeam::channel::{self, Receiver, Sender}; use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; @@ -10,6 +9,10 @@ use hub_core::{ producer::Producer, tokio::{ self, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, + }, task::{self, JoinSet}, }, }; @@ -25,9 +28,8 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, - dashmap: DashMap>, - tx: Sender, - rx: Receiver, + tx: UnboundedSender, + rx: Arc>>, parallelism: usize, } @@ -47,15 +49,14 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - let (tx, rx) = channel::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, - dashmap: DashMap::new(), tx, - rx, + rx: Arc::new(Mutex::new(rx)), parallelism, }) } @@ -63,7 +64,7 @@ impl MessageHandler { async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - + let dashmap = DashMap::new(); subscribe_tx .send(request.clone()) .await @@ -73,10 +74,10 @@ impl MessageHandler { match message { Ok(msg) => match msg.update_oneof { Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); }, Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + if let Some((_, transactions)) = dashmap.remove(&slot.slot) { for tx in transactions { self.tx.send(tx)?; } @@ -119,7 +120,6 @@ impl MessageHandler { } }); - let rx = self.rx.clone(); let processor = self.processor; let process_task = task::spawn(async move { @@ -127,22 +127,22 @@ impl MessageHandler { loop { let processor = processor.clone(); + let mut rx = self.rx.lock().await; while set.len() >= self.parallelism { match set.join_next().await { - Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Err(e)) => { + return Result::<(), Error>::Err(anyhow!( + "failed to join task {:?}", + e + )); + }, Some(Ok(_)) | None => (), } } - match rx.recv() { - Ok(tx) => { - set.spawn(processor.process_transaction(tx)); - }, - Err(e) => { - error!("{:?}", e); - return Result::<(), Error>::Err(e.into()); - }, + if let Some(tx) = rx.recv().await { + set.spawn(processor.process_transaction(tx)); } } });