Skip to content

Commit

Permalink
Merge to the latest
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Jan 15, 2024
2 parents 8fd5434 + 6187597 commit 1dc77cf
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 204 deletions.
138 changes: 70 additions & 68 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use crate::{
error::{Discv5Error, RequestError},
packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity},
rpc::{
Message, Notification, Payload, Request, RequestBody, RequestId, Response, ResponseBody,
Message, Payload, RelayInitNotification, RelayMsgNotification, Request, RequestBody,
RequestId, Response, ResponseBody,
},
socket,
socket::{FilterConfig, Outbound, Socket},
Expand Down Expand Up @@ -114,8 +115,8 @@ pub enum HandlerIn {
WhoAreYou(WhoAreYouRef, Option<Enr>),

/// A response to a [`HandlerOut::FindHolePunchEnr`]. Returns the ENR and the
/// [`Notification::RelayMsg`] we intend to relay to that peer.
HolePunchEnr(Enr, Notification),
/// [`RelayInitNotification`] from [`HandlerOut::FindHolePunchEnr`].
HolePunchEnr(Enr, RelayInitNotification),

/// Observed socket has been update. The old socket and the current socket.
SocketUpdate(Option<SocketAddr>, SocketAddr),
Expand Down Expand Up @@ -147,8 +148,17 @@ pub enum HandlerOut {
RequestFailed(RequestId, RequestError),

/// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the
/// [`Notification::RelayMsg`] we intend to send to it.
FindHolePunchEnr(NodeId, Notification),
/// [`RelayMsgNotification`] we intend to send to it.
FindHolePunchEnr(RelayInitNotification),

/// Triggers a ping to all peers, outside of the regular ping interval. Needed to trigger
/// renewed session establishment after updating the local ENR from unreachable to reachable
/// and clearing all sessions. Only this way does the local node have a chance to make it into
/// its peers kbuckets before the session expires (defaults to 24 hours). This is the case
/// since its peers, running this implementation, will only respond to PINGs from nodes in its
/// kbucktes and unreachable ENRs don't make it into kbuckets upon [`HandlerOut::Established`]
/// event.
PingAllPeers,
}

/// How we connected to the node.
Expand Down Expand Up @@ -372,7 +382,10 @@ impl Handler {
}
HandlerIn::Response(dst, response) => self.send_response::<P>(dst, *response).await,
HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge::<P>(wru_ref, enr).await,
HandlerIn::HolePunchEnr(tgt_enr, relay_msg_notif) => {
HandlerIn::HolePunchEnr(tgt_enr, relay_init) => {
// Assemble the notification for the target
let (inr_enr, _tgt, timed_out_nonce) = relay_init.into();
let relay_msg_notif = RelayMsgNotification::new(inr_enr, timed_out_nonce);
if let Err(e) = self.send_relay_msg_notif::<P>(tgt_enr, relay_msg_notif).await {
warn!("Failed to relay. Error: {}", e);
}
Expand All @@ -381,14 +394,22 @@ impl Handler {
let ip = socket.ip();
let port = socket.port();
if old_socket.is_none() {
// This node goes from being unreachable to being reachable. Remove
// its sessions to trigger a WHOAREYOU from peers on next sent
// This node goes from being unreachable to being reachable, but
// keeps the same enr key (hence same node id). Remove its
// sessions to trigger a WHOAREYOU from peers on next sent
// message. If the peer is running this implementation of
// discovery, this makes it possible for the local node to be
// inserted into its peers' kbuckets before the session they
// already had expires. Session duration, in this impl defaults to
// 24 hours.
self.sessions.clear()
self.sessions.clear();
if let Err(e) = self
.service_send
.send(HandlerOut::PingAllPeers)
.await
{
warn!("Failed to inform that request failed {}", e);
}
}
self.nat_utils.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port));
}
Expand Down Expand Up @@ -1113,39 +1134,35 @@ impl Handler {

match message {
Message::Response(response) => self.handle_response::<P>(node_address, response).await,
Message::Notification(notification) => match notification {
Notification::RelayInit(initiator, target, timed_out_nonce) => {
let initiator_node_id = initiator.node_id();
if initiator_node_id != node_address.node_id {
warn!("peer {node_address} tried to initiate hole punch attempt for another node {initiator_node_id}, banning peer {node_address}");
self.fail_session(&node_address, RequestError::MaliciousRelayInit, true)
.await;
let ban_timeout = self
.nat_utils
.ban_duration
.map(|v| Instant::now() + v);
PERMIT_BAN_LIST.write().ban(node_address, ban_timeout);
} else if let Err(e) = self.on_relay_init(initiator, target, timed_out_nonce).await {
warn!("failed handling notification to relay for {node_address}, {e}");
}
Message::RelayInitNotification(notification) => {
let initiator_node_id = notification.initiator_enr().node_id();
if initiator_node_id != node_address.node_id {
warn!("peer {node_address} tried to initiate hole punch attempt for another node {initiator_node_id}, banning peer {node_address}");
self.fail_session(&node_address, RequestError::MaliciousRelayInit, true)
.await;
let ban_timeout = self
.nat_utils
.ban_duration
.map(|v| Instant::now() + v);
PERMIT_BAN_LIST.write().ban(node_address, ban_timeout);
} else if let Err(e) = self.on_relay_init(notification).await {
warn!("failed handling notification to relay for {node_address}, {e}");
}
Notification::RelayMsg(initiator, timed_out_nonce) => {
match self.nat_utils.is_behind_nat {
Some(false) => {
// The iniator may not be malicious and initiated a hole punch attempt when
// a request to this node timed out for another reason
debug!("peer {node_address} relayed a hole punch notification but we are not behind Nat");
}
_ => {
if let Err(e) = self.on_relay_msg(initiator, timed_out_nonce).await {
warn!(
"failed handling notification relayed from {node_address}, {e}"
);
}
}
Message::RelayMsgNotification(notification) => {
match self.nat_utils.is_behind_nat {
Some(false) => {
// inr may not be malicious and initiated a hole punch attempt when
// a request to this node timed out for another reason
debug!("peer {node_address} relayed a hole punch notification but we are not behind nat");
}
_ => {
if let Err(e) = self.on_relay_msg::<P>(notification).await {
warn!("failed handling notification relayed from {node_address}, {e}");
}
}
}
},
}
Message::Request(_) => {
warn!(
"Peer sent message type {} that shouldn't be sent in packet type `Session Message`, {}",
Expand Down Expand Up @@ -1224,7 +1241,7 @@ impl Handler {
warn!("Received a response in a `Message` packet, should be sent in a `SessionMessage`");
self.handle_response::<P>(node_address, response).await
}
Message::Notification(_) => {
Message::RelayInitNotification(_) | Message::RelayMsgNotification(_) => {
warn!(
"Peer sent message type {} that shouldn't be sent in packet type `Message`, {}",
message.msg_type(),
Expand Down Expand Up @@ -1582,7 +1599,7 @@ impl HolePunchNat for Handler {
}
if let Some(session) = self.sessions.get_mut(&relay) {
let relay_init_notif =
Notification::RelayInit(local_enr, target_node_address.node_id, timed_out_nonce);
RelayInitNotification::new(local_enr, target_node_address.node_id, timed_out_nonce);
trace!(
"Sending notif to relay {}. relay init: {}",
relay.node_id,
Expand Down Expand Up @@ -1613,41 +1630,33 @@ impl HolePunchNat for Handler {

async fn on_relay_init(
&mut self,
initr: Enr,
tgt: NodeId,
timed_out_nonce: MessageNonce,
relay_init: RelayInitNotification,
) -> Result<(), NatError> {
// Assemble the notification for the target
let relay_msg_notif = Notification::RelayMsg(initr, timed_out_nonce);

// Check for target peer in our kbuckets otherwise drop notification.
if let Err(e) = self
.service_send
.send(HandlerOut::FindHolePunchEnr(tgt, relay_msg_notif))
.send(HandlerOut::FindHolePunchEnr(relay_init))
.await
{
return Err(NatError::Relay(e.into()));
}
Ok(())
}

async fn on_relay_msg(
async fn on_relay_msg<P: ProtocolIdentity>(
&mut self,
initr: Enr,
timed_out_nonce: MessageNonce,
relay_msg: RelayMsgNotification,
) -> Result<(), NatError> {
let (inr_enr, timed_out_msg_nonce) = relay_msg.into();
let initiator_node_address =
match NodeContact::try_from_enr(initr, self.nat_utils.ip_mode) {
match NodeContact::try_from_enr(inr_enr, self.nat_utils.ip_mode) {
Ok(contact) => contact.node_address(),
Err(e) => return Err(NatError::Target(e.into())),
};

// A session may already have been established.
if self.sessions.get(&initiator_node_address).is_some() {
trace!(
"Session already established with initiator: {}",
initiator_node_address
);
trace!("Session already established with initiator: {initiator_node_address}");
return Ok(());
}
// Possibly, an attempt to punch this hole, using another relay, is in progress.
Expand All @@ -1656,29 +1665,22 @@ impl HolePunchNat for Handler {
.get(&initiator_node_address)
.is_some()
{
trace!(
"WHOAREYOU packet already sent to initiator: {}",
initiator_node_address
);
trace!("WHOAREYOU packet already sent to initiator: {initiator_node_address}");
return Ok(());
}

// If not hole punch attempts are in progress, spawn a WHOAREYOU event to punch a hole in
// our NAT for initiator.
let whoareyou_ref = WhoAreYouRef(initiator_node_address, timed_out_nonce);
if let Err(e) = self
.service_send
.send(HandlerOut::WhoAreYou(whoareyou_ref))
.await
{
return Err(NatError::Target(e.into()));
}
let whoareyou_ref = WhoAreYouRef(initiator_node_address, timed_out_msg_nonce);
self.send_challenge::<P>(whoareyou_ref, None).await;

Ok(())
}

async fn send_relay_msg_notif<P: ProtocolIdentity>(
&mut self,
tgt_enr: Enr,
relay_msg_notif: Notification,
relay_msg_notif: RelayMsgNotification,
) -> Result<(), NatError> {
let tgt_node_address =
match NodeContact::try_from_enr(tgt_enr, self.nat_utils.ip_mode) {
Expand Down
21 changes: 8 additions & 13 deletions src/handler/nat_hole_punch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::net::SocketAddr;

use enr::NodeId;

use crate::{
node_info::NodeAddress, packet::MessageNonce, rpc::Notification, Enr, ProtocolIdentity,
node_info::NodeAddress,
packet::MessageNonce,
rpc::{RelayInitNotification, RelayMsgNotification},
Enr, ProtocolIdentity,
};

mod error;
Expand All @@ -27,26 +28,20 @@ pub trait HolePunchNat {

/// A RelayInit notification is received over discv5 indicating this node is the relay. Should
/// trigger sending a RelayMsg to the target.
async fn on_relay_init(
&mut self,
initr: Enr,
tgt: NodeId,
timed_out_nonce: MessageNonce,
) -> Result<(), Error>;
async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), Error>;

/// A RelayMsg notification is received over discv5 indicating this node is the target. Should
/// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg.
async fn on_relay_msg(
async fn on_relay_msg<P: ProtocolIdentity>(
&mut self,
initr: Enr,
timed_out_nonce: MessageNonce,
relay_msg: RelayMsgNotification,
) -> Result<(), Error>;

/// Send a RELAYMSG notification.
async fn send_relay_msg_notif<P: ProtocolIdentity>(
&mut self,
tgt_enr: Enr,
relay_msg_notif: Notification,
relay_msg_notif: RelayMsgNotification,
) -> Result<(), Error>;

/// A hole punched for a peer closes. Should trigger an empty packet to be sent to the
Expand Down
Loading

0 comments on commit 1dc77cf

Please sign in to comment.