Skip to content

Commit

Permalink
fix: fix feeler flags fetch fail
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jan 17, 2025
1 parent c92c849 commit e625cf0
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 42 deletions.
22 changes: 4 additions & 18 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1531,27 +1531,13 @@ pub enum TransportType {
Wss,
}

impl<'a> From<TransportType> for p2p::multiaddr::Protocol<'a> {
fn from(value: TransportType) -> Self {
match value {
TransportType::Ws => Protocol::Ws,
TransportType::Wss => Protocol::Wss,
_ => unreachable!(),
}
}
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let Protocol::Ws = proto {
Some(TransportType::Ws)
} else if let Protocol::Wss = proto {
Some(TransportType::Wss)
} else {
None
}
iter.find_map(|proto| match proto {
Protocol::Ws => Some(TransportType::Ws),
Protocol::Wss => Some(TransportType::Wss),
_ => None,
})
.unwrap_or(TransportType::Tcp)
}
25 changes: 21 additions & 4 deletions network/src/peer_registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Peer registry
use crate::network_group::Group;
use crate::peer_store::PeerStore;
use crate::Flags;
use crate::{
errors::{Error, PeerError},
extract_peer_id, Peer, PeerId, SessionType,
Expand All @@ -24,7 +25,7 @@ pub struct PeerRegistry {
// Only whitelist peers or allow all peers.
whitelist_only: bool,
whitelist_peers: HashSet<PeerId>,
feeler_peers: HashSet<PeerId>,
feeler_peers: HashMap<PeerId, Flags>,
}

/// Global network connection status
Expand Down Expand Up @@ -63,7 +64,7 @@ impl PeerRegistry {
PeerRegistry {
peers: HashMap::with_capacity_and_hasher(20, Default::default()),
whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
feeler_peers: HashSet::default(),
feeler_peers: HashMap::default(),
max_inbound,
max_outbound,
whitelist_only,
Expand Down Expand Up @@ -191,10 +192,26 @@ impl PeerRegistry {
/// Add feeler dail task
pub fn add_feeler(&mut self, addr: &Multiaddr) {
if let Some(peer_id) = extract_peer_id(addr) {
self.feeler_peers.insert(peer_id);
self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
}
}

/// Identify change feeler flags
pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
if let Some(peer_id) = extract_peer_id(addr) {
if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
*i = flags;
return true;
}
}
false
}

/// Get feeler session flags
pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
}

/// Remove feeler dail task on session disconnects or fails
pub fn remove_feeler(&mut self, addr: &Multiaddr) {
if let Some(peer_id) = extract_peer_id(addr) {
Expand All @@ -205,7 +222,7 @@ impl PeerRegistry {
/// Whether this session is feeler session
pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
extract_peer_id(addr)
.map(|peer_id| self.feeler_peers.contains(&peer_id))
.map(|peer_id| self.feeler_peers.contains_key(&peer_id))
.unwrap_or_default()
}

Expand Down
50 changes: 46 additions & 4 deletions network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Address manager
use crate::peer_store::types::AddrInfo;
use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr};
use p2p::{
multiaddr::{Multiaddr, Protocol},
utils::multiaddr_to_socketaddr,
};
use rand::Rng;
use std::collections::{HashMap, HashSet};

Expand Down Expand Up @@ -92,7 +95,20 @@ impl AddrManager {

/// Remove an address by ip and port
pub fn remove(&mut self, addr: &Multiaddr) -> Option<AddrInfo> {
self.addr_to_id.remove(addr).and_then(|id| {
let base_addr = addr
.iter()
.filter_map(|p| {
if matches!(
p,
Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_)
) {
None
} else {
Some(p)
}
})
.collect();
self.addr_to_id.remove(&base_addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
Expand All @@ -103,14 +119,40 @@ impl AddrManager {

/// Get an address information by ip and port
pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> {
let base_addr = addr
.iter()
.filter_map(|p| {
if matches!(
p,
Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_)
) {
None
} else {
Some(p)
}
})
.collect();
self.addr_to_id
.get(addr)
.get(&base_addr)
.and_then(|id| self.id_to_info.get(id))
}

/// Get a mutable address information by ip and port
pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> {
if let Some(id) = self.addr_to_id.get(addr) {
let base_addr = addr
.iter()
.filter_map(|p| {
if matches!(
p,
Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_)
) {
None
} else {
Some(p)
}
})
.collect();
if let Some(id) = self.addr_to_id.get(&base_addr) {
self.id_to_info.get_mut(id)
} else {
None
Expand Down
7 changes: 2 additions & 5 deletions network/src/protocols/feeler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ impl ServiceProtocol for Feeler {
.remove(&session.address);
} else if context.session.ty.is_outbound() {
let flags = self.network_state.with_peer_registry(|reg| {
if let Some(p) = reg.get_peer(session.id) {
p.identify_info
.as_ref()
.map(|i| i.flags)
.unwrap_or(Flags::COMPATIBILITY)
if let Some(p) = reg.feeler_flags(&session.address) {
p
} else {
Flags::COMPATIBILITY
}
Expand Down
7 changes: 3 additions & 4 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,9 @@ impl Callback for IdentifyCallback {
});
}

if self
.network_state
.with_peer_registry(|reg| reg.is_feeler(&context.session.address))
{
if self.network_state.with_peer_registry_mut(|reg| {
reg.change_feeler_flags(&context.session.address, flags)
}) {
let _ = context
.open_protocols(
context.session.id,
Expand Down
28 changes: 21 additions & 7 deletions network/src/services/outbound_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use ckb_logger::trace;
use ckb_systemtime::unix_time_as_millis;
use futures::{Future, StreamExt};
use p2p::runtime::{Interval, MissedTickBehavior};
use p2p::{multiaddr::MultiAddr, service::ServiceControl};
use p2p::{
multiaddr::{MultiAddr, Protocol},
service::ServiceControl,
};
use rand::prelude::IteratorRandom;
use std::{
pin::Pin,
Expand Down Expand Up @@ -71,8 +74,10 @@ impl OutboundPeerService {

for mut addr in attempt_peers.into_iter().map(|info| info.addr) {
self.network_state.dial_feeler(&self.p2p_control, {
if !matches!(self.transport_type, TransportType::Tcp) {
addr.push(self.transport_type.into());
match &self.transport_type {
TransportType::Tcp => (),
TransportType::Ws => addr.push(Protocol::Ws),
TransportType::Wss => addr.push(Protocol::Wss),
}
addr
});
Expand Down Expand Up @@ -145,17 +150,26 @@ impl OutboundPeerService {

for mut addr in peers {
self.network_state.dial_identify(&self.p2p_control, {
if !matches!(self.transport_type, TransportType::Tcp) {
addr.push(self.transport_type.into());
match &self.transport_type {
TransportType::Tcp => (),
TransportType::Ws => addr.push(Protocol::Ws),
TransportType::Wss => addr.push(Protocol::Wss),
}
addr
});
}
}

fn try_dial_whitelist(&self) {
for addr in self.network_state.config.whitelist_peers() {
self.network_state.dial_identify(&self.p2p_control, addr);
for mut addr in self.network_state.config.whitelist_peers() {
self.network_state.dial_identify(&self.p2p_control, {
match &self.transport_type {
TransportType::Tcp => (),
TransportType::Ws => addr.push(Protocol::Ws),
TransportType::Wss => addr.push(Protocol::Wss),
}
addr
});
}
}

Expand Down

0 comments on commit e625cf0

Please sign in to comment.