Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(l1): getBlockHeaders eth capability test #989

Merged
merged 37 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a526c10
Initial peer listen loop
ElFantasma Oct 23, 2024
574299b
Merge branch 'main' into 840-rlpx-listen-loop
ElFantasma Oct 24, 2024
f23077e
Sending eth Status message first
ElFantasma Oct 24, 2024
122778d
Small fixes
ElFantasma Oct 25, 2024
7a4f9cf
Merge branch 'main' into 840-rlpx-listen-loop
ElFantasma Oct 25, 2024
5ae41fe
Small fixes
ElFantasma Oct 25, 2024
66b0bd8
Added TODO comments for pending tasks
ElFantasma Oct 25, 2024
6a5f580
Refactored code to separate encoding/decoding from backend logic
ElFantasma Oct 25, 2024
a6ff7cb
draft: receive encoded message
fkrause98 Oct 25, 2024
0d64f5f
draft: properly decode get headers msg
fkrause98 Oct 28, 2024
4b423a2
draft: send badly encoded msg
fkrause98 Oct 29, 2024
fd7db20
draft: add code in msg
fkrause98 Oct 30, 2024
3930a3f
draft: do not hardcode loop
fkrause98 Oct 30, 2024
779093d
draft: remove print
fkrause98 Oct 31, 2024
9967853
draft: hardcode status message
fkrause98 Oct 31, 2024
0d228c1
Merge branch 'main' into p2p-test-get-block-headers
fkrause98 Oct 31, 2024
5efade9
feat: clean-up for PR
fkrause98 Oct 31, 2024
f55c38f
feat: expect on errors
fkrause98 Oct 31, 2024
7960aba
feat: safe slicing
fkrause98 Oct 31, 2024
079d419
feat: warn on block import error
fkrause98 Oct 31, 2024
c2fa0d3
Add go ethereum ref
fkrause98 Oct 31, 2024
b4c450b
Merge branch 'main' into p2p-test-get-block-headers
fkrause98 Oct 31, 2024
b52fde2
ci: add eth tests to CI
fkrause98 Oct 31, 2024
c04d319
tests: fix {de,en}coding tests
fkrause98 Oct 31, 2024
ec4be28
chore: remove prints
fkrause98 Oct 31, 2024
8690db1
chore: fix typo
fkrause98 Oct 31, 2024
4031595
Merge branch 'main' into p2p-test-get-block-headers
fkrause98 Oct 31, 2024
40c7d61
tests: re-add hive test to CI
fkrause98 Nov 1, 2024
ed5560c
ci: fix .yml
fkrause98 Nov 1, 2024
81df35a
feat: define constant for limit
fkrause98 Nov 1, 2024
d457d5f
pr-comments: remove unnecessary checks
fkrause98 Nov 5, 2024
a8c108c
pr-comments: add comment for RLP-decode of tuple
fkrause98 Nov 5, 2024
f3933b8
lint: remove unused dep
fkrause98 Nov 5, 2024
71bcded
pr-comments: error handling, link issues
fkrause98 Nov 5, 2024
afa055e
pr-comments: break on non-found block
fkrause98 Nov 5, 2024
47fa021
Merge branch 'main' into p2p-test-get-block-headers
fkrause98 Nov 5, 2024
72716de
lint: cargo fmt
fkrause98 Nov 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/hive_and_assertoor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ jobs:
- simulation: snap
name: "Devp2p snap tests"
run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="/AccountRange"
- simulation: eth
name: "Devp2p eth tests"
run_command: make run-hive SIMULATION=devp2p TEST_PATTERN="eth/getblockheaders"
- simulation: engine
name: "Engine tests"
run_command: make run-hive-on-latest SIMULATION=ethereum/engine TEST_PATTERN="/Blob Transactions On Block 1, Cancun Genesis|Blob Transactions On Block 1, Shanghai Genesis|Blob Transaction Ordering, Single Account, Single Blob|Blob Transaction Ordering, Single Account, Dual Blob|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdatedV3 Modifies Payload ID on Different Beacon Root|NewPayloadV3 After Cancun|NewPayloadV3 Versioned Hashes|ForkchoiceUpdated Version on Payload Request"
Expand Down
16 changes: 15 additions & 1 deletion cmd/ethereum_rust/ethereum_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
net::{Ipv4Addr, SocketAddr, ToSocketAddrs},
};
use tokio_util::task::TaskTracker;
use tracing::{info, warn};
use tracing::{error, info, warn};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
mod cli;
Expand Down Expand Up @@ -137,6 +137,20 @@ async fn main() {
block.header.number, hash, error
);
}
if store
.update_latest_block_number(block.header.number)
.is_err()
{
error!("Fatal: added block {} but could not update the block number -- aborting block import", block.header.number);
break;
};
if store
.set_canonical_block(block.header.number, hash)
.is_err()
{
error!("Fatal: added block {} but could not set it as canonical -- aborting block import", block.header.number);
break;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already fixed by apply_fork_choice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How so? 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apply_fork_choice in line 157 sets the last block imported as the head, which makes all its ancestor blocks canonical. I'd check removing this as Fede suggests and checking everything still works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed!

}
if let Some(last_block) = blocks.last() {
let hash = last_block.hash();
Expand Down
50 changes: 42 additions & 8 deletions crates/common/rlp/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,25 @@ impl RLPDecode for bool {

impl RLPDecode for u8 {
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if rlp.is_empty() {
return Err(RLPDecodeError::InvalidLength);
}

match rlp[0] {
let first_byte = rlp.first().ok_or(RLPDecodeError::InvalidLength)?;
match first_byte {
// Single byte in the range [0x00, 0x7f]
0..=0x7f => Ok((rlp[0], &rlp[1..])),
0..=0x7f => {
let rest = rlp.get(1..).ok_or(RLPDecodeError::MalformedData)?;
Ok((*first_byte, rest))
}

// RLP_NULL represents zero
RLP_NULL => Ok((0, &rlp[1..])),
&RLP_NULL => {
let rest = rlp.get(1..).ok_or(RLPDecodeError::MalformedData)?;
Ok((0, rest))
}

// Two bytes, where the first byte is RLP_NULL + 1
x if rlp.len() >= 2 && x == RLP_NULL + 1 => Ok((rlp[1], &rlp[2..])),
x if rlp.len() >= 2 && *x == RLP_NULL + 1 => {
let rest = rlp.get(2..).ok_or(RLPDecodeError::MalformedData)?;
Ok((rlp[1], rest))
}

// Any other case is invalid for u8
_ => Err(RLPDecodeError::MalformedData),
Expand Down Expand Up @@ -330,6 +336,34 @@ impl<T1: RLPDecode, T2: RLPDecode, T3: RLPDecode> RLPDecode for (T1, T2, T3) {
}
}

impl<
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a small comment stating that this decodes a 4-element list as a tuple would be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

T1: RLPDecode + std::fmt::Debug,
T2: RLPDecode + std::fmt::Debug,
T3: RLPDecode + std::fmt::Debug,
T4: RLPDecode + std::fmt::Debug,
> RLPDecode for (T1, T2, T3, T4)
{
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if rlp.is_empty() {
return Err(RLPDecodeError::InvalidLength);
}
let (is_list, payload, input_rest) = decode_rlp_item(rlp)?;
if !is_list {
return Err(RLPDecodeError::MalformedData);
}
let (first, first_rest) = T1::decode_unfinished(payload)?;
let (second, second_rest) = T2::decode_unfinished(first_rest)?;
let (third, third_rest) = T3::decode_unfinished(second_rest)?;
let (fourth, fourth_rest) = T4::decode_unfinished(third_rest)?;
// check that there is no more data to decode after the fourth element.
if !fourth_rest.is_empty() {
return Err(RLPDecodeError::MalformedData);
}

Ok(((first, second, third, fourth), input_rest))
}
}

/// Decodes an RLP item from a slice of bytes.
/// It returns a 3-element tuple with the following elements:
/// - A boolean indicating if the item is a list or not.
Expand Down
15 changes: 14 additions & 1 deletion crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use crate::{
rlpx::{eth::backend, handshake::encode_ack_message, message::Message, p2p, utils::id2pubkey},
rlpx::{
eth::{backend, blocks::BlockHeaders},
handshake::encode_ack_message,
message::Message,
p2p,
utils::id2pubkey,
},
snap::process_account_range_request,
MAX_DISC_PACKET_SIZE,
};
Expand Down Expand Up @@ -150,6 +156,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
process_account_range_request(req, self.storage.clone())?;
self.send(Message::AccountRange(response)).await
}
Message::GetBlockHeaders(msg_data) => {
let response = BlockHeaders {
id: msg_data.id,
block_headers: msg_data.fetch_headers(&self.storage),
};
self.send(Message::BlockHeaders(response)).await;
}
// TODO: Add new message types and handlers as they are implemented
message => return Err(RLPxError::UnexpectedMessage(message)),
};
Expand Down
81 changes: 61 additions & 20 deletions crates/networking/p2p/rlpx/eth/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use ethereum_rust_rlp::{
error::{RLPDecodeError, RLPEncodeError},
structs::{Decoder, Encoder},
};
use ethereum_rust_storage::Store;
use snap::raw::Decoder as SnappyDecoder;

use crate::rlpx::{message::RLPxMessage, utils::snappy_encode};

pub const HASH_FIRST_BYTE_DECODER: u8 = 160;

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum HashOrNumber {
Hash(BlockHash),
Number(BlockNumber),
Expand Down Expand Up @@ -55,13 +56,16 @@ impl RLPDecode for HashOrNumber {
pub(crate) struct GetBlockHeaders {
// id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
id: u64,
startblock: HashOrNumber,
limit: u64,
skip: u64,
reverse: bool,
pub id: u64,
pub startblock: HashOrNumber,
pub limit: u64,
pub skip: u64,
pub reverse: bool,
}

// Limit taken from here: https://github.com/ethereum/go-ethereum/blob/20bf543a64d7c2a590b18a1e1d907cae65707013/eth/protocols/eth/handler.go#L40
pub const BLOCK_HEADER_LIMIT: u64 = 1024;

impl GetBlockHeaders {
pub fn new(id: u64, startblock: HashOrNumber, limit: u64, skip: u64, reverse: bool) -> Self {
Self {
Expand All @@ -72,19 +76,56 @@ impl GetBlockHeaders {
reverse,
}
}
pub fn fetch_headers(&self, storage: &Store) -> Vec<BlockHeader> {
let start_block = match self.startblock {
// Check we have the given block hash and fetch its number
HashOrNumber::Hash(block_hash) => storage.get_block_number(block_hash).ok().flatten(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other PR here, we should return an empty response + log if error.

// Don't check if the block number is available
// because if it it's not, the loop below will
// break early and return an empty vec.
HashOrNumber::Number(block_num) => Some(block_num),
};

let mut headers = vec![];

if let Some(start_block) = start_block {
let mut current_block = start_block as i64;
let block_skip = if self.reverse {
-((self.skip + 1) as i64)
} else {
(self.skip + 1) as i64
};
let limit = if self.limit > BLOCK_HEADER_LIMIT {
BLOCK_HEADER_LIMIT
} else {
self.limit
};
for _ in 0..limit {
let Some(block_header) = storage
.get_block_header(current_block as u64)
.ok()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

.flatten()
else {
break;
};
headers.push(block_header);
current_block += block_skip
}
}
headers
}
}

impl RLPxMessage for GetBlockHeaders {
fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
let mut encoded_data = vec![];
let limit = self.limit;
let skip = self.skip;
let reverse = self.reverse as u8;
Encoder::new(&mut encoded_data)
.encode_field(&self.id)
.encode_field(&self.startblock)
.encode_field(&self.limit)
.encode_field(&self.skip)
.encode_field(&self.reverse)
.encode_field(&(self.startblock.clone(), limit, skip, reverse))
.finish();

let msg_data = snappy_encode(encoded_data)?;
buf.put_slice(&msg_data);
Ok(())
Expand All @@ -97,21 +138,19 @@ impl RLPxMessage for GetBlockHeaders {
.map_err(|e| RLPDecodeError::Custom(e.to_string()))?;
let decoder = Decoder::new(&decompressed_data)?;
let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
let (startblock, decoder): (HashOrNumber, _) = decoder.decode_field("startblock")?;
let (limit, decoder): (u64, _) = decoder.decode_field("limit")?;
let (skip, decoder): (u64, _) = decoder.decode_field("skip")?;
let (reverse, _): (bool, _) = decoder.decode_field("reverse")?;

Ok(Self::new(id, startblock, limit, skip, reverse))
let ((start_block, limit, skip, reverse), _): ((HashOrNumber, u64, u64, bool), _) =
decoder.decode_field("get headers request params")?;
Ok(Self::new(id, start_block, limit, skip, reverse))
}
}

// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockheaders-0x04
#[derive(Debug)]
pub(crate) struct BlockHeaders {
// id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
id: u64,
block_headers: Vec<BlockHeader>,
pub id: u64,
pub block_headers: Vec<BlockHeader>,
}

impl BlockHeaders {
Expand All @@ -123,11 +162,13 @@ impl BlockHeaders {
impl RLPxMessage for BlockHeaders {
fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
let mut encoded_data = vec![];
// Each message is encoded with its own
// message identifier (code).
// Go ethereum reference: https://github.com/ethereum/go-ethereum/blob/20bf543a64d7c2a590b18a1e1d907cae65707013/p2p/transport.go#L94
Encoder::new(&mut encoded_data)
.encode_field(&self.id)
.encode_field(&self.block_headers)
.finish();

let msg_data = snappy_encode(encoded_data)?;
buf.put_slice(&msg_data);
Ok(())
Expand Down
23 changes: 23 additions & 0 deletions crates/networking/p2p/rlpx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bytes::BufMut;
use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError};
use std::fmt::Display;

use super::eth::blocks::{BlockHeaders, GetBlockHeaders};
use super::eth::status::StatusMessage;
use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage};
use super::snap::{AccountRange, GetAccountRange};
Expand All @@ -20,6 +21,9 @@ pub(crate) enum Message {
Ping(PingMessage),
Pong(PongMessage),
Status(StatusMessage),
// https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/caps/eth.md#getblockheaders-0x03
GetBlockHeaders(GetBlockHeaders),
BlockHeaders(BlockHeaders),
// snap capability
GetAccountRange(GetAccountRange),
AccountRange(AccountRange),
Expand All @@ -32,7 +36,19 @@ impl Message {
0x01 => Ok(Message::Disconnect(DisconnectMessage::decode(msg_data)?)),
0x02 => Ok(Message::Ping(PingMessage::decode(msg_data)?)),
0x03 => Ok(Message::Pong(PongMessage::decode(msg_data)?)),
// Subprotocols like 'eth' use offsets to identify
// themselves, the eth capability starts
// at 0x10 (16), the status message
// has offset 0, so a message with id 0x10
// identifies an eth status message.
// Another example is the eth getBlockHeaders message,
// which has 3 as its offset, so it is identified as 0x13 (19).
// References:
// - https://ethereum.stackexchange.com/questions/37051/ethereum-network-messaging
// - https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00
0x10 => Ok(Message::Status(StatusMessage::decode(msg_data)?)),
0x13 => Ok(Message::GetBlockHeaders(GetBlockHeaders::decode(msg_data)?)),
0x14 => Ok(Message::BlockHeaders(BlockHeaders::decode(msg_data)?)),
0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)),
0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)),
_ => Err(RLPDecodeError::MalformedData),
Expand All @@ -46,6 +62,11 @@ impl Message {
Message::Ping(msg) => msg.encode(buf),
Message::Pong(msg) => msg.encode(buf),
Message::Status(msg) => msg.encode(buf),
Message::GetBlockHeaders(msg) => msg.encode(buf),
Message::BlockHeaders(msg) => {
0x14_u8.encode(buf);
msg.encode(buf)
}
Message::GetAccountRange(msg) => {
0x21_u8.encode(buf);
msg.encode(buf)
Expand All @@ -66,6 +87,8 @@ impl Display for Message {
Message::Ping(_) => "p2p:Ping".fmt(f),
Message::Pong(_) => "p2p:Pong".fmt(f),
Message::Status(_) => "eth:Status".fmt(f),
Message::GetBlockHeaders(_) => "eth:getBlockHeaders".fmt(f),
Message::BlockHeaders(_) => "eth:BlockHeaders".fmt(f),
Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f),
Message::AccountRange(_) => "snap:AccountRange".fmt(f),
}
Expand Down
Loading