Skip to content

Commit

Permalink
Merge pull request #4507 from chenyukang/yukang-fix-websocket-perf-issue
Browse files Browse the repository at this point in the history
[Backport] Fix websocket subscription performance issue
  • Loading branch information
zhangsoledad authored Jun 28, 2024
2 parents 65e76cf + 972a901 commit e3741f2
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions rpc/src/module/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use async_trait::async_trait;
use broadcast::error::RecvError;
use ckb_async_runtime::Handle;
use ckb_jsonrpc_types::Topic;
use ckb_logger::error;
use ckb_notify::NotifyController;
use ckb_notify::NOTIFY_CHANNEL_SIZE;
use ckb_stop_handler::new_tokio_exit_rx;
use futures_util::{stream::BoxStream, Stream};
use jsonrpc_core::Result;
Expand Down Expand Up @@ -211,9 +214,19 @@ impl SubscriptionRpc for SubscriptionRpcImpl {
};
let mut rx = tx.subscribe();
Ok(Box::pin(async_stream::stream! {
while let Ok(msg) = rx.recv().await {
yield msg;
}
loop {
match rx.recv().await {
Ok(msg) => {
yield msg;
}
Err(RecvError::Lagged(cnt)) => {
error!("subscription lagged error: {:?}", cnt);
}
Err(RecvError::Closed) => {
break;
}
}
}
}))
}
}
Expand All @@ -232,11 +245,11 @@ impl SubscriptionRpcImpl {
let mut reject_transaction_receiver = handle
.block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));

let (new_tip_header_sender, _) = broadcast::channel(10);
let (new_tip_block_sender, _) = broadcast::channel(10);
let (proposed_transaction_sender, _) = broadcast::channel(10);
let (new_transaction_sender, _) = broadcast::channel(10);
let (new_reject_transaction_sender, _) = broadcast::channel(10);
let (new_tip_header_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_tip_block_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (proposed_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_reject_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);

let stop_rx = new_tokio_exit_rx();
handle.spawn({
Expand Down Expand Up @@ -266,7 +279,10 @@ impl SubscriptionRpcImpl {
_ = stop_rx.cancelled() => {
break;
},
else => break,
else => {
error!("SubscriptionRpcImpl tokio::select! unexpected error");
break;
}
}
}
}
Expand Down

0 comments on commit e3741f2

Please sign in to comment.