Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Limit the number of concurrent queries
Browse files Browse the repository at this point in the history
contrun committed Jan 15, 2025
1 parent 8177e16 commit 2a326ca
Showing 1 changed file with 113 additions and 63 deletions.
176 changes: 113 additions & 63 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
@@ -62,6 +62,10 @@ const MIN_NUM_OF_PASSIVE_SYNCING_PEERS: usize = 3;
const NUM_SIMULTANEOUS_GET_REQUESTS: usize = 1;
const GET_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);

// The maximum number of concurrent query tasks to run. We will wait for query previous task to exit
// before start new query tasks. This is to avoid consuming to much bandwidth.
const MAX_NUM_CONCURRENT_QUERY_TASKS: usize = 10;

const QUERY_BROADCAST_MESSAGES_TIMEOUT: Duration = Duration::from_secs(20);

fn max_acceptable_gossip_message_timestamp() -> u64 {
@@ -610,7 +614,7 @@ where
messages,
))
.expect("store actor alive");
debug!("Sending new GetBroadcastMessages request after receiving response: peer_id {:?}", &state.peer_id);
trace!("Sending new GetBroadcastMessages request after receiving response: peer_id {:?}", &state.peer_id);
myself
.send_message(GossipSyncingActorMessage::NewGetRequest())
.expect("gossip syncing actor alive");
@@ -1062,6 +1066,7 @@ pub struct ExtendedGossipMessageStoreState<S> {
// A map from peer_id to the messages that need to be saved.
// Our own messages are always saved directly to the store.
messages_to_be_saved: HashMap<PeerId, HashSet<BroadcastMessage>>,
num_query_tasks_running: usize,
}

impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
@@ -1079,6 +1084,7 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
next_id: Default::default(),
output_ports: Default::default(),
messages_to_be_saved: Default::default(),
num_query_tasks_running: 0,
}
}

@@ -1177,6 +1183,90 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
self.broadcast_messages(messages);
}

fn spawn_query_tasks(&mut self, myself: &ActorRef<ExtendedGossipMessageStoreMessage>) {
if MAX_NUM_CONCURRENT_QUERY_TASKS == self.num_query_tasks_running {
return;
}
let peers_to_query = self
.messages_to_be_saved
.keys()
.take(MAX_NUM_CONCURRENT_QUERY_TASKS - self.num_query_tasks_running)
.cloned()
.collect::<Vec<_>>();
self.num_query_tasks_running = self.num_query_tasks_running + peers_to_query.len();

for peer in peers_to_query {
// The assumption is that when a peer sends us a message, it should have all the dependencies.
// So here we will send queries to the peer for the missing messages.
let incomplete_messages = self
.messages_to_be_saved
.remove(&peer)
.expect("peer is a key of hashmap");
let gossip_actor = self.gossip_actor.clone();
let myself = myself.clone();
let incomplete_messages = incomplete_messages.into_iter().collect::<Vec<_>>();

ractor::concurrency::tokio_primitives::spawn(async move {
let mut is_success = true;
let n_queries = incomplete_messages.len();
for messages in
incomplete_messages.chunks(DEFAULT_NUM_OF_BROADCAST_MESSAGE as usize)
{
let queries = messages
.iter()
.filter_map(|m| match m {
BroadcastMessage::ChannelUpdate(channel_update) => {
Some(BroadcastMessageQuery {
flags: BroadcastMessageQueryFlags::ChannelAnnouncement,
channel_outpoint: channel_update.channel_outpoint.clone(),
})
}
_ => None,
})
.collect::<Vec<_>>();
if queries.is_empty() {
continue;
}
match call!(
gossip_actor,
GossipActorMessage::QueryBroadcastMessages,
peer.clone(),
queries.to_vec()
) {
Ok(Ok(result)) => {
let mut all_messages = result.messages;
// We need also to save the incomplete messages to the store.
all_messages.extend(messages.iter().map(Clone::clone).map(Into::into));
myself
.send_message(ExtendedGossipMessageStoreMessage::SaveMessages(
peer.clone(),
all_messages,
))
.expect("actor alive");
}
Ok(Err(e)) => {
error!("Failed to query messages from peer {:?}: {:?}", peer, e);
is_success = false;
}
Err(e) => {
error!("Failed to send query to peer {:?}: {:?}", peer, e);
is_success = false;
}
};
}
myself
.send_message(ExtendedGossipMessageStoreMessage::QueryTaskDone(
QueryResult {
n_queries,
peer,
is_success,
},
))
.expect("actor alive")
});
}
}

fn get_channel_annnouncement(&self, outpoint: &OutPoint) -> Option<ChannelAnnouncement> {
self.store
.get_latest_channel_announcement(outpoint)
@@ -1438,6 +1528,20 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
state.store_and_broadcast_messages(&messages);
}

ExtendedGossipMessageStoreMessage::QueryTaskDone(QueryResult {
n_queries,
peer,
is_success,
}) => {
trace!(
n_quries = n_queries,
peer = format!("{:?}", peer),
is_success = is_success,
"Querying task done"
);
state.num_query_tasks_running = state.num_query_tasks_running - 1;
}

ExtendedGossipMessageStoreMessage::Tick => {
trace!(
"Gossip store maintenance ticked: #subscriptions = {}, #messages_to_be_saved = {}",
@@ -1448,74 +1552,19 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
// These are the messages that have complete dependencies and can be sent to the subscribers.
let complete_messages = state.prune_messages_to_be_saved().await;
state.broadcast_messages(&complete_messages);

// The assumption is that when a peer sends us a message, it should have all the dependencies.
// So here we will send queries to the peer for the missing messages.
for (peer, incomplete_messages) in state.messages_to_be_saved.drain() {
let gossip_actor = state.gossip_actor.clone();
let myself = myself.clone();
let incomplete_messages = incomplete_messages.into_iter().collect::<Vec<_>>();

ractor::concurrency::tokio_primitives::spawn(async move {
for messages in
incomplete_messages.chunks(DEFAULT_NUM_OF_BROADCAST_MESSAGE as usize)
{
let queries = messages
.iter()
.filter_map(|m| match m {
BroadcastMessage::ChannelUpdate(channel_update) => {
Some(BroadcastMessageQuery {
flags: BroadcastMessageQueryFlags::ChannelAnnouncement,
channel_outpoint: channel_update
.channel_outpoint
.clone(),
})
}
_ => None,
})
.collect::<Vec<_>>();
if queries.is_empty() {
continue;
}
match call!(
gossip_actor,
GossipActorMessage::QueryBroadcastMessages,
peer.clone(),
queries.to_vec()
) {
Ok(Ok(result)) => {
let mut all_messages = result.messages;
// We need also to save the incomplete messages to the store.
all_messages
.extend(messages.iter().map(Clone::clone).map(Into::into));
myself
.send_message(
ExtendedGossipMessageStoreMessage::SaveMessages(
peer.clone(),
all_messages,
),
)
.expect("actor alive");
}
Ok(Err(e)) => {
error!(
"Failed to query messages from peer {:?}: {:?}",
peer, e
);
}
Err(e) => {
error!("Failed to send query to peer {:?}: {:?}", peer, e);
}
}
}
});
}
state.spawn_query_tasks(&myself);
}
}
Ok(())
}
}

pub struct QueryResult {
n_queries: usize,
peer: PeerId,
is_success: bool,
}

pub enum ExtendedGossipMessageStoreMessage {
// A new subscription for gossip message updates. We will send a batch of messages to the subscriber
// via the returned output port.
@@ -1541,6 +1590,7 @@ pub enum ExtendedGossipMessageStoreMessage {
// This is normally called immediately after a new subscription is created. This is the time when
// we need to send existing messages to the subscriber.
LoadMessagesFromStore(u64, Cursor),
QueryTaskDone(QueryResult),
// A tick message that is sent periodically to check if there are any messages that are saved out of order.
// If there are, we will send them to the subscribers.
Tick,

0 comments on commit 2a326ca

Please sign in to comment.