Skip to content

Commit

Permalink
use tokio unbounded channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdul Basit committed Oct 4, 2023
1 parent 092b385 commit 6f7d4c2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 36 deletions.
15 changes: 0 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dashmap = "5.4.0"
spl-token = "=3.5.0"
solana-client = "1.14"
backoff = { version = "0.4.0", features = ["tokio"] }
crossbeam = "0.8.2"

[dependencies.hub-core]
package = "holaplex-hub-core"
Expand Down
40 changes: 20 additions & 20 deletions indexer/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use crossbeam::channel::{self, Receiver, Sender};
use dashmap::DashMap;
use futures::{sink::SinkExt, stream::StreamExt};
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents};
Expand All @@ -10,6 +9,10 @@ use hub_core::{
producer::Producer,
tokio::{
self,
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
},
task::{self, JoinSet},
},
};
Expand All @@ -25,9 +28,8 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector};
pub struct MessageHandler {
connector: GeyserGrpcConnector,
processor: Processor,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
tx: Sender<SubscribeUpdateTransaction>,
rx: Receiver<SubscribeUpdateTransaction>,
tx: UnboundedSender<SubscribeUpdateTransaction>,
rx: Arc<Mutex<UnboundedReceiver<SubscribeUpdateTransaction>>>,
parallelism: usize,
}

Expand All @@ -47,23 +49,22 @@ impl MessageHandler {

let rpc = Arc::new(RpcClient::new(solana_endpoint));
let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token);
let (tx, rx) = channel::unbounded();
let (tx, rx) = mpsc::unbounded_channel();
let processor = Processor::new(db, rpc, producer);

Ok(Self {
connector,
processor,
dashmap: DashMap::new(),
tx,
rx,
rx: Arc::new(Mutex::new(rx)),
parallelism,
})
}

async fn connect(&self, request: SubscribeRequest) -> Result<()> {
(|| async {
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

let dashmap = DashMap::new();
subscribe_tx
.send(request.clone())
.await
Expand All @@ -73,10 +74,10 @@ impl MessageHandler {
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
if let Some((_, transactions)) = dashmap.remove(&slot.slot) {
for tx in transactions {
self.tx.send(tx)?;
}
Expand Down Expand Up @@ -119,30 +120,29 @@ impl MessageHandler {
}
});

let rx = self.rx.clone();
let processor = self.processor;

let process_task = task::spawn(async move {
let mut set = JoinSet::new();

loop {
let processor = processor.clone();
let mut rx = self.rx.lock().await;

while set.len() >= self.parallelism {
match set.join_next().await {
Some(Err(e)) => bail!("failed to join task {:?}", e),
Some(Err(e)) => {
return Result::<(), Error>::Err(anyhow!(
"failed to join task {:?}",
e
));
},
Some(Ok(_)) | None => (),
}
}

match rx.recv() {
Ok(tx) => {
set.spawn(processor.process_transaction(tx));
},
Err(e) => {
error!("{:?}", e);
return Result::<(), Error>::Err(e.into());
},
if let Some(tx) = rx.recv().await {
set.spawn(processor.process_transaction(tx));
}
}
});
Expand Down

0 comments on commit 6f7d4c2

Please sign in to comment.