From b6f73531888a94bf65fc7e8400e6d723577f697c Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 23 Oct 2024 15:47:40 +0100 Subject: [PATCH 01/22] Request/Response: remove NetworkCodec trait --- crates/services/p2p/src/behavior.rs | 2 +- crates/services/p2p/src/codecs.rs | 21 +++------------------ crates/services/p2p/src/codecs/postcard.rs | 4 ++-- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 9fe58c5dec5..c2618d2960b 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,7 +1,7 @@ use crate::{ codecs::{ postcard::PostcardCodec, - NetworkCodec, + RequestResponseCodec, }, config::Config, discovery, diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 505cf40c9bf..5b8bfa60eb1 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -1,11 +1,7 @@ pub mod postcard; use crate::{ - gossipsub::messages::{ - GossipTopicTag, - GossipsubBroadcastRequest, - GossipsubMessage, - }, + gossipsub::messages::GossipTopicTag, request_response::messages::{ RequestMessage, V2ResponseMessage, @@ -27,19 +23,8 @@ pub trait GossipsubCodec { gossipsub_topic: GossipTopicTag, ) -> Result; } - -// TODO: https://github.com/FuelLabs/fuel-core/issues/2368 -// Remove this trait -/// Main Codec trait -/// Needs to be implemented and provided to FuelBehaviour -pub trait NetworkCodec: - GossipsubCodec< - RequestMessage = GossipsubBroadcastRequest, - ResponseMessage = GossipsubMessage, - > + request_response::Codec - + Clone - + Send - + 'static +pub trait RequestResponseCodec: + request_response::Codec { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index db3fe814e4f..0cb1671bca7 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,6 +1,6 @@ use super::{ GossipsubCodec, - NetworkCodec, + RequestResponseCodec, }; use crate::{ gossipsub::messages::{ @@ -176,7 +176,7 @@ impl GossipsubCodec for PostcardCodec { } } -impl NetworkCodec for PostcardCodec { +impl RequestResponseCodec for PostcardCodec { fn get_req_res_protocols( &self, ) -> impl Iterator::Protocol> { From 529d1d6413af981e4582efec76b77e0e07a4a5d8 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 23 Oct 2024 17:03:12 +0100 Subject: [PATCH 02/22] Dinstinguish between Format being used and Codec for requests and responses in libP2p codecs --- crates/fuel-core/src/p2p_test_helpers.rs | 4 +- crates/services/p2p/src/behavior.rs | 12 +- crates/services/p2p/src/codecs.rs | 27 ++- crates/services/p2p/src/codecs/bounded.rs | 192 ++++++++++++++++ crates/services/p2p/src/codecs/postcard.rs | 249 ++++----------------- crates/services/p2p/src/p2p_service.rs | 11 +- crates/services/p2p/src/service.rs | 4 +- 7 files changed, 276 insertions(+), 223 deletions(-) create mode 100644 crates/services/p2p/src/codecs/bounded.rs diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 3a5f6e78009..3cc1f2a205a 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -23,7 +23,7 @@ use fuel_core_chain_config::{ StateConfig, }; use fuel_core_p2p::{ - codecs::postcard::PostcardCodec, + codecs::bounded::BoundedCodec, network_service::FuelP2PService, p2p_service::FuelP2PEvent, request_response::messages::{ @@ -142,7 +142,7 @@ impl Bootstrap { /// Spawn a bootstrap node. pub async fn new(node_config: &Config) -> anyhow::Result { let bootstrap_config = extract_p2p_config(node_config).await; - let codec = PostcardCodec::new(bootstrap_config.max_block_size); + let codec = BoundedCodec::new(bootstrap_config.max_block_size); let (sender, _) = broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1)); let mut bootstrap = FuelP2PService::new(sender, bootstrap_config, codec).await?; diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index c2618d2960b..644bce60ee2 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,7 +1,8 @@ use crate::{ codecs::{ - postcard::PostcardCodec, - RequestResponseCodec, + bounded::BoundedCodec, + postcard::PostcardDataFormat, + RequestResponseProtocols, }, config::Config, discovery, @@ -59,11 +60,14 @@ pub struct FuelBehaviour { discovery: discovery::Behaviour, /// RequestResponse protocol - request_response: request_response::Behaviour, + request_response: request_response::Behaviour>, } impl FuelBehaviour { - pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result { + pub(crate) fn new( + p2p_config: &Config, + codec: BoundedCodec, + ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 5b8bfa60eb1..8cab18aed94 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -1,15 +1,23 @@ +pub mod bounded; pub mod postcard; -use crate::{ - gossipsub::messages::GossipTopicTag, - request_response::messages::{ - RequestMessage, - V2ResponseMessage, - }, -}; +use crate::gossipsub::messages::GossipTopicTag; use libp2p::request_response; +use serde::{ + Deserialize, + Serialize, +}; use std::io; +trait DataFormatCodec { + type Error; + fn deserialize<'a, R: Deserialize<'a>>( + encoded_data: &'a [u8], + ) -> Result; + + fn serialize(data: &D) -> Result, Self::Error>; +} + /// Implement this in order to handle serialization & deserialization of Gossipsub messages pub trait GossipsubCodec { type RequestMessage; @@ -23,9 +31,8 @@ pub trait GossipsubCodec { gossipsub_topic: GossipTopicTag, ) -> Result; } -pub trait RequestResponseCodec: - request_response::Codec -{ + +pub trait RequestResponseProtocols: request_response::Codec { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour fn get_req_res_protocols( diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/bounded.rs new file mode 100644 index 00000000000..80c61998619 --- /dev/null +++ b/crates/services/p2p/src/codecs/bounded.rs @@ -0,0 +1,192 @@ +use std::{ + io, + marker::PhantomData, +}; + +use async_trait::async_trait; +use futures::{ + AsyncRead, + AsyncReadExt, + AsyncWriteExt, +}; +use libp2p::request_response; + +use crate::{ + gossipsub::messages::{ + GossipTopicTag, + GossipsubBroadcastRequest, + GossipsubMessage, + }, + request_response::messages::{ + RequestMessage, + V1ResponseMessage, + V2ResponseMessage, + }, +}; +use strum::IntoEnumIterator; + +use super::{ + postcard::RequestResponseProtocol, + DataFormatCodec, + GossipsubCodec, + RequestResponseProtocols, +}; + +#[derive(Debug, Clone)] +pub struct BoundedCodec { + _format: PhantomData, + /// Used for `max_size` parameter when reading Response Message + /// Necessary in order to avoid DoS attacks + /// Currently the size mostly depends on the max size of the Block + max_response_size: usize, +} + +impl BoundedCodec { + pub fn new(max_block_size: usize) -> Self { + assert_ne!( + max_block_size, 0, + "PostcardCodec does not support zero block size" + ); + + Self { + _format: PhantomData, + max_response_size: max_block_size, + } + } +} + +/// Since Postcard does not support async reads or writes out of the box +/// We prefix Request & Response Messages with the length of the data in bytes +/// We expect the substream to be properly closed when response channel is dropped. +/// Since the request protocol used here expects a response, the sender considers this +/// early close as a protocol violation which results in the connection being closed. +/// If the substream was not properly closed when dropped, the sender would instead +/// run into a timeout waiting for the response. +#[async_trait] +impl request_response::Codec for BoundedCodec +where + Format: DataFormatCodec + Send, +{ + type Protocol = RequestResponseProtocol; + type Request = RequestMessage; + type Response = V2ResponseMessage; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + socket: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut response = Vec::new(); + socket + .take(self.max_response_size as u64) + .read_to_end(&mut response) + .await?; + Format::deserialize(&response) + } + + async fn read_response( + &mut self, + protocol: &Self::Protocol, + socket: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut response = Vec::new(); + socket + .take(self.max_response_size as u64) + .read_to_end(&mut response) + .await?; + + match protocol { + RequestResponseProtocol::V1 => { + let v1_response = Format::deserialize::(&response)?; + Ok(v1_response.into()) + } + RequestResponseProtocol::V2 => { + Format::deserialize::(&response) + } + } + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + socket: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: futures::AsyncWrite + Unpin + Send, + { + let encoded_data = Format::serialize(&req)?; + socket.write_all(&encoded_data).await?; + Ok(()) + } + + async fn write_response( + &mut self, + protocol: &Self::Protocol, + socket: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: futures::AsyncWrite + Unpin + Send, + { + let encoded_data = match protocol { + RequestResponseProtocol::V1 => { + let v1_response: V1ResponseMessage = res.into(); + Format::serialize(&v1_response)? + } + RequestResponseProtocol::V2 => Format::serialize(&res)?, + }; + socket.write_all(&encoded_data).await?; + Ok(()) + } +} + +impl GossipsubCodec for BoundedCodec +where + Format: DataFormatCodec + Send, +{ + type RequestMessage = GossipsubBroadcastRequest; + type ResponseMessage = GossipsubMessage; + + fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { + let encoded_data = match data { + GossipsubBroadcastRequest::NewTx(tx) => postcard::to_stdvec(&*tx), + }; + + encoded_data.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + } + + fn decode( + &self, + encoded_data: &[u8], + gossipsub_tag: GossipTopicTag, + ) -> Result { + let decoded_response = match gossipsub_tag { + GossipTopicTag::NewTx => { + GossipsubMessage::NewTx(Format::deserialize(encoded_data)?) + } + }; + + Ok(decoded_response) + } +} + +impl RequestResponseProtocols for Codec +where + Codec: request_response::Codec, +{ + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + // TODO: Iterating over versions in reverse order should prefer + // peers to use V2 over V1 for exchanging messages. However, this is + // not guaranteed by the specs for the `request_response` protocol. + RequestResponseProtocol::iter().rev() + } +} diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 0cb1671bca7..e1c7b6e54f5 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,204 +1,47 @@ -use super::{ - GossipsubCodec, - RequestResponseCodec, +use super::DataFormatCodec; +use crate::request_response::messages::{ + V1_REQUEST_RESPONSE_PROTOCOL_ID, + V2_REQUEST_RESPONSE_PROTOCOL_ID, }; -use crate::{ - gossipsub::messages::{ - GossipTopicTag, - GossipsubBroadcastRequest, - GossipsubMessage, - }, - request_response::messages::{ - RequestMessage, - V1ResponseMessage, - V2ResponseMessage, - V1_REQUEST_RESPONSE_PROTOCOL_ID, - V2_REQUEST_RESPONSE_PROTOCOL_ID, - }, -}; -use async_trait::async_trait; -use futures::{ - AsyncRead, - AsyncReadExt, - AsyncWriteExt, -}; -use libp2p::request_response; + use serde::{ Deserialize, Serialize, }; use std::io; -use strum::IntoEnumIterator; use strum_macros::EnumIter; -/// Helper method for decoding data -/// Reusable across `RequestResponseCodec` and `GossipsubCodec` -fn deserialize<'a, R: Deserialize<'a>>(encoded_data: &'a [u8]) -> Result { - postcard::from_bytes(encoded_data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) -} - -fn serialize(data: &D) -> Result, io::Error> { - postcard::to_stdvec(&data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) -} +#[derive(Clone)] +pub struct PostcardDataFormat; -#[derive(Debug, Clone)] -pub struct PostcardCodec { - /// Used for `max_size` parameter when reading Response Message - /// Necessary in order to avoid DoS attacks - /// Currently the size mostly depends on the max size of the Block - max_response_size: usize, -} +impl DataFormatCodec for PostcardDataFormat { + type Error = io::Error; -impl PostcardCodec { - pub fn new(max_block_size: usize) -> Self { - assert_ne!( - max_block_size, 0, - "PostcardCodec does not support zero block size" - ); - - Self { - max_response_size: max_block_size, - } + fn deserialize<'a, R: Deserialize<'a>>( + encoded_data: &'a [u8], + ) -> Result { + postcard::from_bytes(encoded_data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } -} - -/// Since Postcard does not support async reads or writes out of the box -/// We prefix Request & Response Messages with the length of the data in bytes -/// We expect the substream to be properly closed when response channel is dropped. -/// Since the request protocol used here expects a response, the sender considers this -/// early close as a protocol violation which results in the connection being closed. -/// If the substream was not properly closed when dropped, the sender would instead -/// run into a timeout waiting for the response. -#[async_trait] -impl request_response::Codec for PostcardCodec { - type Protocol = PostcardProtocol; - type Request = RequestMessage; - type Response = V2ResponseMessage; - - async fn read_request( - &mut self, - _protocol: &Self::Protocol, - socket: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut response = Vec::new(); - socket - .take(self.max_response_size as u64) - .read_to_end(&mut response) - .await?; - deserialize(&response) - } - - async fn read_response( - &mut self, - protocol: &Self::Protocol, - socket: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut response = Vec::new(); - socket - .take(self.max_response_size as u64) - .read_to_end(&mut response) - .await?; - - match protocol { - PostcardProtocol::V1 => { - let v1_response = deserialize::(&response)?; - Ok(v1_response.into()) - } - PostcardProtocol::V2 => deserialize::(&response), - } - } - - async fn write_request( - &mut self, - _protocol: &Self::Protocol, - socket: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: futures::AsyncWrite + Unpin + Send, - { - let encoded_data = serialize(&req)?; - socket.write_all(&encoded_data).await?; - Ok(()) - } - - async fn write_response( - &mut self, - protocol: &Self::Protocol, - socket: &mut T, - res: Self::Response, - ) -> io::Result<()> - where - T: futures::AsyncWrite + Unpin + Send, - { - let encoded_data = match protocol { - PostcardProtocol::V1 => { - let v1_response: V1ResponseMessage = res.into(); - serialize(&v1_response)? - } - PostcardProtocol::V2 => serialize(&res)?, - }; - socket.write_all(&encoded_data).await?; - Ok(()) - } -} - -impl GossipsubCodec for PostcardCodec { - type RequestMessage = GossipsubBroadcastRequest; - type ResponseMessage = GossipsubMessage; - - fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { - let encoded_data = match data { - GossipsubBroadcastRequest::NewTx(tx) => postcard::to_stdvec(&*tx), - }; - - encoded_data.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn decode( - &self, - encoded_data: &[u8], - gossipsub_tag: GossipTopicTag, - ) -> Result { - let decoded_response = match gossipsub_tag { - GossipTopicTag::NewTx => GossipsubMessage::NewTx(deserialize(encoded_data)?), - }; - - Ok(decoded_response) - } -} -impl RequestResponseCodec for PostcardCodec { - fn get_req_res_protocols( - &self, - ) -> impl Iterator::Protocol> { - // TODO: Iterating over versions in reverse order should prefer - // peers to use V2 over V1 for exchanging messages. However, this is - // not guaranteed by the specs for the `request_response` protocol. - PostcardProtocol::iter().rev() + fn serialize(data: &D) -> Result, Self::Error> { + postcard::to_stdvec(&data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } } #[derive(Debug, Default, Clone, EnumIter)] -pub enum PostcardProtocol { +pub enum RequestResponseProtocol { #[default] V1, V2, } -impl AsRef for PostcardProtocol { +impl AsRef for RequestResponseProtocol { fn as_ref(&self) -> &str { match self { - PostcardProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, - PostcardProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, + RequestResponseProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, + RequestResponseProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, } } } @@ -208,12 +51,18 @@ impl AsRef for PostcardProtocol { mod tests { use fuel_core_types::blockchain::SealedBlockHeader; - use request_response::Codec as _; + use libp2p::request_response::Codec; use super::*; - use crate::request_response::messages::{ - ResponseMessageErrorCode, - MAX_REQUEST_SIZE, + use crate::{ + codecs::bounded::BoundedCodec, + request_response::messages::{ + RequestMessage, + ResponseMessageErrorCode, + V1ResponseMessage, + V2ResponseMessage, + MAX_REQUEST_SIZE, + }, }; #[test] @@ -229,17 +78,17 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V2, &mut buf, response) + .write_response(&RequestResponseProtocol::V2, &mut buf, response) .await .expect("Valid Vec should be serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V2, &mut buf.as_slice()) .await .expect("Valid Vec should be deserialized using v1"); @@ -256,17 +105,17 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response) + .write_response(&RequestResponseProtocol::V1, &mut buf, response) .await .expect("Valid Vec should be serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec should be deserialized using v1"); @@ -283,17 +132,17 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V2, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V2, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V2, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); @@ -315,17 +164,17 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V1, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); @@ -344,19 +193,19 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V1, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + PostcardDataFormat::deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( @@ -369,13 +218,13 @@ mod tests { async fn codec__read_response_is_backwards_compatible_with_v1() { // Given let response = V1ResponseMessage::SealedHeaders(None); - let mut codec = PostcardCodec::new(1024); + let mut codec: BoundedCodec = BoundedCodec::new(1024); // When - let buf = serialize(&response) + let buf = PostcardDataFormat::serialize(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 05dd2ec1b38..6ed01baafb0 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,7 +4,8 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - postcard::PostcardCodec, + bounded::BoundedCodec, + postcard::PostcardDataFormat, GossipsubCodec, }, config::{ @@ -122,7 +123,7 @@ pub struct FuelP2PService { inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages - network_codec: PostcardCodec, + network_codec: BoundedCodec, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -211,7 +212,7 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - codec: PostcardCodec, + codec: BoundedCodec, ) -> anyhow::Result { let metrics = config.metrics; @@ -832,7 +833,7 @@ mod tests { PublishError, }; use crate::{ - codecs::postcard::PostcardCodec, + codecs::bounded::BoundedCodec, config::Config, gossipsub::{ messages::{ @@ -915,7 +916,7 @@ mod tests { broadcast::channel(p2p_config.reserved_nodes.len().saturating_add(1)); let mut service = - FuelP2PService::new(sender, p2p_config, PostcardCodec::new(max_block_size)) + FuelP2PService::new(sender, p2p_config, BoundedCodec::new(max_block_size)) .await .unwrap(); service.start().await.unwrap(); diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c85e1e3a6c8..8b23e84cc42 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,5 +1,5 @@ use crate::{ - codecs::postcard::PostcardCodec, + codecs::bounded::BoundedCodec, config::{ Config, NotInitialized, @@ -787,7 +787,7 @@ where let mut p2p_service = FuelP2PService::new( broadcast.reserved_peers_broadcast.clone(), config, - PostcardCodec::new(max_block_size), + BoundedCodec::new(max_block_size), ) .await?; p2p_service.start().await?; From a4465d5d4a0e33778e1b77d86b7b708711b165e0 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 23 Oct 2024 17:28:06 +0100 Subject: [PATCH 03/22] Move RequestResponse protocol definitions to dedicated module --- crates/services/p2p/src/codecs/bounded.rs | 14 ++++---- crates/services/p2p/src/codecs/postcard.rs | 36 +++++-------------- crates/services/p2p/src/request_response.rs | 1 + .../p2p/src/request_response/messages.rs | 3 -- .../p2p/src/request_response/protocols.rs | 20 +++++++++++ 5 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 crates/services/p2p/src/request_response/protocols.rs diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/bounded.rs index 80c61998619..a7cdc56d860 100644 --- a/crates/services/p2p/src/codecs/bounded.rs +++ b/crates/services/p2p/src/codecs/bounded.rs @@ -10,6 +10,7 @@ use futures::{ AsyncWriteExt, }; use libp2p::request_response; +use strum::IntoEnumIterator as _; use crate::{ gossipsub::messages::{ @@ -17,16 +18,17 @@ use crate::{ GossipsubBroadcastRequest, GossipsubMessage, }, - request_response::messages::{ - RequestMessage, - V1ResponseMessage, - V2ResponseMessage, + request_response::{ + messages::{ + RequestMessage, + V1ResponseMessage, + V2ResponseMessage, + }, + protocols::RequestResponseProtocol, }, }; -use strum::IntoEnumIterator; use super::{ - postcard::RequestResponseProtocol, DataFormatCodec, GossipsubCodec, RequestResponseProtocols, diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index e1c7b6e54f5..4425710d30a 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,15 +1,10 @@ use super::DataFormatCodec; -use crate::request_response::messages::{ - V1_REQUEST_RESPONSE_PROTOCOL_ID, - V2_REQUEST_RESPONSE_PROTOCOL_ID, -}; use serde::{ Deserialize, Serialize, }; use std::io; -use strum_macros::EnumIter; #[derive(Clone)] pub struct PostcardDataFormat; @@ -30,22 +25,6 @@ impl DataFormatCodec for PostcardDataFormat { } } -#[derive(Debug, Default, Clone, EnumIter)] -pub enum RequestResponseProtocol { - #[default] - V1, - V2, -} - -impl AsRef for RequestResponseProtocol { - fn as_ref(&self) -> &str { - match self { - RequestResponseProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, - RequestResponseProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, - } - } -} - #[cfg(test)] #[allow(non_snake_case)] mod tests { @@ -56,12 +35,15 @@ mod tests { use super::*; use crate::{ codecs::bounded::BoundedCodec, - request_response::messages::{ - RequestMessage, - ResponseMessageErrorCode, - V1ResponseMessage, - V2ResponseMessage, - MAX_REQUEST_SIZE, + request_response::{ + messages::{ + RequestMessage, + ResponseMessageErrorCode, + V1ResponseMessage, + V2ResponseMessage, + MAX_REQUEST_SIZE, + }, + protocols::RequestResponseProtocol, }, }; diff --git a/crates/services/p2p/src/request_response.rs b/crates/services/p2p/src/request_response.rs index ba63992f3cb..c1ad1b047e6 100644 --- a/crates/services/p2p/src/request_response.rs +++ b/crates/services/p2p/src/request_response.rs @@ -1 +1,2 @@ pub mod messages; +pub mod protocols; diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 2a0e03ba2cd..5a542b9009c 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,9 +18,6 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; -pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; - /// Max Size in Bytes of the Request Message #[cfg(test)] pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs new file mode 100644 index 00000000000..1430f0f6f9e --- /dev/null +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -0,0 +1,20 @@ +use strum_macros::EnumIter; + +pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; + +#[derive(Debug, Default, Clone, EnumIter)] +pub enum RequestResponseProtocol { + #[default] + V1, + V2, +} + +impl AsRef for RequestResponseProtocol { + fn as_ref(&self) -> &str { + match self { + RequestResponseProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, + RequestResponseProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, + } + } +} From 7b858c5a07114fd9c12f223453bd959e1fab7d99 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 23 Oct 2024 17:50:59 +0100 Subject: [PATCH 04/22] Decouple GossibSub and RequestResponse codecs --- crates/fuel-core/src/p2p_test_helpers.rs | 16 ++++-- crates/services/p2p/src/behavior.rs | 6 +-- crates/services/p2p/src/codecs.rs | 1 + crates/services/p2p/src/codecs/bounded.rs | 50 +++--------------- crates/services/p2p/src/codecs/unbounded.rs | 56 +++++++++++++++++++++ crates/services/p2p/src/p2p_service.rs | 33 +++++++----- crates/services/p2p/src/service.rs | 6 ++- 7 files changed, 105 insertions(+), 63 deletions(-) create mode 100644 crates/services/p2p/src/codecs/unbounded.rs diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 3cc1f2a205a..64247460bf0 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -23,7 +23,10 @@ use fuel_core_chain_config::{ StateConfig, }; use fuel_core_p2p::{ - codecs::bounded::BoundedCodec, + codecs::{ + bounded::BoundedCodec, + unbounded::UnboundedCodec, + }, network_service::FuelP2PService, p2p_service::FuelP2PEvent, request_response::messages::{ @@ -142,10 +145,17 @@ impl Bootstrap { /// Spawn a bootstrap node. pub async fn new(node_config: &Config) -> anyhow::Result { let bootstrap_config = extract_p2p_config(node_config).await; - let codec = BoundedCodec::new(bootstrap_config.max_block_size); + let request_response_codec = BoundedCodec::new(bootstrap_config.max_block_size); + let gossipsub_codec = UnboundedCodec::new(); let (sender, _) = broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1)); - let mut bootstrap = FuelP2PService::new(sender, bootstrap_config, codec).await?; + let mut bootstrap = FuelP2PService::new( + sender, + bootstrap_config, + gossipsub_codec, + request_response_codec, + ) + .await?; bootstrap.start().await?; let listeners = bootstrap.multiaddrs(); diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 644bce60ee2..3db54043611 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -66,7 +66,7 @@ pub struct FuelBehaviour { impl FuelBehaviour { pub(crate) fn new( p2p_config: &Config, - codec: BoundedCodec, + request_response_codec: BoundedCodec, ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); @@ -114,7 +114,7 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = codec + let req_res_protocol = request_response_codec .get_req_res_protocols() .map(|protocol| (protocol, ProtocolSupport::Full)); @@ -123,7 +123,7 @@ impl FuelBehaviour { .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec.clone(), + request_response_codec.clone(), req_res_protocol, req_res_config, ); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 8cab18aed94..b7c296fc89c 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -1,5 +1,6 @@ pub mod bounded; pub mod postcard; +pub mod unbounded; use crate::gossipsub::messages::GossipTopicTag; use libp2p::request_response; diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/bounded.rs index a7cdc56d860..4ed5e2d89db 100644 --- a/crates/services/p2p/src/codecs/bounded.rs +++ b/crates/services/p2p/src/codecs/bounded.rs @@ -12,25 +12,17 @@ use futures::{ use libp2p::request_response; use strum::IntoEnumIterator as _; -use crate::{ - gossipsub::messages::{ - GossipTopicTag, - GossipsubBroadcastRequest, - GossipsubMessage, - }, - request_response::{ - messages::{ - RequestMessage, - V1ResponseMessage, - V2ResponseMessage, - }, - protocols::RequestResponseProtocol, +use crate::request_response::{ + messages::{ + RequestMessage, + V1ResponseMessage, + V2ResponseMessage, }, + protocols::RequestResponseProtocol, }; use super::{ DataFormatCodec, - GossipsubCodec, RequestResponseProtocols, }; @@ -149,36 +141,6 @@ where } } -impl GossipsubCodec for BoundedCodec -where - Format: DataFormatCodec + Send, -{ - type RequestMessage = GossipsubBroadcastRequest; - type ResponseMessage = GossipsubMessage; - - fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { - let encoded_data = match data { - GossipsubBroadcastRequest::NewTx(tx) => postcard::to_stdvec(&*tx), - }; - - encoded_data.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn decode( - &self, - encoded_data: &[u8], - gossipsub_tag: GossipTopicTag, - ) -> Result { - let decoded_response = match gossipsub_tag { - GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(Format::deserialize(encoded_data)?) - } - }; - - Ok(decoded_response) - } -} - impl RequestResponseProtocols for Codec where Codec: request_response::Codec, diff --git a/crates/services/p2p/src/codecs/unbounded.rs b/crates/services/p2p/src/codecs/unbounded.rs new file mode 100644 index 00000000000..1b5697a62fe --- /dev/null +++ b/crates/services/p2p/src/codecs/unbounded.rs @@ -0,0 +1,56 @@ +use std::{ + io, + marker::PhantomData, +}; + +use crate::gossipsub::messages::{ + GossipTopicTag, + GossipsubBroadcastRequest, + GossipsubMessage, +}; + +use super::{ + DataFormatCodec, + GossipsubCodec, +}; + +#[derive(Debug, Clone)] +pub struct UnboundedCodec { + _format: PhantomData, +} + +impl UnboundedCodec { + pub fn new() -> Self { + UnboundedCodec { + _format: PhantomData, + } + } +} + +impl GossipsubCodec for UnboundedCodec +where + Format: DataFormatCodec + Send, +{ + type RequestMessage = GossipsubBroadcastRequest; + type ResponseMessage = GossipsubMessage; + + fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { + match data { + GossipsubBroadcastRequest::NewTx(tx) => Format::serialize(&*tx), + } + } + + fn decode( + &self, + encoded_data: &[u8], + gossipsub_tag: GossipTopicTag, + ) -> Result { + let decoded_response = match gossipsub_tag { + GossipTopicTag::NewTx => { + GossipsubMessage::NewTx(Format::deserialize(encoded_data)?) + } + }; + + Ok(decoded_response) + } +} diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 6ed01baafb0..59379815dd1 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -6,6 +6,7 @@ use crate::{ codecs::{ bounded::BoundedCodec, postcard::PostcardDataFormat, + unbounded::UnboundedCodec, GossipsubCodec, }, config::{ @@ -122,8 +123,8 @@ pub struct FuelP2PService { /// to the peer that requested it. inbound_requests_table: HashMap>, - /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages - network_codec: BoundedCodec, + /// `UboundedCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages + gossipsub_codec: UnboundedCodec, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -212,7 +213,8 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - codec: BoundedCodec, + gossipsub_codec: UnboundedCodec, + request_response_codec: BoundedCodec, ) -> anyhow::Result { let metrics = config.metrics; @@ -228,7 +230,7 @@ impl FuelP2PService { // configure and build P2P Service let (transport_function, connection_state) = build_transport_function(&config); let tcp_config = tcp::Config::new().port_reuse(true); - let behaviour = FuelBehaviour::new(&config, codec.clone())?; + let behaviour = FuelBehaviour::new(&config, request_response_codec)?; let swarm_builder = SwarmBuilder::with_existing_identity(config.keypair.clone()) .with_tokio() @@ -288,7 +290,7 @@ impl FuelP2PService { local_address: config.address, tcp_port: config.tcp_port, swarm, - network_codec: codec, + gossipsub_codec, outbound_requests_table: HashMap::default(), inbound_requests_table: HashMap::default(), network_metadata, @@ -382,7 +384,7 @@ impl FuelP2PService { .topics .get_gossipsub_topic_hash(&message); - match self.network_codec.encode(message) { + match self.gossipsub_codec.encode(message) { Ok(encoded_data) => self .swarm .behaviour_mut() @@ -595,7 +597,7 @@ impl FuelP2PService { message_id, } => { let correct_topic = self.get_topic_tag(&message.topic)?; - match self.network_codec.decode(&message.data, correct_topic) { + match self.gossipsub_codec.decode(&message.data, correct_topic) { Ok(decoded_message) => Some(FuelP2PEvent::GossipsubMessage { peer_id: propagation_source, message_id, @@ -833,7 +835,10 @@ mod tests { PublishError, }; use crate::{ - codecs::bounded::BoundedCodec, + codecs::{ + bounded::BoundedCodec, + unbounded::UnboundedCodec, + }, config::Config, gossipsub::{ messages::{ @@ -915,10 +920,14 @@ mod tests { let (sender, _) = broadcast::channel(p2p_config.reserved_nodes.len().saturating_add(1)); - let mut service = - FuelP2PService::new(sender, p2p_config, BoundedCodec::new(max_block_size)) - .await - .unwrap(); + let mut service = FuelP2PService::new( + sender, + p2p_config, + UnboundedCodec::new(), + BoundedCodec::new(max_block_size), + ) + .await + .unwrap(); service.start().await.unwrap(); service } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 8b23e84cc42..71c4874a82c 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,5 +1,8 @@ use crate::{ - codecs::bounded::BoundedCodec, + codecs::{ + bounded::BoundedCodec, + unbounded::UnboundedCodec, + }, config::{ Config, NotInitialized, @@ -787,6 +790,7 @@ where let mut p2p_service = FuelP2PService::new( broadcast.reserved_peers_broadcast.clone(), config, + UnboundedCodec::new(), BoundedCodec::new(max_block_size), ) .await?; From 1aa98dc73d06aca10cb7d1452b2fc32cefa3ef5e Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 15:19:22 +0000 Subject: [PATCH 05/22] Simplify DataFormat trait --- crates/services/p2p/src/codecs.rs | 8 ++-- crates/services/p2p/src/codecs/bounded.rs | 41 +++++++-------------- crates/services/p2p/src/codecs/postcard.rs | 39 +++++++++++++++++--- crates/services/p2p/src/codecs/unbounded.rs | 23 +++--------- 4 files changed, 58 insertions(+), 53 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index b7c296fc89c..f4a0b87924f 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -10,13 +10,15 @@ use serde::{ }; use std::io; -trait DataFormatCodec { +// TODO: Deprecate this trait in favour of something similar to Encode + Decode in storage crate +// In fact, we should probably have a single trait that can be used both here and in the storage crate +trait DataFormat { type Error; fn deserialize<'a, R: Deserialize<'a>>( + &self, encoded_data: &'a [u8], ) -> Result; - - fn serialize(data: &D) -> Result, Self::Error>; + fn serialize(&self, data: &D) -> Result, Self::Error>; } /// Implement this in order to handle serialization & deserialization of Gossipsub messages diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/bounded.rs index 4ed5e2d89db..8393219a7a6 100644 --- a/crates/services/p2p/src/codecs/bounded.rs +++ b/crates/services/p2p/src/codecs/bounded.rs @@ -1,7 +1,4 @@ -use std::{ - io, - marker::PhantomData, -}; +use std::io; use async_trait::async_trait; use futures::{ @@ -22,31 +19,17 @@ use crate::request_response::{ }; use super::{ - DataFormatCodec, + DataFormat, RequestResponseProtocols, }; #[derive(Debug, Clone)] pub struct BoundedCodec { - _format: PhantomData, + pub(crate) data_format: Format, /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block - max_response_size: usize, -} - -impl BoundedCodec { - pub fn new(max_block_size: usize) -> Self { - assert_ne!( - max_block_size, 0, - "PostcardCodec does not support zero block size" - ); - - Self { - _format: PhantomData, - max_response_size: max_block_size, - } - } + pub(crate) max_response_size: usize, } /// Since Postcard does not support async reads or writes out of the box @@ -59,7 +42,7 @@ impl BoundedCodec { #[async_trait] impl request_response::Codec for BoundedCodec where - Format: DataFormatCodec + Send, + Format: DataFormat + Send, { type Protocol = RequestResponseProtocol; type Request = RequestMessage; @@ -78,7 +61,7 @@ where .take(self.max_response_size as u64) .read_to_end(&mut response) .await?; - Format::deserialize(&response) + self.data_format.deserialize(&response) } async fn read_response( @@ -97,11 +80,13 @@ where match protocol { RequestResponseProtocol::V1 => { - let v1_response = Format::deserialize::(&response)?; + let v1_response = self + .data_format + .deserialize::(&response)?; Ok(v1_response.into()) } RequestResponseProtocol::V2 => { - Format::deserialize::(&response) + self.data_format.deserialize::(&response) } } } @@ -115,7 +100,7 @@ where where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = Format::serialize(&req)?; + let encoded_data = self.data_format.serialize(&req)?; socket.write_all(&encoded_data).await?; Ok(()) } @@ -132,9 +117,9 @@ where let encoded_data = match protocol { RequestResponseProtocol::V1 => { let v1_response: V1ResponseMessage = res.into(); - Format::serialize(&v1_response)? + self.data_format.serialize(&v1_response)? } - RequestResponseProtocol::V2 => Format::serialize(&res)?, + RequestResponseProtocol::V2 => self.data_format.serialize(&res)?, }; socket.write_all(&encoded_data).await?; Ok(()) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 4425710d30a..ab52fe713d6 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,4 +1,8 @@ -use super::DataFormatCodec; +use super::{ + bounded::BoundedCodec, + unbounded::UnboundedCodec, + DataFormat, +}; use serde::{ Deserialize, @@ -9,22 +13,45 @@ use std::io; #[derive(Clone)] pub struct PostcardDataFormat; -impl DataFormatCodec for PostcardDataFormat { +impl DataFormat for PostcardDataFormat { type Error = io::Error; fn deserialize<'a, R: Deserialize<'a>>( + &self, encoded_data: &'a [u8], ) -> Result { postcard::from_bytes(encoded_data) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } - fn serialize(data: &D) -> Result, Self::Error> { + fn serialize(&self, data: &D) -> Result, Self::Error> { postcard::to_stdvec(&data) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } } +impl BoundedCodec { + pub fn new(max_block_size: usize) -> Self { + assert_ne!( + max_block_size, 0, + "PostcardCodec does not support zero block size" + ); + + Self { + data_format: PostcardDataFormat, + max_response_size: max_block_size, + } + } +} + +impl UnboundedCodec { + pub fn new() -> Self { + UnboundedCodec { + data_format: PostcardDataFormat, + } + } +} + #[cfg(test)] #[allow(non_snake_case)] mod tests { @@ -187,7 +214,7 @@ mod tests { let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - PostcardDataFormat::deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + codec.data_format.deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( @@ -203,7 +230,9 @@ mod tests { let mut codec: BoundedCodec = BoundedCodec::new(1024); // When - let buf = PostcardDataFormat::serialize(&response) + let buf = codec + .data_format + .serialize(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) diff --git a/crates/services/p2p/src/codecs/unbounded.rs b/crates/services/p2p/src/codecs/unbounded.rs index 1b5697a62fe..6ddcc4b9088 100644 --- a/crates/services/p2p/src/codecs/unbounded.rs +++ b/crates/services/p2p/src/codecs/unbounded.rs @@ -1,7 +1,4 @@ -use std::{ - io, - marker::PhantomData, -}; +use std::io; use crate::gossipsub::messages::{ GossipTopicTag, @@ -10,33 +7,25 @@ use crate::gossipsub::messages::{ }; use super::{ - DataFormatCodec, + DataFormat, GossipsubCodec, }; #[derive(Debug, Clone)] pub struct UnboundedCodec { - _format: PhantomData, -} - -impl UnboundedCodec { - pub fn new() -> Self { - UnboundedCodec { - _format: PhantomData, - } - } + pub(crate) data_format: Format, } impl GossipsubCodec for UnboundedCodec where - Format: DataFormatCodec + Send, + Format: DataFormat + Send, { type RequestMessage = GossipsubBroadcastRequest; type ResponseMessage = GossipsubMessage; fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { - GossipsubBroadcastRequest::NewTx(tx) => Format::serialize(&*tx), + GossipsubBroadcastRequest::NewTx(tx) => self.data_format.serialize(&*tx), } } @@ -47,7 +36,7 @@ where ) -> Result { let decoded_response = match gossipsub_tag { GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(Format::deserialize(encoded_data)?) + GossipsubMessage::NewTx(self.data_format.deserialize(encoded_data)?) } }; From 79b507d344f90d479be349a087ef91bba4b66cbe Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 18:22:21 +0000 Subject: [PATCH 06/22] Use Encode and Decode trait in favour of DataFormat --- crates/services/p2p/src/codecs.rs | 58 ++++++++++++++---- crates/services/p2p/src/codecs/bounded.rs | 58 +++++++++++++----- crates/services/p2p/src/codecs/postcard.rs | 67 ++++++++------------- crates/services/p2p/src/codecs/unbounded.rs | 31 ++++++++-- 4 files changed, 139 insertions(+), 75 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index f4a0b87924f..3f995bc0093 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -4,21 +4,55 @@ pub mod unbounded; use crate::gossipsub::messages::GossipTopicTag; use libp2p::request_response; -use serde::{ - Deserialize, - Serialize, + +use std::{ + borrow::Cow, + io, }; -use std::io; -// TODO: Deprecate this trait in favour of something similar to Encode + Decode in storage crate -// In fact, we should probably have a single trait that can be used both here and in the storage crate -trait DataFormat { +// TODO: This trait is largely copied by the storage crate, we should unify them +/// The trait is usually implemented by the encoder that stores serialized objects. +pub trait Encoder: Send { + /// Returns the serialized object as a slice. + fn as_bytes(&self) -> Cow<[u8]>; +} + +/// The trait encodes the type to the bytes and passes it to the `Encoder`, +/// which stores it and provides a reference to it. That allows gives more +/// flexibility and more performant encoding, allowing the use of slices and arrays +/// instead of vectors in some cases. Since the [`Encoder`] returns `Cow<[u8]>`, +/// it is always possible to take ownership of the serialized value. +pub trait Encode { type Error; - fn deserialize<'a, R: Deserialize<'a>>( - &self, - encoded_data: &'a [u8], - ) -> Result; - fn serialize(&self, data: &D) -> Result, Self::Error>; + /// The encoder type that stores serialized object. + type Encoder<'a>: Encoder + where + T: 'a; + + /// Encodes the object to the bytes and passes it to the `Encoder`. + fn encode(t: &T) -> Result, Self::Error>; +} + +/// The trait decodes the type from the bytes. +pub trait Decode { + type Error; + /// Decodes the type `T` from the bytes. + fn decode(bytes: &[u8]) -> Result; +} + +impl<'a> Encoder for Cow<'a, [u8]> { + fn as_bytes(&self) -> Cow<[u8]> { + match self { + Cow::Borrowed(borrowed) => Cow::Borrowed(borrowed), + Cow::Owned(owned) => Cow::Borrowed(owned.as_ref()), + } + } +} + +impl Encoder for [u8; SIZE] { + fn as_bytes(&self) -> Cow<[u8]> { + Cow::Borrowed(self.as_slice()) + } } /// Implement this in order to handle serialization & deserialization of Gossipsub messages diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/bounded.rs index 8393219a7a6..b8869b47890 100644 --- a/crates/services/p2p/src/codecs/bounded.rs +++ b/crates/services/p2p/src/codecs/bounded.rs @@ -1,4 +1,7 @@ -use std::io; +use std::{ + io, + marker::PhantomData, +}; use async_trait::async_trait; use futures::{ @@ -19,19 +22,35 @@ use crate::request_response::{ }; use super::{ - DataFormat, + Decode, + Encode, + Encoder, RequestResponseProtocols, }; #[derive(Debug, Clone)] pub struct BoundedCodec { - pub(crate) data_format: Format, + pub(crate) _data_format: PhantomData, /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block pub(crate) max_response_size: usize, } +impl BoundedCodec { + pub fn new(max_block_size: usize) -> Self { + assert_ne!( + max_block_size, 0, + "BoundedCodec does not support zero block size" + ); + + Self { + _data_format: PhantomData, + max_response_size: max_block_size, + } + } +} + /// Since Postcard does not support async reads or writes out of the box /// We prefix Request & Response Messages with the length of the data in bytes /// We expect the substream to be properly closed when response channel is dropped. @@ -42,7 +61,13 @@ pub struct BoundedCodec { #[async_trait] impl request_response::Codec for BoundedCodec where - Format: DataFormat + Send, + Format: Encode + + Decode + + Encode + + Decode + + Encode + + Decode + + Send, { type Protocol = RequestResponseProtocol; type Request = RequestMessage; @@ -61,7 +86,7 @@ where .take(self.max_response_size as u64) .read_to_end(&mut response) .await?; - self.data_format.deserialize(&response) + Format::decode(&response) } async fn read_response( @@ -80,13 +105,12 @@ where match protocol { RequestResponseProtocol::V1 => { - let v1_response = self - .data_format - .deserialize::(&response)?; + let v1_response = + >::decode(&response)?; Ok(v1_response.into()) } RequestResponseProtocol::V2 => { - self.data_format.deserialize::(&response) + >::decode(&response) } } } @@ -100,8 +124,8 @@ where where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = self.data_format.serialize(&req)?; - socket.write_all(&encoded_data).await?; + let encoded_data = Format::encode(&req)?; + socket.write_all(&encoded_data.as_bytes()).await?; Ok(()) } @@ -114,14 +138,18 @@ where where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = match protocol { + match protocol { RequestResponseProtocol::V1 => { let v1_response: V1ResponseMessage = res.into(); - self.data_format.serialize(&v1_response)? + let res = Format::encode(&v1_response)?.as_bytes().into_owned(); + socket.write_all(&res).await?; + } + RequestResponseProtocol::V2 => { + let res = Format::encode(&res)?.as_bytes().into_owned(); + socket.write_all(&res).await?; } - RequestResponseProtocol::V2 => self.data_format.serialize(&res)?, }; - socket.write_all(&encoded_data).await?; + Ok(()) } } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index ab52fe713d6..0f94d24f50d 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,54 +1,39 @@ use super::{ - bounded::BoundedCodec, - unbounded::UnboundedCodec, - DataFormat, + Decode, + Encode, }; -use serde::{ - Deserialize, - Serialize, +use std::{ + borrow::Cow, + io, }; -use std::io; #[derive(Clone)] pub struct PostcardDataFormat; -impl DataFormat for PostcardDataFormat { +impl Encode for PostcardDataFormat +where + T: ?Sized + serde::Serialize, +{ + type Encoder<'a> = Cow<'a, [u8]> where T: 'a; type Error = io::Error; - fn deserialize<'a, R: Deserialize<'a>>( - &self, - encoded_data: &'a [u8], - ) -> Result { - postcard::from_bytes(encoded_data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn serialize(&self, data: &D) -> Result, Self::Error> { - postcard::to_stdvec(&data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + fn encode(value: &T) -> Result, Self::Error> { + Ok(Cow::Owned(postcard::to_allocvec(value).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e.to_string()) + })?)) } } -impl BoundedCodec { - pub fn new(max_block_size: usize) -> Self { - assert_ne!( - max_block_size, 0, - "PostcardCodec does not support zero block size" - ); - - Self { - data_format: PostcardDataFormat, - max_response_size: max_block_size, - } - } -} +impl Decode for PostcardDataFormat +where + T: serde::de::DeserializeOwned, +{ + type Error = io::Error; -impl UnboundedCodec { - pub fn new() -> Self { - UnboundedCodec { - data_format: PostcardDataFormat, - } + fn decode(bytes: &[u8]) -> Result { + postcard::from_bytes(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } } @@ -214,7 +199,7 @@ mod tests { let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - codec.data_format.deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + >::decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( @@ -230,12 +215,10 @@ mod tests { let mut codec: BoundedCodec = BoundedCodec::new(1024); // When - let buf = codec - .data_format - .serialize(&response) + let buf = >::encode(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec - .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut &*buf) .await .expect("Valid Vec is deserialized using v1"); diff --git a/crates/services/p2p/src/codecs/unbounded.rs b/crates/services/p2p/src/codecs/unbounded.rs index 6ddcc4b9088..6c54b897224 100644 --- a/crates/services/p2p/src/codecs/unbounded.rs +++ b/crates/services/p2p/src/codecs/unbounded.rs @@ -1,4 +1,9 @@ -use std::io; +use std::{ + io, + marker::PhantomData, +}; + +use fuel_core_types::fuel_tx::Transaction; use crate::gossipsub::messages::{ GossipTopicTag, @@ -7,25 +12,39 @@ use crate::gossipsub::messages::{ }; use super::{ - DataFormat, + Decode, + Encode, + Encoder, GossipsubCodec, }; #[derive(Debug, Clone)] pub struct UnboundedCodec { - pub(crate) data_format: Format, + pub(crate) _data_format: PhantomData, +} + +impl UnboundedCodec { + pub fn new() -> Self { + UnboundedCodec { + _data_format: PhantomData, + } + } } impl GossipsubCodec for UnboundedCodec where - Format: DataFormat + Send, + Format: Encode + + Decode + + Send, { type RequestMessage = GossipsubBroadcastRequest; type ResponseMessage = GossipsubMessage; fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { - GossipsubBroadcastRequest::NewTx(tx) => self.data_format.serialize(&*tx), + GossipsubBroadcastRequest::NewTx(tx) => { + Ok(Format::encode(&*tx)?.as_bytes().into_owned()) + } } } @@ -36,7 +55,7 @@ where ) -> Result { let decoded_response = match gossipsub_tag { GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(self.data_format.deserialize(encoded_data)?) + GossipsubMessage::NewTx(Format::decode(encoded_data)?) } }; From ead761a08a1a103bdd103f8f7e66da077fb92186 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 18:32:01 +0000 Subject: [PATCH 07/22] Rename codecs to message handlers --- crates/fuel-core/src/p2p_test_helpers.rs | 9 +++++---- crates/services/p2p/src/behavior.rs | 7 ++++--- crates/services/p2p/src/codecs.rs | 10 +++++----- .../src/codecs/{unbounded.rs => gossipsub.rs} | 8 ++++---- crates/services/p2p/src/codecs/postcard.rs | 20 ++++++++++++------- .../{bounded.rs => request_response.rs} | 6 +++--- crates/services/p2p/src/p2p_service.rs | 18 ++++++++--------- crates/services/p2p/src/service.rs | 8 ++++---- 8 files changed, 47 insertions(+), 39 deletions(-) rename crates/services/p2p/src/codecs/{unbounded.rs => gossipsub.rs} (87%) rename crates/services/p2p/src/codecs/{bounded.rs => request_response.rs} (96%) diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 64247460bf0..55b95df03de 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -24,8 +24,8 @@ use fuel_core_chain_config::{ }; use fuel_core_p2p::{ codecs::{ - bounded::BoundedCodec, - unbounded::UnboundedCodec, + request_response::RequestResponseMessageHandler, + unbounded::GossipsubMessageHandler, }, network_service::FuelP2PService, p2p_service::FuelP2PEvent, @@ -145,8 +145,9 @@ impl Bootstrap { /// Spawn a bootstrap node. pub async fn new(node_config: &Config) -> anyhow::Result { let bootstrap_config = extract_p2p_config(node_config).await; - let request_response_codec = BoundedCodec::new(bootstrap_config.max_block_size); - let gossipsub_codec = UnboundedCodec::new(); + let request_response_codec = + RequestResponseMessageHandler::new(bootstrap_config.max_block_size); + let gossipsub_codec = GossipsubMessageHandler::new(); let (sender, _) = broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1)); let mut bootstrap = FuelP2PService::new( diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 3db54043611..feaf5741cf2 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,7 +1,7 @@ use crate::{ codecs::{ - bounded::BoundedCodec, postcard::PostcardDataFormat, + request_response::RequestResponseMessageHandler, RequestResponseProtocols, }, config::Config, @@ -60,13 +60,14 @@ pub struct FuelBehaviour { discovery: discovery::Behaviour, /// RequestResponse protocol - request_response: request_response::Behaviour>, + request_response: + request_response::Behaviour>, } impl FuelBehaviour { pub(crate) fn new( p2p_config: &Config, - request_response_codec: BoundedCodec, + request_response_codec: RequestResponseMessageHandler, ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 3f995bc0093..e5691227597 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -1,9 +1,9 @@ -pub mod bounded; +pub mod gossipsub; pub mod postcard; -pub mod unbounded; +pub mod request_response; use crate::gossipsub::messages::GossipTopicTag; -use libp2p::request_response; +use libp2p::request_response as libp2p_request_response; use std::{ borrow::Cow, @@ -69,10 +69,10 @@ pub trait GossipsubCodec { ) -> Result; } -pub trait RequestResponseProtocols: request_response::Codec { +pub trait RequestResponseProtocols: libp2p_request_response::Codec { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour fn get_req_res_protocols( &self, - ) -> impl Iterator::Protocol>; + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/unbounded.rs b/crates/services/p2p/src/codecs/gossipsub.rs similarity index 87% rename from crates/services/p2p/src/codecs/unbounded.rs rename to crates/services/p2p/src/codecs/gossipsub.rs index 6c54b897224..238f0888e81 100644 --- a/crates/services/p2p/src/codecs/unbounded.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -19,19 +19,19 @@ use super::{ }; #[derive(Debug, Clone)] -pub struct UnboundedCodec { +pub struct GossipsubMessageHandler { pub(crate) _data_format: PhantomData, } -impl UnboundedCodec { +impl GossipsubMessageHandler { pub fn new() -> Self { - UnboundedCodec { + GossipsubMessageHandler { _data_format: PhantomData, } } } -impl GossipsubCodec for UnboundedCodec +impl GossipsubCodec for GossipsubMessageHandler where Format: Encode + Decode diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 0f94d24f50d..3af157f26e8 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -46,7 +46,7 @@ mod tests { use super::*; use crate::{ - codecs::bounded::BoundedCodec, + codecs::bounded::RequestResponseMessageHandler, request_response::{ messages::{ RequestMessage, @@ -72,7 +72,8 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When @@ -99,7 +100,8 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When @@ -126,7 +128,8 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When @@ -158,7 +161,8 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When @@ -187,7 +191,8 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When @@ -212,7 +217,8 @@ mod tests { async fn codec__read_response_is_backwards_compatible_with_v1() { // Given let response = V1ResponseMessage::SealedHeaders(None); - let mut codec: BoundedCodec = BoundedCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); // When let buf = >::encode(&response) diff --git a/crates/services/p2p/src/codecs/bounded.rs b/crates/services/p2p/src/codecs/request_response.rs similarity index 96% rename from crates/services/p2p/src/codecs/bounded.rs rename to crates/services/p2p/src/codecs/request_response.rs index b8869b47890..95af7922f94 100644 --- a/crates/services/p2p/src/codecs/bounded.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -29,7 +29,7 @@ use super::{ }; #[derive(Debug, Clone)] -pub struct BoundedCodec { +pub struct RequestResponseMessageHandler { pub(crate) _data_format: PhantomData, /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks @@ -37,7 +37,7 @@ pub struct BoundedCodec { pub(crate) max_response_size: usize, } -impl BoundedCodec { +impl RequestResponseMessageHandler { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, @@ -59,7 +59,7 @@ impl BoundedCodec { /// If the substream was not properly closed when dropped, the sender would instead /// run into a timeout waiting for the response. #[async_trait] -impl request_response::Codec for BoundedCodec +impl request_response::Codec for RequestResponseMessageHandler where Format: Encode + Decode diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 59379815dd1..ae2d0b585ab 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,9 +4,9 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - bounded::BoundedCodec, + gossipsub::GossipsubMessageHandler, postcard::PostcardDataFormat, - unbounded::UnboundedCodec, + request_response::RequestResponseMessageHandler, GossipsubCodec, }, config::{ @@ -124,7 +124,7 @@ pub struct FuelP2PService { inbound_requests_table: HashMap>, /// `UboundedCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages - gossipsub_codec: UnboundedCodec, + gossipsub_codec: GossipsubMessageHandler, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -213,8 +213,8 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - gossipsub_codec: UnboundedCodec, - request_response_codec: BoundedCodec, + gossipsub_codec: GossipsubMessageHandler, + request_response_codec: RequestResponseMessageHandler, ) -> anyhow::Result { let metrics = config.metrics; @@ -836,8 +836,8 @@ mod tests { }; use crate::{ codecs::{ - bounded::BoundedCodec, - unbounded::UnboundedCodec, + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, }, config::Config, gossipsub::{ @@ -923,8 +923,8 @@ mod tests { let mut service = FuelP2PService::new( sender, p2p_config, - UnboundedCodec::new(), - BoundedCodec::new(max_block_size), + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), ) .await .unwrap(); diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 71c4874a82c..d0a6af89893 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,7 +1,7 @@ use crate::{ codecs::{ - bounded::BoundedCodec, - unbounded::UnboundedCodec, + request_response::RequestResponseMessageHandler, + unbounded::GossipsubMessageHandler, }, config::{ Config, @@ -790,8 +790,8 @@ where let mut p2p_service = FuelP2PService::new( broadcast.reserved_peers_broadcast.clone(), config, - UnboundedCodec::new(), - BoundedCodec::new(max_block_size), + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), ) .await?; p2p_service.start().await?; From ed2937633f58c8d0f6f8377165f2d32f3b1c0575 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 18:44:55 +0000 Subject: [PATCH 08/22] Format --- crates/services/p2p/src/codecs/postcard.rs | 2 +- crates/services/p2p/src/service.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 3af157f26e8..dc3d09ef5f7 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -46,7 +46,7 @@ mod tests { use super::*; use crate::{ - codecs::bounded::RequestResponseMessageHandler, + codecs::request_response::RequestResponseMessageHandler, request_response::{ messages::{ RequestMessage, diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index d0a6af89893..29e40d7c363 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,7 +1,7 @@ use crate::{ codecs::{ + gossipsub::GossipsubMessageHandler, request_response::RequestResponseMessageHandler, - unbounded::GossipsubMessageHandler, }, config::{ Config, From 138716b69b36c02eef4dffd57f20908452dc2758 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 21:40:38 +0000 Subject: [PATCH 09/22] Encode trait function takes &self as argument --- crates/services/p2p/src/codecs.rs | 4 +-- crates/services/p2p/src/codecs/gossipsub.rs | 12 ++------ crates/services/p2p/src/codecs/postcard.rs | 30 +++++++++++++++++-- .../p2p/src/codecs/request_response.rs | 26 +++++----------- 4 files changed, 40 insertions(+), 32 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index e5691227597..407130fdc35 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -30,7 +30,7 @@ pub trait Encode { T: 'a; /// Encodes the object to the bytes and passes it to the `Encoder`. - fn encode(t: &T) -> Result, Self::Error>; + fn encode<'b>(&self, t: &'b T) -> Result, Self::Error>; } /// The trait decodes the type from the bytes. @@ -41,7 +41,7 @@ pub trait Decode { } impl<'a> Encoder for Cow<'a, [u8]> { - fn as_bytes(&self) -> Cow<[u8]> { + fn as_bytes(&self) -> Cow<'_, [u8]> { match self { Cow::Borrowed(borrowed) => Cow::Borrowed(borrowed), Cow::Owned(owned) => Cow::Borrowed(owned.as_ref()), diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index 238f0888e81..878d0634bd2 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -20,15 +20,7 @@ use super::{ #[derive(Debug, Clone)] pub struct GossipsubMessageHandler { - pub(crate) _data_format: PhantomData, -} - -impl GossipsubMessageHandler { - pub fn new() -> Self { - GossipsubMessageHandler { - _data_format: PhantomData, - } - } + pub(crate) data_format: Format, } impl GossipsubCodec for GossipsubMessageHandler @@ -43,7 +35,7 @@ where fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { GossipsubBroadcastRequest::NewTx(tx) => { - Ok(Format::encode(&*tx)?.as_bytes().into_owned()) + Ok(self.data_format.encode(&*tx)?.as_bytes().into_owned()) } } } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index dc3d09ef5f7..5241efc5299 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,4 +1,6 @@ use super::{ + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, Decode, Encode, }; @@ -11,6 +13,28 @@ use std::{ #[derive(Clone)] pub struct PostcardDataFormat; +impl RequestResponseMessageHandler { + pub fn new(max_block_size: usize) -> Self { + assert_ne!( + max_block_size, 0, + "BoundedCodec does not support zero block size" + ); + + Self { + data_format: PostcardDataFormat, + max_response_size: max_block_size, + } + } +} + +impl GossipsubMessageHandler { + pub fn new() -> Self { + GossipsubMessageHandler { + data_format: PostcardDataFormat, + } + } +} + impl Encode for PostcardDataFormat where T: ?Sized + serde::Serialize, @@ -18,7 +42,7 @@ where type Encoder<'a> = Cow<'a, [u8]> where T: 'a; type Error = io::Error; - fn encode(value: &T) -> Result, Self::Error> { + fn encode<'b>(&self, value: &'b T) -> Result, Self::Error> { Ok(Cow::Owned(postcard::to_allocvec(value).map_err(|e| { io::Error::new(io::ErrorKind::Other, e.to_string()) })?)) @@ -221,7 +245,9 @@ mod tests { RequestResponseMessageHandler::new(1024); // When - let buf = >::encode(&response) + let buf = codec + .data_format + .encode(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec .read_response(&RequestResponseProtocol::V1, &mut &*buf) diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index 95af7922f94..230601fb355 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -30,27 +30,13 @@ use super::{ #[derive(Debug, Clone)] pub struct RequestResponseMessageHandler { - pub(crate) _data_format: PhantomData, + pub(crate) data_format: Format, /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block pub(crate) max_response_size: usize, } -impl RequestResponseMessageHandler { - pub fn new(max_block_size: usize) -> Self { - assert_ne!( - max_block_size, 0, - "BoundedCodec does not support zero block size" - ); - - Self { - _data_format: PhantomData, - max_response_size: max_block_size, - } - } -} - /// Since Postcard does not support async reads or writes out of the box /// We prefix Request & Response Messages with the length of the data in bytes /// We expect the substream to be properly closed when response channel is dropped. @@ -124,7 +110,7 @@ where where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = Format::encode(&req)?; + let encoded_data = self.data_format.encode(&req)?; socket.write_all(&encoded_data.as_bytes()).await?; Ok(()) } @@ -141,11 +127,15 @@ where match protocol { RequestResponseProtocol::V1 => { let v1_response: V1ResponseMessage = res.into(); - let res = Format::encode(&v1_response)?.as_bytes().into_owned(); + let res = self + .data_format + .encode(&v1_response)? + .as_bytes() + .into_owned(); socket.write_all(&res).await?; } RequestResponseProtocol::V2 => { - let res = Format::encode(&res)?.as_bytes().into_owned(); + let res = self.data_format.encode(&res)?.as_bytes().into_owned(); socket.write_all(&res).await?; } }; From 58ffe6756ab6e456d82377d0e8e4dc34537a4853 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:06:11 +0000 Subject: [PATCH 10/22] Minor improvements --- crates/services/p2p/src/codecs.rs | 6 ---- crates/services/p2p/src/codecs/gossipsub.rs | 5 +-- .../p2p/src/codecs/request_response.rs | 32 ++++++++----------- 3 files changed, 14 insertions(+), 29 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 407130fdc35..2b99724e918 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -49,12 +49,6 @@ impl<'a> Encoder for Cow<'a, [u8]> { } } -impl Encoder for [u8; SIZE] { - fn as_bytes(&self) -> Cow<[u8]> { - Cow::Borrowed(self.as_slice()) - } -} - /// Implement this in order to handle serialization & deserialization of Gossipsub messages pub trait GossipsubCodec { type RequestMessage; diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index 878d0634bd2..2fe6b7f504b 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -1,7 +1,4 @@ -use std::{ - io, - marker::PhantomData, -}; +use std::io; use fuel_core_types::fuel_tx::Transaction; diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index 230601fb355..11889448532 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -1,16 +1,4 @@ -use std::{ - io, - marker::PhantomData, -}; - -use async_trait::async_trait; -use futures::{ - AsyncRead, - AsyncReadExt, - AsyncWriteExt, -}; -use libp2p::request_response; -use strum::IntoEnumIterator as _; +use std::io; use crate::request_response::{ messages::{ @@ -20,6 +8,14 @@ use crate::request_response::{ }, protocols::RequestResponseProtocol, }; +use async_trait::async_trait; +use futures::{ + AsyncRead, + AsyncReadExt, + AsyncWriteExt, +}; +use libp2p::request_response; +use strum::IntoEnumIterator as _; use super::{ Decode, @@ -127,15 +123,13 @@ where match protocol { RequestResponseProtocol::V1 => { let v1_response: V1ResponseMessage = res.into(); - let res = self - .data_format - .encode(&v1_response)? - .as_bytes() - .into_owned(); + let res = self.data_format.encode(&v1_response)?; + let res = res.as_bytes(); socket.write_all(&res).await?; } RequestResponseProtocol::V2 => { - let res = self.data_format.encode(&res)?.as_bytes().into_owned(); + let res = self.data_format.encode(&res)?; + let res = res.as_bytes(); socket.write_all(&res).await?; } }; From c564941d769d1524d04dfec9675b6f7da7d5b5f6 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:12:19 +0000 Subject: [PATCH 11/22] Decode trait function takes &self as argument --- crates/fuel-core/src/p2p_test_helpers.rs | 2 +- crates/services/p2p/src/codecs.rs | 2 +- crates/services/p2p/src/codecs/gossipsub.rs | 2 +- crates/services/p2p/src/codecs/postcard.rs | 6 +++--- crates/services/p2p/src/codecs/request_response.rs | 10 ++++------ 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 55b95df03de..2513efdea4d 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -25,7 +25,7 @@ use fuel_core_chain_config::{ use fuel_core_p2p::{ codecs::{ request_response::RequestResponseMessageHandler, - unbounded::GossipsubMessageHandler, + gossipsub::GossipsubMessageHandler, }, network_service::FuelP2PService, p2p_service::FuelP2PEvent, diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 2b99724e918..0d00cbb1787 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -37,7 +37,7 @@ pub trait Encode { pub trait Decode { type Error; /// Decodes the type `T` from the bytes. - fn decode(bytes: &[u8]) -> Result; + fn decode(&self, bytes: &[u8]) -> Result; } impl<'a> Encoder for Cow<'a, [u8]> { diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index 2fe6b7f504b..f225de81f2b 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -44,7 +44,7 @@ where ) -> Result { let decoded_response = match gossipsub_tag { GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(Format::decode(encoded_data)?) + GossipsubMessage::NewTx(self.data_format.decode(encoded_data)?) } }; diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 5241efc5299..7d59d357e2e 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -30,7 +30,7 @@ impl RequestResponseMessageHandler { impl GossipsubMessageHandler { pub fn new() -> Self { GossipsubMessageHandler { - data_format: PostcardDataFormat, + codec: PostcardDataFormat, } } } @@ -55,7 +55,7 @@ where { type Error = io::Error; - fn decode(bytes: &[u8]) -> Result { + fn decode(&self, bytes: &[u8]) -> Result { postcard::from_bytes(bytes) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } @@ -228,7 +228,7 @@ mod tests { let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - >::decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + codec.data_format.decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index 11889448532..238a4afccbf 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -68,7 +68,7 @@ where .take(self.max_response_size as u64) .read_to_end(&mut response) .await?; - Format::decode(&response) + self.data_format.decode(&response) } async fn read_response( @@ -87,13 +87,11 @@ where match protocol { RequestResponseProtocol::V1 => { - let v1_response = - >::decode(&response)?; + let v1_response: V1ResponseMessage = + self.data_format.decode(&response)?; Ok(v1_response.into()) } - RequestResponseProtocol::V2 => { - >::decode(&response) - } + RequestResponseProtocol::V2 => self.data_format.decode(&response), } } From c5aa3622bb56ef69ce7aee44231ae20a9d05d2c3 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:15:40 +0000 Subject: [PATCH 12/22] DataFormat is now Codec --- crates/services/p2p/src/behavior.rs | 6 ++-- crates/services/p2p/src/codecs/gossipsub.rs | 12 ++++---- crates/services/p2p/src/codecs/postcard.rs | 30 +++++++++---------- .../p2p/src/codecs/request_response.rs | 21 +++++++------ crates/services/p2p/src/p2p_service.rs | 8 ++--- 5 files changed, 38 insertions(+), 39 deletions(-) diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index feaf5741cf2..0e194c88442 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,6 +1,6 @@ use crate::{ codecs::{ - postcard::PostcardDataFormat, + postcard::PostcardCodec, request_response::RequestResponseMessageHandler, RequestResponseProtocols, }, @@ -61,13 +61,13 @@ pub struct FuelBehaviour { /// RequestResponse protocol request_response: - request_response::Behaviour>, + request_response::Behaviour>, } impl FuelBehaviour { pub(crate) fn new( p2p_config: &Config, - request_response_codec: RequestResponseMessageHandler, + request_response_codec: RequestResponseMessageHandler, ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index f225de81f2b..d892e795ae3 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -16,13 +16,13 @@ use super::{ }; #[derive(Debug, Clone)] -pub struct GossipsubMessageHandler { - pub(crate) data_format: Format, +pub struct GossipsubMessageHandler { + pub(crate) codec: Codec, } -impl GossipsubCodec for GossipsubMessageHandler +impl GossipsubCodec for GossipsubMessageHandler where - Format: Encode + Codec: Encode + Decode + Send, { @@ -32,7 +32,7 @@ where fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { GossipsubBroadcastRequest::NewTx(tx) => { - Ok(self.data_format.encode(&*tx)?.as_bytes().into_owned()) + Ok(self.codec.encode(&*tx)?.as_bytes().into_owned()) } } } @@ -44,7 +44,7 @@ where ) -> Result { let decoded_response = match gossipsub_tag { GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(self.data_format.decode(encoded_data)?) + GossipsubMessage::NewTx(self.codec.decode(encoded_data)?) } }; diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 7d59d357e2e..b22c28926ed 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -11,9 +11,9 @@ use std::{ }; #[derive(Clone)] -pub struct PostcardDataFormat; +pub struct PostcardCodec; -impl RequestResponseMessageHandler { +impl RequestResponseMessageHandler { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, @@ -21,21 +21,21 @@ impl RequestResponseMessageHandler { ); Self { - data_format: PostcardDataFormat, + codec: PostcardCodec, max_response_size: max_block_size, } } } -impl GossipsubMessageHandler { +impl GossipsubMessageHandler { pub fn new() -> Self { GossipsubMessageHandler { - codec: PostcardDataFormat, + codec: PostcardCodec, } } } -impl Encode for PostcardDataFormat +impl Encode for PostcardCodec where T: ?Sized + serde::Serialize, { @@ -49,7 +49,7 @@ where } } -impl Decode for PostcardDataFormat +impl Decode for PostcardCodec where T: serde::de::DeserializeOwned, { @@ -96,7 +96,7 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); @@ -124,7 +124,7 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); @@ -152,7 +152,7 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); @@ -185,7 +185,7 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); @@ -215,7 +215,7 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); @@ -228,7 +228,7 @@ mod tests { let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - codec.data_format.decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + codec.codec.decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( @@ -241,12 +241,12 @@ mod tests { async fn codec__read_response_is_backwards_compatible_with_v1() { // Given let response = V1ResponseMessage::SealedHeaders(None); - let mut codec: RequestResponseMessageHandler = + let mut codec: RequestResponseMessageHandler = RequestResponseMessageHandler::new(1024); // When let buf = codec - .data_format + .codec .encode(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index 238a4afccbf..bffecb24f91 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -25,8 +25,8 @@ use super::{ }; #[derive(Debug, Clone)] -pub struct RequestResponseMessageHandler { - pub(crate) data_format: Format, +pub struct RequestResponseMessageHandler { + pub(crate) codec: Codec, /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block @@ -41,9 +41,9 @@ pub struct RequestResponseMessageHandler { /// If the substream was not properly closed when dropped, the sender would instead /// run into a timeout waiting for the response. #[async_trait] -impl request_response::Codec for RequestResponseMessageHandler +impl request_response::Codec for RequestResponseMessageHandler where - Format: Encode + Codec: Encode + Decode + Encode + Decode @@ -68,7 +68,7 @@ where .take(self.max_response_size as u64) .read_to_end(&mut response) .await?; - self.data_format.decode(&response) + self.codec.decode(&response) } async fn read_response( @@ -87,11 +87,10 @@ where match protocol { RequestResponseProtocol::V1 => { - let v1_response: V1ResponseMessage = - self.data_format.decode(&response)?; + let v1_response: V1ResponseMessage = self.codec.decode(&response)?; Ok(v1_response.into()) } - RequestResponseProtocol::V2 => self.data_format.decode(&response), + RequestResponseProtocol::V2 => self.codec.decode(&response), } } @@ -104,7 +103,7 @@ where where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = self.data_format.encode(&req)?; + let encoded_data = self.codec.encode(&req)?; socket.write_all(&encoded_data.as_bytes()).await?; Ok(()) } @@ -121,12 +120,12 @@ where match protocol { RequestResponseProtocol::V1 => { let v1_response: V1ResponseMessage = res.into(); - let res = self.data_format.encode(&v1_response)?; + let res = self.codec.encode(&v1_response)?; let res = res.as_bytes(); socket.write_all(&res).await?; } RequestResponseProtocol::V2 => { - let res = self.data_format.encode(&res)?; + let res = self.codec.encode(&res)?; let res = res.as_bytes(); socket.write_all(&res).await?; } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index ae2d0b585ab..dc2810ee7ff 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -5,7 +5,7 @@ use crate::{ }, codecs::{ gossipsub::GossipsubMessageHandler, - postcard::PostcardDataFormat, + postcard::PostcardCodec, request_response::RequestResponseMessageHandler, GossipsubCodec, }, @@ -124,7 +124,7 @@ pub struct FuelP2PService { inbound_requests_table: HashMap>, /// `UboundedCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages - gossipsub_codec: GossipsubMessageHandler, + gossipsub_codec: GossipsubMessageHandler, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -213,8 +213,8 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - gossipsub_codec: GossipsubMessageHandler, - request_response_codec: RequestResponseMessageHandler, + gossipsub_codec: GossipsubMessageHandler, + request_response_codec: RequestResponseMessageHandler, ) -> anyhow::Result { let metrics = config.metrics; From 89b022cd09912c3ed0c796b9f4300a51e31eb404 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:18:03 +0000 Subject: [PATCH 13/22] Typo --- crates/services/p2p/src/codecs/postcard.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index b22c28926ed..1f3152a5426 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -17,7 +17,7 @@ impl RequestResponseMessageHandler { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, - "BoundedCodec does not support zero block size" + "RequestResponseMessageHandler does not support zero block size" ); Self { From 45bbd91b2ace7299e40f713dd2b3206863214f56 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:28:24 +0000 Subject: [PATCH 14/22] CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 570a3130672..9174a8c96be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. +- [2388](https://github.com/FuelLabs/fuel-core/pull/2388): Rework the P2P service codecs to avoid unnecessary coupling between components. The refactoring makes it explicit that the Gossipsub and RequestResponse codecs only share encoding/decoding functionalities from the Postcard codec. It also makes handling Gossipsub and RequestResponse messages completely independent of each other. ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). From caf4f001f87ec6ca8ce03e189bc8deee820e28c2 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:32:06 +0000 Subject: [PATCH 15/22] Add issue to TODO --- crates/services/p2p/src/codecs.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 0d00cbb1787..881ed7a23e9 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -10,8 +10,9 @@ use std::{ io, }; -// TODO: This trait is largely copied by the storage crate, we should unify them -/// The trait is usually implemented by the encoder that stores serialized objects. +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. pub trait Encoder: Send { /// Returns the serialized object as a slice. fn as_bytes(&self) -> Cow<[u8]>; @@ -22,6 +23,9 @@ pub trait Encoder: Send { /// flexibility and more performant encoding, allowing the use of slices and arrays /// instead of vectors in some cases. Since the [`Encoder`] returns `Cow<[u8]>`, /// it is always possible to take ownership of the serialized value. +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. pub trait Encode { type Error; /// The encoder type that stores serialized object. @@ -34,6 +38,9 @@ pub trait Encode { } /// The trait decodes the type from the bytes. +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. pub trait Decode { type Error; /// Decodes the type `T` from the bytes. From a55f695b4a7e5330efe5cba83ab7a301410f11b6 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 28 Oct 2024 22:34:14 +0000 Subject: [PATCH 16/22] Rename lifetime --- crates/services/p2p/src/codecs.rs | 2 +- crates/services/p2p/src/codecs/postcard.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 881ed7a23e9..d9b0e3444a7 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -34,7 +34,7 @@ pub trait Encode { T: 'a; /// Encodes the object to the bytes and passes it to the `Encoder`. - fn encode<'b>(&self, t: &'b T) -> Result, Self::Error>; + fn encode<'a>(&self, t: &'a T) -> Result, Self::Error>; } /// The trait decodes the type from the bytes. diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 1f3152a5426..3bda241a9d9 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -42,7 +42,7 @@ where type Encoder<'a> = Cow<'a, [u8]> where T: 'a; type Error = io::Error; - fn encode<'b>(&self, value: &'b T) -> Result, Self::Error> { + fn encode<'a>(&self, value: &'a T) -> Result, Self::Error> { Ok(Cow::Owned(postcard::to_allocvec(value).map_err(|e| { io::Error::new(io::ErrorKind::Other, e.to_string()) })?)) From fc2d71d001e7106143f3a75ff20821d1136f5098 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 29 Oct 2024 09:47:48 +0000 Subject: [PATCH 17/22] Formatting --- crates/fuel-core/src/p2p_test_helpers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 2513efdea4d..211df26030e 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -24,8 +24,8 @@ use fuel_core_chain_config::{ }; use fuel_core_p2p::{ codecs::{ - request_response::RequestResponseMessageHandler, gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, }, network_service::FuelP2PService, p2p_service::FuelP2PEvent, From 136e6b49d7d7b8d8fbebbb80aa98e8a6e0dba272 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 29 Oct 2024 10:29:04 +0000 Subject: [PATCH 18/22] Placate Clippy --- crates/services/p2p/src/codecs/gossipsub.rs | 2 +- crates/services/p2p/src/codecs/postcard.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index d892e795ae3..6380d247483 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -15,7 +15,7 @@ use super::{ GossipsubCodec, }; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct GossipsubMessageHandler { pub(crate) codec: Codec, } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 3bda241a9d9..b13d5fdd9a7 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -10,7 +10,7 @@ use std::{ io, }; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct PostcardCodec; impl RequestResponseMessageHandler { From 029fbff3fa13fc3b48c48ba4149442e242bb059f Mon Sep 17 00:00:00 2001 From: acerone85 Date: Fri, 15 Nov 2024 15:43:33 +0000 Subject: [PATCH 19/22] Fix test compilation --- crates/services/p2p/src/p2p_service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 85f20484d6a..e894549bebd 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -1079,7 +1079,8 @@ mod tests { let mut service = FuelP2PService::new( sender, p2p_config, - PostcardCodec::new(max_block_size), + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), ) .await .unwrap(); From 05c50fd24c03e663f63adbf0becf2c4ada52b440 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 26 Nov 2024 16:48:03 +0000 Subject: [PATCH 20/22] Reference todo issue --- crates/services/p2p/src/codecs/request_response.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index bffecb24f91..3208f7b8b17 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -142,7 +142,8 @@ where fn get_req_res_protocols( &self, ) -> impl Iterator::Protocol> { - // TODO: Iterating over versions in reverse order should prefer + // TODO: https://github.com/FuelLabs/fuel-core/issues/2458. + // Iterating over versions in reverse order should prefer // peers to use V2 over V1 for exchanging messages. However, this is // not guaranteed by the specs for the `request_response` protocol. RequestResponseProtocol::iter().rev() From e9f14046adec21f4ba4856a89b9307b63fb38fd2 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 26 Nov 2024 17:00:36 +0000 Subject: [PATCH 21/22] Avoid dereference --- crates/services/p2p/src/codecs/gossipsub.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index 6380d247483..e5bab789134 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -32,7 +32,7 @@ where fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { GossipsubBroadcastRequest::NewTx(tx) => { - Ok(self.codec.encode(&*tx)?.as_bytes().into_owned()) + Ok(self.codec.encode(&tx)?.as_bytes().into_owned()) } } } From b94c4519c21460e4399d767b1147e3698e3e3f80 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 26 Nov 2024 17:23:17 +0000 Subject: [PATCH 22/22] Todo: max_response_size should be u64 --- crates/services/p2p/src/codecs/request_response.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs index 3208f7b8b17..5afb2c0557c 100644 --- a/crates/services/p2p/src/codecs/request_response.rs +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -30,6 +30,8 @@ pub struct RequestResponseMessageHandler { /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block + // TODO: https://github.com/FuelLabs/fuel-core/issues/2459. + // Make this a u64 instead of usize. pub(crate) max_response_size: usize, }