diff --git a/crates/net/eth-wire-types/src/upgrade_status.rs b/crates/net/eth-wire-types/src/upgrade_status.rs index 2146ce3fd5..dbbef59fa0 100644 --- a/crates/net/eth-wire-types/src/upgrade_status.rs +++ b/crates/net/eth-wire-types/src/upgrade_status.rs @@ -1,11 +1,40 @@ use alloy_rlp::{RlpDecodable, RlpEncodable}; use serde::{Deserialize, Serialize}; use reth_codecs::derive_arbitrary; -use reth_primitives::{Bytes, B256}; +use reth_primitives::{Bytes, B256, hex}; #[derive_arbitrary(rlp)] #[derive(Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct UpgradeStatus { - pub extension : Vec -} \ No newline at end of file + pub extension : Vec +} + +#[derive_arbitrary(rlp)] +#[derive(Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct UpgradeStatusExtension { + pub disable_peer_tx_broadcast: bool +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::Path; + use alloy_rlp::Encodable; + use reth_primitives::hex; + use crate::{EthMessage, ProtocolMessage}; + + #[test] + fn test_encode_upgrade_status() { + let extension = UpgradeStatusExtension{disable_peer_tx_broadcast: false}; + let mut buffer = Vec::::new(); + let _ = extension.encode(&mut buffer); + let result = alloy_rlp::encode(ProtocolMessage::from(EthMessage::UpgradeStatus(UpgradeStatus{ + extension: buffer, + }))); + + println!("{}", hex::encode(result)) + } +} + diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 8de5090347..2ef320c393 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -3,6 +3,7 @@ use crate::{ message::{EthBroadcastMessage, ProtocolBroadcastMessage}, p2pstream::HANDSHAKE_TIMEOUT, CanDisconnect, DisconnectReason, EthMessage, EthVersion, ProtocolMessage, Status, + UpgradeStatus, UpgradeStatusExtension }; use futures::{ready, Sink, SinkExt, StreamExt}; use pin_project::pin_project; @@ -15,6 +16,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use alloy_rlp::Encodable; use tokio::time::timeout; use tokio_stream::Stream; use tracing::{debug, trace}; @@ -166,6 +168,38 @@ where return Err(err.into()) } + // For BSC, UpgradeStatus message should be sent during handshake. + //#[cfg(feature = "bsc")] + if version > EthVersion::Eth66 { + // TODO: support disable_peer_tx_broadcast flag + let extension = UpgradeStatusExtension{disable_peer_tx_broadcast: false}; + let mut buffer = Vec::::new(); + let _ = extension.encode(&mut buffer); + self.inner + .send(alloy_rlp::encode(ProtocolMessage::from(EthMessage::UpgradeStatus(UpgradeStatus{ + extension: buffer, + }))).into()) + .await?; + let their_msg_res = self.inner.next().await; + let their_msg = match their_msg_res { + Some(msg) => msg, + None => { + self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; + return Err(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse)) + } + }?; + let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) { + Ok(m) => m, + Err(err) => { + debug!("decode error in eth handshake: msg={their_msg:x}"); + self.inner.disconnect(DisconnectReason::DisconnectRequested).await?; + return Err(EthStreamError::InvalidMessage(err)) + } + }; + // TODO: support disable_peer_tx_broadcast flag + debug!("msg {:?}", msg.message) + } + // now we can create the `EthStream` because the peer has successfully completed // the handshake let stream = EthStream::new(version, self.inner); diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index d516625c64..a3bfa99016 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -477,9 +477,10 @@ where // reject message in POS if self.handle.mode().is_stake() { // connections to peers which send invalid messages should be terminated - self.swarm - .sessions_mut() - .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific)); + // self.swarm + // .sessions_mut() + // .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific)); + only_pow(self); } else { only_pow(self); } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index ce93b1547f..6bb181d0b7 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -231,9 +231,11 @@ impl ActiveSession { EthMessage::Receipts(resp) => { on_response!(resp, GetReceipts) }, - EthMessage::UpgradeStatus(resp) => { - //TODO: fix me - OnIncomingMessageOutcome::Ok + message @ EthMessage::UpgradeStatus(_) => { + OnIncomingMessageOutcome::BadMessage { + error: EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake), + message, + } }, } }