Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPC-520: Upgrade the IPLD Resolver for libp2p 0.53 #522

Merged
merged 23 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,335 changes: 786 additions & 1,549 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 16 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,24 @@ clap = { version = "4.1", features = ["derive", "env"] }
config = "0.13"
dirs = "5.0"
dircpy = "0.3"
env_logger = "0.10"
erased-serde = "0.3"
ethers = { version = "2.0", features = ["abigen", "ws"] }
ethers-core = { version = "2.0" }
ethers-contract = "2.0.8"
fnv = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
gcra = "0.4"
hex = "0.4"
im = "15.1.0"
integer-encoding = { version = "3.0.3", default-features = false }
jsonrpc-v2 = { version = "0.11", default-features = false, features = ["bytes-v10"] }
k256 = "0.11" # Same as tendermint-rs
lazy_static = "1.4"
libipld = { version = "0.14", default-features = false, features = ["dag-cbor"] }
libp2p = { version = "0.50", default-features = false, features = [
libp2p = { version = "0.53", default-features = false, features = [
"gossipsub",
"kad",
"identify",
Expand All @@ -75,7 +78,6 @@ libp2p = { version = "0.50", default-features = false, features = [
"yamux",
"tcp",
"dns",
"mplex",
"request-response",
"metrics",
"tokio",
Expand All @@ -84,22 +86,29 @@ libp2p = { version = "0.50", default-features = false, features = [
"secp256k1",
"plaintext",
] }
libp2p-bitswap = "0.25.1"
libp2p-mplex = { version = "0.41" }
# libp2p-bitswap = "0.25.1"
libp2p-bitswap = { git = "https://github.com/consensus-shipyard/libp2p-bitswap.git", branch = "chore-upgrade-libp2p" } # Updated to libp2p v0.53
libsecp256k1 = "0.7"
literally = "0.1.3"
log = "0.4"
lru_time_cache = "0.11"
merkle-tree-rs = "0.1.0"
multiaddr = "0.16"
multiaddr = "0.18"
multihash = { version = "0.16.1", default-features = false }
num-bigint = "0.4"
num-derive = "0.3"
num-traits = "0.2"
paste = "1"
pin-project = "1.1.2"
prometheus = "0.13"
prost = { version = "0.11" }
quickcheck = "1"
quickcheck_macros = "1"
rand = "0.8"
rand_chacha = "0.3"
regex = "1"
reqwest = { version = "0.11.13", features = ["json"] }
sha2 = "0.10"
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
Expand All @@ -118,21 +127,13 @@ tokio = { version = "1", features = [
] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["compat"] }
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = "0.3"
trace4rs = "0.5.1"
url = { version = "2.4.1", features = ["serde"] }
zeroize = "1.6"
trace4rs = "0.5.1"
literally = "0.1.3"
reqwest = { version = "0.11.13", features = ["json"] }
log = "0.4"
env_logger = "0.10"
prometheus = "0.13"
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
libsecp256k1 = "0.7"
ethers-contract = "2.0.8"
integer-encoding = {version = "3.0.3", default-features = false}

# Workspace deps
ipc-api = { path = "ipc/api" }
Expand All @@ -149,7 +150,7 @@ openssl = { version = "0.10", features = ["vendored"] }
# Using the 3.3 version of the FVM because the newer ones update the IPLD dependencies
# to version which are different than the ones in the builtin-actors project, and since
# they are 0.x cargo cannot upgrade them automatically, which leads to version conflicts.
fvm = { version = "~3.2", default-features = false } # no opencl feature or it fails on CI
fvm = { version = "~3.2", default-features = false } # no opencl feature or it fails on CI
fvm_shared = { version = "~3.2", features = ["crypto"] }
fvm_sdk = { version = "~3.2" }

Expand Down
6 changes: 3 additions & 3 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ fn to_resolver_config(settings: &Settings) -> anyhow::Result<ipc_ipld_resolver::

let r = &settings.resolver;

let local_key = {
let local_key: Keypair = {
let path = r.network.local_key(settings.home_dir());
let sk = read_secret_key(&path)?;
let sk = secp256k1::SecretKey::from_bytes(sk.serialize())?;
Keypair::Secp256k1(secp256k1::Keypair::from(sk))
let sk = secp256k1::SecretKey::try_from_bytes(sk.serialize())?;
secp256k1::Keypair::from(sk).into()
};

let network_name = format!(
Expand Down
2 changes: 2 additions & 0 deletions ipld/resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ lazy_static = { workspace = true }
libipld = { workspace = true }
libp2p = { workspace = true }
libp2p-bitswap = { workspace = true }
libp2p-mplex = { workspace = true }
lru_time_cache = { workspace = true }
log = { workspace = true }
prometheus = { workspace = true }
quickcheck = { workspace = true, optional = true }
Expand Down
168 changes: 112 additions & 56 deletions ipld/resolver/src/behaviour/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ use std::{

use libipld::{store::StoreParams, Cid};
use libp2p::{
core::ConnectedPoint,
core::{ConnectedPoint, Endpoint},
futures::channel::oneshot,
multiaddr::Protocol,
request_response::handler::RequestResponseHandlerEvent,
swarm::{
derive_prelude::{ConnectionId, FromSwarm},
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
derive_prelude::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, BitswapStore};
use log::warn;
use log::debug;
use prometheus::Registry;

use crate::{
Expand Down Expand Up @@ -54,6 +52,7 @@ pub enum Event {
/// This is only raised if we are tracking rate limits. The service has to
/// do the forwarding between the two oneshot channels, and call this module
/// back between doing so.
#[allow(dead_code)]
BitswapForward {
peer_id: PeerId,
/// Receive response from the [`Bitswap`] behaviour.
Expand Down Expand Up @@ -140,12 +139,14 @@ impl<P: StoreParams> Behaviour<P> {
/// The underlying [`libp2p_request_response::RequestResponse`] behaviour
/// will initiate connections to the peers which aren't connected at the moment.
pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId {
debug!("resolving {cid} from {peers:?}");
stats::CONTENT_RESOLVE_RUNNING.inc();
// Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`.
self.inner.sync(cid, peers, [].into_iter())
}

/// Check whether the peer has already exhaused their rate limit.
#[allow(dead_code)]
fn check_rate_limit(&mut self, peer_id: &PeerId, cid: &Cid) -> bool {
if let Some(ref rate_limit) = self.rate_limit {
if let Some(addr) = self.peer_addresses.get(peer_id).cloned() {
Expand Down Expand Up @@ -181,17 +182,9 @@ impl<P: StoreParams> Behaviour<P> {

impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
type ConnectionHandler = <Bitswap<P> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;
type ToSwarm = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
self.inner.new_handler()
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.inner.addresses_of_peer(peer_id)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
// Store the remote address.
match &event {
FromSwarm::ConnectionEstablished(c) => {
Expand Down Expand Up @@ -226,61 +219,124 @@ impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
event: THandlerOutEvent<Self>,
) {
match event {
RequestResponseHandlerEvent::Request {
request_id,
request,
sender,
} if self.rate_limit.is_some() => {
if !self.check_rate_limit(&peer_id, &request.cid) {
warn!("rate limiting {peer_id}");
stats::CONTENT_RATE_LIMITED.inc();
return;
}
// We need to hijack the response channel to record the size, otherwise it goes straight to the handler.
let (tx, rx) = libp2p::futures::channel::oneshot::channel();
let event = RequestResponseHandlerEvent::Request {
request_id,
request,
sender: tx,
};

self.inner
.on_connection_handler_event(peer_id, connection_id, event);

let forward = Event::BitswapForward {
peer_id,
response_rx: rx,
response_tx: sender,
};
self.outbox.push_back(forward);
}
_ => self
.inner
.on_connection_handler_event(peer_id, connection_id, event),
}
// TODO: `request_response::handler` is now private, so we cannot pattern match on the handler event.
// By the looks of the only way to access the request event is to let it go right into the RR protocol
// wrapped by the Bitswap behaviour and let it raise an event, however we will not see that event here.
// I'm not sure what we can do without moving rate limiting into the bitswap library itself, because
// what we did here relied on the ability to redirect the channels inside the request, but if the event
// itself is private to the `request_response` protocol there's nothing I can do.
// match event {

// request_response::handler::Event::Request {
// request_id,
// request,
// sender,
// } if self.rate_limit.is_some() => {
// if !self.check_rate_limit(&peer_id, &request.cid) {
// warn!("rate limiting {peer_id}");
// stats::CONTENT_RATE_LIMITED.inc();
// return;
// }
// // We need to hijack the response channel to record the size, otherwise it goes straight to the handler.
// let (tx, rx) = libp2p::futures::channel::oneshot::channel();
// let event = request_response::Event::Request {
// request_id,
// request,
// sender: tx,
// };

// self.inner
// .on_connection_handler_event(peer_id, connection_id, event);

// let forward = Event::BitswapForward {
// peer_id,
// response_rx: rx,
// response_tx: sender,
// };
// self.outbox.push_back(forward);
// }
// _ => self
// .inner
// .on_connection_handler_event(peer_id, connection_id, event),
// }

// debug!("BITSWAP CONNECTION HANDLER EVENT: {event:?}");

self.inner
.on_connection_handler_event(peer_id, connection_id, event)
}

fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
self.inner
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
self.inner.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Emit own events first.
if let Some(ev) = self.outbox.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
return Poll::Ready(ToSwarm::GenerateEvent(ev));
}
// Poll Bitswap.
while let Poll::Ready(ev) = self.inner.poll(cx, params) {
while let Poll::Ready(ev) = self.inner.poll(cx) {
// debug!("BITSWAP POLL: {ev:?}");
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
ToSwarm::GenerateEvent(ev) => match ev {
BitswapEvent::Progress(_, _) => {}
BitswapEvent::Complete(id, result) => {
stats::CONTENT_RESOLVE_RUNNING.dec();
let out = Event::Complete(id, result);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
return Poll::Ready(ToSwarm::GenerateEvent(out));
}
},
other => {
Expand Down
Loading
Loading