Skip to content

Commit

Permalink
upgrade status in handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing committed May 13, 2024
1 parent 55fd93f commit c7762fb
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 9 deletions.
35 changes: 32 additions & 3 deletions crates/net/eth-wire-types/src/upgrade_status.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,40 @@
use alloy_rlp::{RlpDecodable, RlpEncodable};
use serde::{Deserialize, Serialize};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Bytes, B256};
use reth_primitives::{Bytes, B256, hex};

#[derive_arbitrary(rlp)]
#[derive(Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UpgradeStatus {
pub extension : Vec<Bytes>
}
pub extension : Vec<u8>
}

#[derive_arbitrary(rlp)]
#[derive(Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UpgradeStatusExtension {
pub disable_peer_tx_broadcast: bool
}

#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
use alloy_rlp::Encodable;
use reth_primitives::hex;
use crate::{EthMessage, ProtocolMessage};

#[test]
fn test_encode_upgrade_status() {
let extension = UpgradeStatusExtension{disable_peer_tx_broadcast: false};
let mut buffer = Vec::<u8>::new();
let _ = extension.encode(&mut buffer);
let result = alloy_rlp::encode(ProtocolMessage::from(EthMessage::UpgradeStatus(UpgradeStatus{
extension: buffer,
})));

println!("{}", hex::encode(result))
}
}

34 changes: 34 additions & 0 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthVersion, ProtocolMessage, Status,
UpgradeStatus, UpgradeStatusExtension
};
use futures::{ready, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
Expand All @@ -15,6 +16,7 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use alloy_rlp::Encodable;
use tokio::time::timeout;
use tokio_stream::Stream;
use tracing::{debug, trace};
Expand Down Expand Up @@ -166,6 +168,38 @@ where
return Err(err.into())
}

// For BSC, UpgradeStatus message should be sent during handshake.
//#[cfg(feature = "bsc")]
if version > EthVersion::Eth66 {
// TODO: support disable_peer_tx_broadcast flag
let extension = UpgradeStatusExtension{disable_peer_tx_broadcast: false};
let mut buffer = Vec::<u8>::new();
let _ = extension.encode(&mut buffer);
self.inner
.send(alloy_rlp::encode(ProtocolMessage::from(EthMessage::UpgradeStatus(UpgradeStatus{
extension: buffer,
}))).into())
.await?;
let their_msg_res = self.inner.next().await;
let their_msg = match their_msg_res {
Some(msg) => msg,
None => {
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse))
}
}?;
let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::InvalidMessage(err))
}
};
// TODO: support disable_peer_tx_broadcast flag
debug!("msg {:?}", msg.message)
}

// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(version, self.inner);
Expand Down
7 changes: 4 additions & 3 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ where
// reject message in POS
if self.handle.mode().is_stake() {
// connections to peers which send invalid messages should be terminated
self.swarm
.sessions_mut()
.disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
// self.swarm
// .sessions_mut()
// .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
only_pow(self);
} else {
only_pow(self);
}
Expand Down
8 changes: 5 additions & 3 deletions crates/net/network/src/session/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ impl ActiveSession {
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts)
},
EthMessage::UpgradeStatus(resp) => {
//TODO: fix me
OnIncomingMessageOutcome::Ok
message @ EthMessage::UpgradeStatus(_) => {
OnIncomingMessageOutcome::BadMessage {
error: EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake),
message,
}
},
}
}
Expand Down

0 comments on commit c7762fb

Please sign in to comment.