Skip to content

Commit

Permalink
feat: add a protocol to try connecting peers behind firewalls or NAT …
Browse files Browse the repository at this point in the history
…routers
  • Loading branch information
yangby-cryptape committed Jan 2, 2025
1 parent b8cb85c commit 2c81812
Show file tree
Hide file tree
Showing 13 changed files with 1,734 additions and 36 deletions.
227 changes: 192 additions & 35 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::protocols::{
discovery::{DiscoveryAddressManager, DiscoveryProtocol},
feeler::Feeler,
identify::{Flags, IdentifyCallback, IdentifyProtocol},
penetration::{self, Penetration},
ping::PingHandler,
support_protocols::SupportProtocols,
};
Expand All @@ -25,6 +26,7 @@ use ckb_logger::{debug, error, info, trace, warn};
use ckb_spawn::Spawn;
use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken};
use ckb_systemtime::{Duration, Instant};
use ckb_types::{packed, prelude::*};
use ckb_util::{Condvar, Mutex, RwLock};
use futures::{channel::mpsc::Sender, Future};
use ipnetwork::IpNetwork;
Expand All @@ -33,7 +35,10 @@ use p2p::{
builder::ServiceBuilder,
bytes::Bytes,
context::{ServiceContext, SessionContext},
error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
error::{
DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind,
TransportErrorKind,
},
multiaddr::{Multiaddr, Protocol},
secio::{self, error::SecioError, PeerId, SecioKeyPair},
service::{
Expand Down Expand Up @@ -79,6 +84,14 @@ pub struct NetworkState {
/// includes manually public addrs and remote peer observed addrs
public_addrs: RwLock<HashSet<Multiaddr>>,
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
/// All possible addresses.
/// Different with `public_addrs`:
/// - Include remote peer observed addresses which are failed to dial.
/// - Allow loopback addresses, private addresses, and so on.
/// - So, two peers behind a private NAT could be connected with each others.
pub(crate) possible_addrs: RwLock<HashSet<Multiaddr>>,
penetrated_addrs: RwLock<HashMap<PeerId, Instant>>,

local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
pub(crate) bootnodes: Vec<Multiaddr>,
Expand Down Expand Up @@ -116,6 +129,7 @@ impl NetworkState {
})
})
.collect();
let possible_addrs = public_addrs.clone();
info!("Loading the peer store. This process may take a few seconds to complete.");

let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
Expand All @@ -139,6 +153,8 @@ impl NetworkState {
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
possible_addrs: RwLock::new(possible_addrs),
penetrated_addrs: RwLock::new(HashMap::default()),
local_private_key,
local_peer_id,
active: AtomicBool::new(true),
Expand Down Expand Up @@ -169,6 +185,7 @@ impl NetworkState {
})
})
.collect();
let possible_addrs = public_addrs.clone();
info!("Loading the peer store. This process may take a few seconds to complete.");
let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
let bootnodes = config.bootnodes();
Expand All @@ -188,6 +205,8 @@ impl NetworkState {
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
possible_addrs: RwLock::new(possible_addrs),
penetrated_addrs: RwLock::new(HashMap::default()),
local_private_key,
local_peer_id,
active: AtomicBool::new(true),
Expand Down Expand Up @@ -344,6 +363,15 @@ impl NetworkState {
.collect()
}

pub(crate) fn possible_addrs(&self, count: usize) -> Vec<Multiaddr> {
self.possible_addrs
.read()
.iter()
.take(count)
.cloned()
.collect()
}

pub(crate) fn connection_status(&self) -> ConnectionStatus {
self.peer_registry.read().connection_status()
}
Expand Down Expand Up @@ -533,13 +561,91 @@ impl NetworkState {
/// add observed address for identify protocol
pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
let mut pending_observed_addrs = self.pending_observed_addrs.write();
pending_observed_addrs.extend(iter)
pending_observed_addrs.extend(iter);
}

/// Add possible address for identify protocol
pub(crate) fn add_possible_addr(&self, addr: Multiaddr) {
let mut possible_addrs = self.possible_addrs.write();
possible_addrs.insert(addr);
}

/// Network message processing controller, default is true, if false, discard any received messages
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Try to connect to a peer which may behind firewalls or NAT routers.
pub(crate) fn try_penetrating(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
let peer_id = extract_peer_id(&addr);
if peer_id.is_none() {
warn!("Do not penetrate addr without peer id, addr: {}", addr);
return;
}
let to_peer_id = peer_id.as_ref().unwrap();
let from_peer_id = self.local_peer_id();
if from_peer_id == to_peer_id {
trace!("Do not penetrate self: {:?}, {}", peer_id, addr);
return;
}
if self.public_addrs.read().contains(&addr) {
trace!(
"Do not penetrate listened address(self): {:?}, {}",
peer_id,
addr
);
return;
}
if self
.peer_registry
.read()
.get_key_by_peer_id(to_peer_id)
.is_some()
{
trace!(
"Do not penetrate peer which in registry: {:?}, {}",
peer_id,
addr
);
return;
}
if let Some(last_penetrated) = self.penetrated_addrs.read().get(to_peer_id) {
if Instant::now().saturating_duration_since(*last_penetrated)
< penetration::PENETRATED_INTERVAL
{
trace!(
"Do not penetrate peer which already penetrated a moment ago: {:?}, {}",
peer_id,
addr
);
return;
}
}
let conn_req = {
let listen_addrs = {
let reader = self.possible_addrs(penetration::ADDRS_COUNT_LIMIT);
let iter = reader
.iter()
.map(Multiaddr::to_vec)
.map(|v| packed::Address::new_builder().bytes(v.pack()).build());
packed::AddressVec::new_builder().extend(iter).build()
};
let content = packed::ConnectionRequest::new_builder()
.from(from_peer_id.as_bytes().pack())
.to(to_peer_id.as_bytes().pack())
.ttl(penetration::MAX_TTL.into())
.listen_addrs(listen_addrs)
.build();
packed::PenetrationMessage::new_builder()
.set(content)
.build()
};
let proto_id = SupportProtocols::Penetration.protocol_id();
self.penetrated_addrs
.write()
.insert(to_peer_id.clone(), Instant::now());
let _ignore_result = p2p_control.try_broadcast(false, None, proto_id, conn_req.as_bytes());
}
}

/// Used to handle global events of tentacle, such as session open/close
Expand Down Expand Up @@ -613,16 +719,14 @@ impl ServiceHandle for EventHandler {
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
match error {
ServiceError::DialerError { address, error } => {
let mut public_addrs = self.network_state.public_addrs.write();

match error {
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
SecioError::ConnectSelf,
)) => {
debug!("dial observed address success: {:?}", address);
if let Some(ip) = multiaddr_to_socketaddr(&address) {
if is_reachable(ip.ip()) {
public_addrs.insert(address);
self.network_state.public_addrs.write().insert(address);
}
}
return;
Expand All @@ -632,11 +736,18 @@ impl ServiceHandle for EventHandler {
{
warn!("DialerError({}) {}", address, e);
}
DialerErrorKind::TransportError(TransportErrorKind::Io(e))
if e.kind() == std::io::ErrorKind::TimedOut =>
{
warn!("DialerError({}) {}, try penetrating", address, e);
self.network_state
.try_penetrating(&context.control().clone().into(), address.clone());
}
_ => {
debug!("DialerError({}) {}", address, error);
}
}
public_addrs.remove(&address);
self.network_state.public_addrs.write().remove(&address);
self.network_state.dial_failed(&address);
}
ServiceError::ProtocolError {
Expand Down Expand Up @@ -926,6 +1037,20 @@ impl NetworkService {
protocol_metas.push(disconnect_message_meta);
}

// Penetration protocol
if config
.support_protocols
.contains(&SupportProtocol::Penetration)
{
let feeler_meta = SupportProtocols::Penetration.build_meta_with_service_handle({
let network_state = Arc::clone(&network_state);
move || {
ProtocolHandle::Callback(Box::new(Penetration::new(Arc::clone(&network_state))))
}
});
protocol_metas.push(feeler_meta);
}

let mut service_builder = ServiceBuilder::default();
let yamux_config = YamuxConfig {
max_stream_count: protocol_metas.len(),
Expand Down Expand Up @@ -1409,35 +1534,8 @@ impl NetworkController {
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
let now = Instant::now();
loop {
let target = target
.map(TargetSession::Single)
.unwrap_or(TargetSession::All);
let result = if quick {
self.p2p_control
.quick_filter_broadcast(target, proto_id, data.clone())
} else {
self.p2p_control
.filter_broadcast(target, proto_id, data.clone())
};
match result {
Ok(()) => {
return Ok(());
}
Err(SendErrorKind::WouldBlock) => {
if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
warn!("Broadcast message to {} timeout", proto_id);
return Err(SendErrorKind::WouldBlock);
}
thread::sleep(P2P_TRY_SEND_INTERVAL);
}
Err(err) => {
warn!("Broadcast message to {} failed: {:?}", proto_id, err);
return Err(err);
}
}
}
self.p2p_control
.try_broadcast(quick, target, proto_id, data)
}

/// Broadcast a message to all connected peers
Expand Down Expand Up @@ -1536,3 +1634,62 @@ pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
TransportType::Tcp
}
}

pub(crate) trait ServiceControlExt {
fn try_broadcast(
&self,
quick: bool,
target: Option<SessionId>,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind>;

fn try_forward(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
self.try_broadcast(false, Some(session_id), proto_id, data)
}
}

impl ServiceControlExt for ServiceControl {
fn try_broadcast(
&self,
quick: bool,
target: Option<SessionId>,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
let now = Instant::now();
loop {
let target = target
.map(TargetSession::Single)
.unwrap_or(TargetSession::All);
let result = if quick {
self.quick_filter_broadcast(target, proto_id, data.clone())
} else {
self.filter_broadcast(target, proto_id, data.clone())
};
match result {
Ok(()) => {
trace!("Broadcast message to {} success", proto_id);
return Ok(());
}
Err(SendErrorKind::WouldBlock) => {
if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
warn!("Broadcast message to {} timeout", proto_id);
return Err(SendErrorKind::WouldBlock);
}
trace!("Broadcast message to {} pending for an interval", proto_id);
thread::sleep(P2P_TRY_SEND_INTERVAL);
}
Err(err) => {
warn!("Broadcast message to {} failed: {:?}", proto_id, err);
return Err(err);
}
}
}
}
}
7 changes: 7 additions & 0 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub trait Callback: Clone + Send {
fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec<Multiaddr>);
/// Add our address observed by remote peer
fn add_observed_addr(&mut self, addr: Multiaddr, ty: SessionType) -> MisbehaveResult;
/// Add all possible address observed by remote peer
fn add_possible_addr(&mut self, addr: Multiaddr);
/// Report misbehavior
fn misbehave(&mut self, session: &SessionContext, kind: Misbehavior) -> MisbehaveResult;
}
Expand Down Expand Up @@ -166,6 +168,7 @@ impl<T: Callback> IdentifyProtocol<T> {
.remote_infos
.get_mut(&session.id)
.expect("RemoteInfo must exists");
self.callback.add_possible_addr(observed.clone());
let global_ip_only = self.global_ip_only;
if multiaddr_to_socketaddr(&observed)
.map(|socket_addr| socket_addr.ip())
Expand Down Expand Up @@ -570,6 +573,10 @@ impl Callback for IdentifyCallback {
MisbehaveResult::Continue
}

fn add_possible_addr(&mut self, addr: Multiaddr) {
self.network_state.add_possible_addr(addr);
}

fn misbehave(&mut self, session: &SessionContext, reason: Misbehavior) -> MisbehaveResult {
error!(
"IdentifyProtocol detects abnormal behavior, session: {:?}, reason: {:?}",
Expand Down
1 change: 1 addition & 0 deletions network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub(crate) mod disconnect_message;
pub(crate) mod discovery;
pub(crate) mod feeler;
pub(crate) mod identify;
pub(crate) mod penetration;
pub(crate) mod ping;
pub(crate) mod support_protocols;

Expand Down
Loading

0 comments on commit 2c81812

Please sign in to comment.