Skip to content

Commit

Permalink
IPC-520: Upgrade the IPLD Resolver for libp2p 0.53 (#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh authored Jan 11, 2024
1 parent 3e7984d commit 2440ac2
Show file tree
Hide file tree
Showing 13 changed files with 1,245 additions and 1,835 deletions.
2,335 changes: 786 additions & 1,549 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 16 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,24 @@ clap = { version = "4.1", features = ["derive", "env"] }
config = "0.13"
dirs = "5.0"
dircpy = "0.3"
env_logger = "0.10"
erased-serde = "0.3"
ethers = { version = "2.0", features = ["abigen", "ws"] }
ethers-core = { version = "2.0" }
ethers-contract = "2.0.8"
fnv = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
gcra = "0.4"
hex = "0.4"
im = "15.1.0"
integer-encoding = { version = "3.0.3", default-features = false }
jsonrpc-v2 = { version = "0.11", default-features = false, features = ["bytes-v10"] }
k256 = "0.11" # Same as tendermint-rs
lazy_static = "1.4"
libipld = { version = "0.14", default-features = false, features = ["dag-cbor"] }
libp2p = { version = "0.50", default-features = false, features = [
libp2p = { version = "0.53", default-features = false, features = [
"gossipsub",
"kad",
"identify",
Expand All @@ -75,7 +78,6 @@ libp2p = { version = "0.50", default-features = false, features = [
"yamux",
"tcp",
"dns",
"mplex",
"request-response",
"metrics",
"tokio",
Expand All @@ -84,22 +86,29 @@ libp2p = { version = "0.50", default-features = false, features = [
"secp256k1",
"plaintext",
] }
libp2p-bitswap = "0.25.1"
libp2p-mplex = { version = "0.41" }
# libp2p-bitswap = "0.25.1"
libp2p-bitswap = { git = "https://github.com/consensus-shipyard/libp2p-bitswap.git", branch = "chore-upgrade-libp2p" } # Updated to libp2p v0.53
libsecp256k1 = "0.7"
literally = "0.1.3"
log = "0.4"
lru_time_cache = "0.11"
merkle-tree-rs = "0.1.0"
multiaddr = "0.16"
multiaddr = "0.18"
multihash = { version = "0.16.1", default-features = false }
num-bigint = "0.4"
num-derive = "0.3"
num-traits = "0.2"
paste = "1"
pin-project = "1.1.2"
prometheus = "0.13"
prost = { version = "0.11" }
quickcheck = "1"
quickcheck_macros = "1"
rand = "0.8"
rand_chacha = "0.3"
regex = "1"
reqwest = { version = "0.11.13", features = ["json"] }
sha2 = "0.10"
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
Expand All @@ -118,21 +127,13 @@ tokio = { version = "1", features = [
] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["compat"] }
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = "0.3"
trace4rs = "0.5.1"
url = { version = "2.4.1", features = ["serde"] }
zeroize = "1.6"
trace4rs = "0.5.1"
literally = "0.1.3"
reqwest = { version = "0.11.13", features = ["json"] }
log = "0.4"
env_logger = "0.10"
prometheus = "0.13"
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
libsecp256k1 = "0.7"
ethers-contract = "2.0.8"
integer-encoding = {version = "3.0.3", default-features = false}

# Workspace deps
ipc-api = { path = "ipc/api" }
Expand All @@ -149,7 +150,7 @@ openssl = { version = "0.10", features = ["vendored"] }
# Using the 3.3 version of the FVM because the newer ones update the IPLD dependencies
# to version which are different than the ones in the builtin-actors project, and since
# they are 0.x cargo cannot upgrade them automatically, which leads to version conflicts.
fvm = { version = "~3.2", default-features = false } # no opencl feature or it fails on CI
fvm = { version = "~3.2", default-features = false } # no opencl feature or it fails on CI
fvm_shared = { version = "~3.2", features = ["crypto"] }
fvm_sdk = { version = "~3.2" }

Expand Down
6 changes: 3 additions & 3 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ fn to_resolver_config(settings: &Settings) -> anyhow::Result<ipc_ipld_resolver::

let r = &settings.resolver;

let local_key = {
let local_key: Keypair = {
let path = r.network.local_key(settings.home_dir());
let sk = read_secret_key(&path)?;
let sk = secp256k1::SecretKey::from_bytes(sk.serialize())?;
Keypair::Secp256k1(secp256k1::Keypair::from(sk))
let sk = secp256k1::SecretKey::try_from_bytes(sk.serialize())?;
secp256k1::Keypair::from(sk).into()
};

let network_name = format!(
Expand Down
2 changes: 2 additions & 0 deletions ipld/resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ lazy_static = { workspace = true }
libipld = { workspace = true }
libp2p = { workspace = true }
libp2p-bitswap = { workspace = true }
libp2p-mplex = { workspace = true }
lru_time_cache = { workspace = true }
log = { workspace = true }
prometheus = { workspace = true }
quickcheck = { workspace = true, optional = true }
Expand Down
168 changes: 112 additions & 56 deletions ipld/resolver/src/behaviour/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ use std::{

use libipld::{store::StoreParams, Cid};
use libp2p::{
core::ConnectedPoint,
core::{ConnectedPoint, Endpoint},
futures::channel::oneshot,
multiaddr::Protocol,
request_response::handler::RequestResponseHandlerEvent,
swarm::{
derive_prelude::{ConnectionId, FromSwarm},
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
derive_prelude::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, BitswapStore};
use log::warn;
use log::debug;
use prometheus::Registry;

use crate::{
Expand Down Expand Up @@ -54,6 +52,7 @@ pub enum Event {
/// This is only raised if we are tracking rate limits. The service has to
/// do the forwarding between the two oneshot channels, and call this module
/// back between doing so.
#[allow(dead_code)]
BitswapForward {
peer_id: PeerId,
/// Receive response from the [`Bitswap`] behaviour.
Expand Down Expand Up @@ -140,12 +139,14 @@ impl<P: StoreParams> Behaviour<P> {
/// The underlying [`libp2p_request_response::RequestResponse`] behaviour
/// will initiate connections to the peers which aren't connected at the moment.
pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId {
debug!("resolving {cid} from {peers:?}");
stats::CONTENT_RESOLVE_RUNNING.inc();
// Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`.
self.inner.sync(cid, peers, [].into_iter())
}

    /// Check whether the peer has already exhausted their rate limit.
#[allow(dead_code)]
fn check_rate_limit(&mut self, peer_id: &PeerId, cid: &Cid) -> bool {
if let Some(ref rate_limit) = self.rate_limit {
if let Some(addr) = self.peer_addresses.get(peer_id).cloned() {
Expand Down Expand Up @@ -181,17 +182,9 @@ impl<P: StoreParams> Behaviour<P> {

impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
type ConnectionHandler = <Bitswap<P> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;
type ToSwarm = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
self.inner.new_handler()
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.inner.addresses_of_peer(peer_id)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
// Store the remote address.
match &event {
FromSwarm::ConnectionEstablished(c) => {
Expand Down Expand Up @@ -226,61 +219,124 @@ impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
event: THandlerOutEvent<Self>,
) {
match event {
RequestResponseHandlerEvent::Request {
request_id,
request,
sender,
} if self.rate_limit.is_some() => {
if !self.check_rate_limit(&peer_id, &request.cid) {
warn!("rate limiting {peer_id}");
stats::CONTENT_RATE_LIMITED.inc();
return;
}
// We need to hijack the response channel to record the size, otherwise it goes straight to the handler.
let (tx, rx) = libp2p::futures::channel::oneshot::channel();
let event = RequestResponseHandlerEvent::Request {
request_id,
request,
sender: tx,
};

self.inner
.on_connection_handler_event(peer_id, connection_id, event);

let forward = Event::BitswapForward {
peer_id,
response_rx: rx,
response_tx: sender,
};
self.outbox.push_back(forward);
}
_ => self
.inner
.on_connection_handler_event(peer_id, connection_id, event),
}
// TODO: `request_response::handler` is now private, so we cannot pattern match on the handler event.
        // By the looks of it, the only way to access the request event is to let it go right into the RR protocol
// wrapped by the Bitswap behaviour and let it raise an event, however we will not see that event here.
// I'm not sure what we can do without moving rate limiting into the bitswap library itself, because
// what we did here relied on the ability to redirect the channels inside the request, but if the event
// itself is private to the `request_response` protocol there's nothing I can do.
// match event {

// request_response::handler::Event::Request {
// request_id,
// request,
// sender,
// } if self.rate_limit.is_some() => {
// if !self.check_rate_limit(&peer_id, &request.cid) {
// warn!("rate limiting {peer_id}");
// stats::CONTENT_RATE_LIMITED.inc();
// return;
// }
// // We need to hijack the response channel to record the size, otherwise it goes straight to the handler.
// let (tx, rx) = libp2p::futures::channel::oneshot::channel();
// let event = request_response::Event::Request {
// request_id,
// request,
// sender: tx,
// };

// self.inner
// .on_connection_handler_event(peer_id, connection_id, event);

// let forward = Event::BitswapForward {
// peer_id,
// response_rx: rx,
// response_tx: sender,
// };
// self.outbox.push_back(forward);
// }
// _ => self
// .inner
// .on_connection_handler_event(peer_id, connection_id, event),
// }

// debug!("BITSWAP CONNECTION HANDLER EVENT: {event:?}");

self.inner
.on_connection_handler_event(peer_id, connection_id, event)
}

fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
self.inner
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
self.inner.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Emit own events first.
if let Some(ev) = self.outbox.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
return Poll::Ready(ToSwarm::GenerateEvent(ev));
}
// Poll Bitswap.
while let Poll::Ready(ev) = self.inner.poll(cx, params) {
while let Poll::Ready(ev) = self.inner.poll(cx) {
// debug!("BITSWAP POLL: {ev:?}");
match ev {
NetworkBehaviourAction::GenerateEvent(ev) => match ev {
ToSwarm::GenerateEvent(ev) => match ev {
BitswapEvent::Progress(_, _) => {}
BitswapEvent::Complete(id, result) => {
stats::CONTENT_RESOLVE_RUNNING.dec();
let out = Event::Complete(id, result);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
return Poll::Ready(ToSwarm::GenerateEvent(out));
}
},
other => {
Expand Down
Loading

0 comments on commit 2440ac2

Please sign in to comment.