Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove p2pservice networkcodec trait #2388

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b6f7353
Request/Response: remove NetworkCodec trait
acerone85 Oct 23, 2024
529d1d6
Dinstinguish between Format being used and Codec for requests and res…
acerone85 Oct 23, 2024
a4465d5
Move RequestResponse protocol definitions to dedicated module
acerone85 Oct 23, 2024
7b858c5
Decouple GossibSub and RequestResponse codecs
acerone85 Oct 23, 2024
f2cd720
Merge branch 'master' into 2368-remove-p2pservice-networkcodec-trait
acerone85 Oct 28, 2024
1aa98dc
Simplify DataFormat trait
acerone85 Oct 28, 2024
79b507d
Use Encode and Decode trait in favour of DataFormat
acerone85 Oct 28, 2024
ead761a
Rename codecs to message handlers
acerone85 Oct 28, 2024
ed29376
Format
acerone85 Oct 28, 2024
138716b
Encode trait function takes &self as argument
acerone85 Oct 28, 2024
58ffe67
Minor improvements
acerone85 Oct 28, 2024
c564941
Decode trait function takes &self as argument
acerone85 Oct 28, 2024
c5aa362
DataFormat is now Codec
acerone85 Oct 28, 2024
89b022c
Typo
acerone85 Oct 28, 2024
45bbd91
CHANGELOG
acerone85 Oct 28, 2024
caf4f00
Add issue to TODO
acerone85 Oct 28, 2024
a55f695
Rename lifetime
acerone85 Oct 28, 2024
fc2d71d
Formatting
acerone85 Oct 29, 2024
136e6b4
Placate Clippy
acerone85 Oct 29, 2024
5bd9e21
Merge branch 'master' into 2368-remove-p2pservice-networkcodec-trait
acerone85 Nov 15, 2024
029fbff
Fix test compilation
acerone85 Nov 15, 2024
39e1ed9
Merge branch 'master' into 2368-remove-p2pservice-networkcodec-trait
acerone85 Nov 26, 2024
05c50fd
Reference todo issue
acerone85 Nov 26, 2024
e9f1404
Avoid dereference
acerone85 Nov 26, 2024
b94c451
Todo: max_response_size should be u64
acerone85 Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use fuel_core_chain_config::{
StateConfig,
};
use fuel_core_p2p::{
codecs::postcard::PostcardCodec,
codecs::{
bounded::BoundedCodec,
unbounded::UnboundedCodec,
},
network_service::FuelP2PService,
p2p_service::FuelP2PEvent,
request_response::messages::{
Expand Down Expand Up @@ -142,10 +145,17 @@ impl Bootstrap {
/// Spawn a bootstrap node.
pub async fn new(node_config: &Config) -> anyhow::Result<Self> {
let bootstrap_config = extract_p2p_config(node_config).await;
let codec = PostcardCodec::new(bootstrap_config.max_block_size);
let request_response_codec = BoundedCodec::new(bootstrap_config.max_block_size);
let gossipsub_codec = UnboundedCodec::new();
let (sender, _) =
broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1));
let mut bootstrap = FuelP2PService::new(sender, bootstrap_config, codec).await?;
let mut bootstrap = FuelP2PService::new(
sender,
bootstrap_config,
gossipsub_codec,
request_response_codec,
)
.await?;
bootstrap.start().await?;

let listeners = bootstrap.multiaddrs();
Expand Down
16 changes: 10 additions & 6 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
codecs::{
postcard::PostcardCodec,
NetworkCodec,
bounded::BoundedCodec,
postcard::PostcardDataFormat,
RequestResponseProtocols,
},
config::Config,
discovery,
Expand Down Expand Up @@ -59,11 +60,14 @@ pub struct FuelBehaviour {
discovery: discovery::Behaviour,

/// RequestResponse protocol
request_response: request_response::Behaviour<PostcardCodec>,
request_response: request_response::Behaviour<BoundedCodec<PostcardDataFormat>>,
}

impl FuelBehaviour {
pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result<Self> {
pub(crate) fn new(
p2p_config: &Config,
request_response_codec: BoundedCodec<PostcardDataFormat>,
) -> anyhow::Result<Self> {
let local_public_key = p2p_config.keypair.public();
let local_peer_id = PeerId::from_public_key(&local_public_key);

Expand Down Expand Up @@ -110,7 +114,7 @@ impl FuelBehaviour {
BlockHeight::default(),
);

let req_res_protocol = codec
let req_res_protocol = request_response_codec
.get_req_res_protocols()
.map(|protocol| (protocol, ProtocolSupport::Full));

Expand All @@ -119,7 +123,7 @@ impl FuelBehaviour {
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec.clone(),
request_response_codec.clone(),
req_res_protocol,
req_res_config,
);
Expand Down
41 changes: 17 additions & 24 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
pub mod bounded;
pub mod postcard;
pub mod unbounded;

use crate::{
gossipsub::messages::{
GossipTopicTag,
GossipsubBroadcastRequest,
GossipsubMessage,
},
request_response::messages::{
RequestMessage,
V2ResponseMessage,
},
};
use crate::gossipsub::messages::GossipTopicTag;
use libp2p::request_response;
use serde::{
Deserialize,
Serialize,
};
use std::io;

trait DataFormatCodec {
netrome marked this conversation as resolved.
Show resolved Hide resolved
type Error;
fn deserialize<'a, R: Deserialize<'a>>(
encoded_data: &'a [u8],
netrome marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<R, Self::Error>;

fn serialize<D: Serialize>(data: &D) -> Result<Vec<u8>, Self::Error>;
netrome marked this conversation as resolved.
Show resolved Hide resolved
}
netrome marked this conversation as resolved.
Show resolved Hide resolved

/// Implement this in order to handle serialization & deserialization of Gossipsub messages
pub trait GossipsubCodec {
type RequestMessage;
Expand All @@ -28,19 +33,7 @@ pub trait GossipsubCodec {
) -> Result<Self::ResponseMessage, io::Error>;
}

// TODO: https://github.com/FuelLabs/fuel-core/issues/2368
// Remove this trait
/// Main Codec trait
/// Needs to be implemented and provided to FuelBehaviour
pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + request_response::Codec<Request = RequestMessage, Response = V2ResponseMessage>
+ Clone
+ Send
+ 'static
{
pub trait RequestResponseProtocols: request_response::Codec {
/// Returns RequestResponse's Protocol
/// Needed for initialization of RequestResponse Behaviour
fn get_req_res_protocols(
Expand Down
156 changes: 156 additions & 0 deletions crates/services/p2p/src/codecs/bounded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::{
io,
marker::PhantomData,
};

use async_trait::async_trait;
use futures::{
AsyncRead,
AsyncReadExt,
AsyncWriteExt,
};
use libp2p::request_response;
use strum::IntoEnumIterator as _;

use crate::request_response::{
messages::{
RequestMessage,
V1ResponseMessage,
V2ResponseMessage,
},
protocols::RequestResponseProtocol,
};

use super::{
DataFormatCodec,
RequestResponseProtocols,
};

#[derive(Debug, Clone)]
pub struct BoundedCodec<Format> {
_format: PhantomData<Format>,
netrome marked this conversation as resolved.
Show resolved Hide resolved
/// Used for `max_size` parameter when reading Response Message
/// Necessary in order to avoid DoS attacks
/// Currently the size mostly depends on the max size of the Block
max_response_size: usize,
}

impl<Format> BoundedCodec<Format> {
pub fn new(max_block_size: usize) -> Self {
assert_ne!(
max_block_size, 0,
"PostcardCodec does not support zero block size"
);

Self {
_format: PhantomData,
max_response_size: max_block_size,
}
}
}

/// Since Postcard does not support async reads or writes out of the box
/// We prefix Request & Response Messages with the length of the data in bytes
/// We expect the substream to be properly closed when response channel is dropped.
/// Since the request protocol used here expects a response, the sender considers this
/// early close as a protocol violation which results in the connection being closed.
/// If the substream was not properly closed when dropped, the sender would instead
/// run into a timeout waiting for the response.
#[async_trait]
impl<Format> request_response::Codec for BoundedCodec<Format>
netrome marked this conversation as resolved.
Show resolved Hide resolved
where
Format: DataFormatCodec<Error = io::Error> + Send,
{
type Protocol = RequestResponseProtocol;
type Request = RequestMessage;
type Response = V2ResponseMessage;

async fn read_request<T>(
&mut self,
_protocol: &Self::Protocol,
socket: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let mut response = Vec::new();
socket
.take(self.max_response_size as u64)
.read_to_end(&mut response)
.await?;
Format::deserialize(&response)
}

async fn read_response<T>(
&mut self,
protocol: &Self::Protocol,
socket: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let mut response = Vec::new();
socket
.take(self.max_response_size as u64)
.read_to_end(&mut response)
.await?;

match protocol {
RequestResponseProtocol::V1 => {
let v1_response = Format::deserialize::<V1ResponseMessage>(&response)?;
Ok(v1_response.into())
}
RequestResponseProtocol::V2 => {
Format::deserialize::<V2ResponseMessage>(&response)
}
}
}

async fn write_request<T>(
&mut self,
_protocol: &Self::Protocol,
socket: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: futures::AsyncWrite + Unpin + Send,
{
let encoded_data = Format::serialize(&req)?;
socket.write_all(&encoded_data).await?;
Ok(())
}

async fn write_response<T>(
&mut self,
protocol: &Self::Protocol,
socket: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: futures::AsyncWrite + Unpin + Send,
{
let encoded_data = match protocol {
RequestResponseProtocol::V1 => {
let v1_response: V1ResponseMessage = res.into();
Format::serialize(&v1_response)?
}
RequestResponseProtocol::V2 => Format::serialize(&res)?,
};
socket.write_all(&encoded_data).await?;
Ok(())
}
}

impl<Codec> RequestResponseProtocols for Codec
where
Codec: request_response::Codec<Protocol = RequestResponseProtocol>,
{
fn get_req_res_protocols(
&self,
) -> impl Iterator<Item = <Self as request_response::Codec>::Protocol> {
// TODO: Iterating over versions in reverse order should prefer
// peers to use V2 over V1 for exchanging messages. However, this is
// not guaranteed by the specs for the `request_response` protocol.
RequestResponseProtocol::iter().rev()
}
}
Loading
Loading