Skip to content

Commit

Permalink
feat: support multiple shared caps (#5363)
Browse files Browse the repository at this point in the history
Co-authored-by: Emilia Hane <[email protected]>
  • Loading branch information
mattsse and emhane authored Nov 8, 2023
1 parent e897764 commit 011494d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 35 deletions.
15 changes: 14 additions & 1 deletion crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! All capability related types

use crate::{version::ParseVersionError, EthMessage, EthVersion};
use crate::{
p2pstream::MAX_RESERVED_MESSAGE_ID, version::ParseVersionError, EthMessage, EthVersion,
};
use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable};
use reth_codecs::add_arbitrary_tests;
use reth_primitives::bytes::{BufMut, Bytes};
Expand Down Expand Up @@ -227,6 +229,11 @@ impl SharedCapability {
}
}

/// Returns true if the capability is eth.
pub fn is_eth(&self) -> bool {
matches!(self, SharedCapability::Eth { .. })
}

/// Returns the version of the capability.
pub fn version(&self) -> u8 {
match self {
Expand All @@ -243,6 +250,12 @@ impl SharedCapability {
}
}

/// Returns the message ID offset of the current capability relative to the start of the
/// capability message ID suffix.
pub fn offset_rel_caps_suffix(&self) -> u8 {
self.offset() - MAX_RESERVED_MESSAGE_ID - 1
}

/// Returns the number of protocol messages supported by this capability.
pub fn num_messages(&self) -> Result<u8, SharedCapabilityError> {
match self {
Expand Down
4 changes: 3 additions & 1 deletion crates/net/eth-wire/src/errors/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ pub enum P2PStreamError {
#[error("ping timed out with")]
PingTimeout,
#[error(transparent)]
ParseVersionError(#[from] SharedCapabilityError),
ParseSharedCapability(#[from] SharedCapabilityError),
#[error("capability not supported on stream to this peer")]
CapabilityNotShared,
#[error("mismatched protocol version in Hello message: {0}")]
MismatchedProtocolVersion(GotExpected<ProtocolVersion>),
#[error("started ping task before the handshake completed")]
Expand Down
109 changes: 81 additions & 28 deletions crates/net/eth-wire/src/p2pstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024;

/// [`MAX_RESERVED_MESSAGE_ID`] is the maximum message ID reserved for the `p2p` subprotocol. If
/// there are any incoming messages with an ID greater than this, they are subprotocol messages.
const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f;
pub const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f;

/// [`MAX_P2P_MESSAGE_ID`] is the maximum message ID in use for the `p2p` subprotocol.
const MAX_P2P_MESSAGE_ID: u8 = P2PMessageID::Pong as u8;
Expand Down Expand Up @@ -159,7 +159,7 @@ where

// determine shared capabilities (currently returns only one capability)
let capability_res =
set_capability_offsets(hello.capabilities, their_hello.capabilities.clone());
SharedCapabilities::try_new(hello.capabilities, their_hello.capabilities.clone());

let shared_capability = match capability_res {
Err(err) => {
Expand Down Expand Up @@ -207,6 +207,27 @@ where

/// A P2PStream wraps over any `Stream` that yields bytes and makes it compatible with `p2p`
/// protocol messages.
///
/// This stream supports multiple shared capabilities, that were negotiated during the handshake.
///
/// ### Message-ID based multiplexing
///
/// > Each capability is given as much of the message-ID space as it needs. All such capabilities
/// > must statically specify how many message IDs they require. On connection and reception of the
/// > Hello message, both peers have equivalent information about what capabilities they share
/// > (including versions) and are able to form consensus over the composition of message ID space.
///
/// > Message IDs are assumed to be compact from ID 0x10 onwards (0x00-0x0f is reserved for the
/// > "p2p" capability) and given to each shared (equal-version, equal-name) capability in
/// > alphabetic order. Capability names are case-sensitive. Capabilities which are not shared are
/// > ignored. If multiple versions are shared of the same (equal name) capability, the numerically
/// > highest wins, others are ignored.
///
/// See also <https://github.com/ethereum/devp2p/blob/master/rlpx.md#message-id-based-multiplexing>
///
/// This stream emits Bytes that start with the normalized message id, so that the first byte of
/// each message starts from 0. If this stream only supports a single capability, for example `eth`
/// then the first byte of each message will match [EthMessageID](crate::types::EthMessageID).
#[pin_project]
#[derive(Debug)]
pub struct P2PStream<S> {
Expand All @@ -223,7 +244,7 @@ pub struct P2PStream<S> {
pinger: Pinger,

/// The supported capability for this stream.
shared_capability: SharedCapability,
shared_capabilities: SharedCapabilities,

/// Outgoing messages buffered for sending to the underlying stream.
outgoing_messages: VecDeque<Bytes>,
Expand All @@ -241,13 +262,13 @@ impl<S> P2PStream<S> {
/// Create a new [`P2PStream`] from the provided stream.
/// New [`P2PStream`]s are assumed to have completed the `p2p` handshake successfully and are
/// ready to send and receive subprotocol messages.
pub fn new(inner: S, capability: SharedCapability) -> Self {
pub fn new(inner: S, shared_capabilities: SharedCapabilities) -> Self {
Self {
inner,
encoder: snap::raw::Encoder::new(),
decoder: snap::raw::Decoder::new(),
pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT),
shared_capability: capability,
shared_capabilities,
outgoing_messages: VecDeque::new(),
outgoing_message_buffer_capacity: MAX_P2P_CAPACITY,
disconnecting: false,
Expand All @@ -268,9 +289,12 @@ impl<S> P2PStream<S> {
self.outgoing_message_buffer_capacity = capacity;
}

/// Returns the shared capability for this stream.
pub fn shared_capability(&self) -> &SharedCapability {
&self.shared_capability
/// Returns the shared capabilities for this stream.
///
/// This includes all the shared capabilities that were negotiated during the handshake and
/// their offsets based on the number of messages of each capability.
pub fn shared_capabilities(&self) -> &SharedCapabilities {
&self.shared_capabilities
}

/// Returns `true` if the connection is about to disconnect.
Expand Down Expand Up @@ -460,7 +484,7 @@ where
// * `eth/67` is reserved message IDs 0x10 - 0x19.
// * `qrs/65` is reserved message IDs 0x1a - 0x21.
//
decompress_buf[0] = bytes[0] - this.shared_capability.offset();
decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1;

return Poll::Ready(Some(Ok(decompress_buf)))
}
Expand Down Expand Up @@ -539,7 +563,7 @@ where

// all messages sent in this stream are subprotocol messages, so we need to switch the
// message id based on the offset
compressed[0] = item[0] + this.shared_capability.offset();
compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1;
this.outgoing_messages.push_back(compressed.freeze());

Ok(())
Expand Down Expand Up @@ -571,16 +595,50 @@ where
}
}

/// Non-empty ordered list of recognized shared capabilities.
#[derive(Debug)]
pub struct SharedCapabilities(Vec<SharedCapability>);

impl SharedCapabilities {
/// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance.
pub fn try_new(
local_capabilities: Vec<Capability>,
peer_capabilities: Vec<Capability>,
) -> Result<Self, P2PStreamError> {
Ok(Self(set_capability_offsets(local_capabilities, peer_capabilities)?))
}

/// Iterates over the shared capabilities.
pub fn iter_caps(&self) -> impl Iterator<Item = &SharedCapability> {
self.0.iter()
}

/// Returns the eth capability if it is shared.
pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> {
for cap in self.iter_caps() {
if cap.is_eth() {
return Ok(cap)
}
}
Err(P2PStreamError::CapabilityNotShared)
}

/// Returns the negotiated eth version if it is shared.
pub fn eth_version(&self) -> Result<u8, P2PStreamError> {
self.eth().map(|cap| cap.version())
}
}

/// Determines the offsets for each shared capability between the input list of peer
/// capabilities and the input list of locally supported capabilities.
///
/// Currently only `eth` versions 66 and 67 are supported.
/// Currently only `eth` versions 66, 67, 68 are supported.
/// Additionally, the `p2p` capability version 5 is supported, but is
/// expected _not_ to be in neither `local_capabilities` or `peer_capabilities`.
pub fn set_capability_offsets(
local_capabilities: Vec<Capability>,
peer_capabilities: Vec<Capability>,
) -> Result<SharedCapability, P2PStreamError> {
) -> Result<Vec<SharedCapability>, P2PStreamError> {
// find intersection of capabilities
let our_capabilities = local_capabilities.into_iter().collect::<HashSet<_>>();

Expand All @@ -597,8 +655,7 @@ pub fn set_capability_offsets(
// This would cause the peers to send messages with the wrong message id, which is usually a
// protocol violation.
//
// The `Ord` implementation for `SmolStr` (used here) currently delegates to rust's `Ord`
// implementation for `str`, which also orders strings lexicographically.
// The `Ord` implementation for `str` orders strings lexicographically.
let mut shared_capability_names = BTreeSet::new();

// find highest shared version of each shared capability
Expand Down Expand Up @@ -640,6 +697,8 @@ pub fn set_capability_offsets(
SharedCapability::UnknownCapability { .. } => {
// Capabilities which are not shared are ignored
debug!("unknown capability: name={:?}, version={}", name, version,);

// TODO(mattsse): track shared caps
}
SharedCapability::Eth { .. } => {
// increment the offset if the capability is known
Expand All @@ -650,17 +709,11 @@ pub fn set_capability_offsets(
}
}

// TODO: support multiple capabilities - we would need a new Stream type to go on top of
// `P2PStream` containing its capability. `P2PStream` would still send pings and handle
// pongs, but instead contain a map of capabilities to their respective stream / channel.
// Each channel would be responsible for containing the offset for that stream and would
// only increment / decrement message IDs.
// NOTE: since the `P2PStream` currently only supports one capability, we set the
// capability with the lowest offset.
Ok(shared_with_offsets
.first()
.ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))?
.clone())
if shared_with_offsets.is_empty() {
return Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
}

Ok(shared_with_offsets)
}

/// This represents only the reserved `p2p` subprotocol messages.
Expand Down Expand Up @@ -928,7 +981,7 @@ mod tests {

// ensure that the two share a single capability, eth67
assert_eq!(
p2p_stream.shared_capability,
*p2p_stream.shared_capabilities.iter_caps().next().unwrap(),
SharedCapability::Eth {
version: EthVersion::Eth67,
offset: MAX_RESERVED_MESSAGE_ID + 1
Expand All @@ -946,7 +999,7 @@ mod tests {

// ensure that the two share a single capability, eth67
assert_eq!(
p2p_stream.shared_capability,
*p2p_stream.shared_capabilities.iter_caps().next().unwrap(),
SharedCapability::Eth {
version: EthVersion::Eth67,
offset: MAX_RESERVED_MESSAGE_ID + 1
Expand Down Expand Up @@ -1019,7 +1072,7 @@ mod tests {
let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];

let shared_capability =
set_capability_offsets(local_capabilities, peer_capabilities).unwrap();
set_capability_offsets(local_capabilities, peer_capabilities).unwrap()[0].clone();

assert_eq!(
shared_capability,
Expand Down
4 changes: 4 additions & 0 deletions crates/net/eth-wire/src/types/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use std::{fmt::Debug, sync::Arc};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down
3 changes: 1 addition & 2 deletions crates/net/eth-wire/src/types/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

use crate::capability::Capability;
use std::str::FromStr;
use thiserror::Error;

/// Error thrown when failed to parse a valid [`EthVersion`].
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("Unknown eth protocol version: {0}")]
pub struct ParseVersionError(String);

Expand Down
3 changes: 2 additions & 1 deletion crates/net/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ impl SessionError for EthStreamError {
)) |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
P2PStreamError::ParseSharedCapability(_) |
P2PStreamError::CapabilityNotShared |
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
P2PStreamError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
Expand Down
16 changes: 15 additions & 1 deletion crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};

pub use reth_network_api::{Direction, PeerInfo};

/// Internal identifier for active sessions.
Expand Down Expand Up @@ -912,10 +913,23 @@ async fn authenticate_stream(
}
};

// Ensure we negotiated eth protocol
let version = match p2p_stream.shared_capabilities().eth_version() {
Ok(version) => version,
Err(err) => {
return PendingSessionEvent::Disconnected {
remote_addr,
session_id,
direction,
error: Some(err.into()),
}
}
};

// if the hello handshake was successful we can try status handshake
//
// Before trying status handshake, set up the version to shared_capability
let status = Status { version: p2p_stream.shared_capability().version(), ..status };
let status = Status { version, ..status };
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
Ok(stream_res) => stream_res,
Expand Down
2 changes: 1 addition & 1 deletion examples/manual-p2p/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthSt
.forkid(Hardfork::Shanghai.fork_id(&MAINNET).unwrap())
.build();

let status = Status { version: p2p_stream.shared_capability().version(), ..status };
let status = Status { version: p2p_stream.shared_capabilities().eth()?.version(), ..status };
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
Ok(eth_unauthed.handshake(status, fork_filter).await?)
}
Expand Down

0 comments on commit 011494d

Please sign in to comment.