diff --git a/Cargo.toml b/Cargo.toml index 90d5c86..7f9935d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,22 +10,19 @@ documentation = "https://docs.rs/kadmium" readme = "README.md" categories = ["algorithms", "network-programming"] keywords = ["p2p", "peer-to-peer", "networking"] +rust-version = "1.75" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = [] full = ["codec", "sync"] codec = ["tokio-util/codec", "bincode", "bytes/serde"] -sync = ["async-trait", "parking_lot", "tokio"] +sync = ["parking_lot", "tokio"] [dependencies] rand = "0.8.5" time = "0.3.11" -[dependencies.async-trait] -version = "0.1.56" -optional = true - [dependencies.parking_lot] version = "0.12.1" optional = true @@ -50,7 +47,7 @@ optional = true [dev-dependencies] deadline = "0.2.0" paste = "1.0" -pea2pea = "0.48.0" +pea2pea = "0.49.0" [dev-dependencies.tracing] version = "0.1.35" diff --git a/src/tcp/traits.rs b/src/tcp/traits.rs index 2cbd160..469ee9a 100644 --- a/src/tcp/traits.rs +++ b/src/tcp/traits.rs @@ -1,5 +1,5 @@ #[cfg(feature = "sync")] -use std::net::SocketAddr; +use std::{future::Future, net::SocketAddr}; use bytes::Bytes; #[cfg(feature = "sync")] @@ -15,7 +15,6 @@ use crate::{ /// A trait used to enable core kadcast functionality on the implementor. #[cfg(feature = "sync")] #[cfg_attr(doc_cfg, doc(cfg(feature = "sync")))] -#[async_trait::async_trait] pub trait Kadcast where Self: Clone + Send + Sync + 'static, @@ -35,155 +34,167 @@ where fn router(&self) -> &SyncTcpRouter; /// Returns `true` if the address is connected, `false` if it isn't. - async fn is_connected(&self, addr: SocketAddr) -> bool; + fn is_connected(&self, addr: SocketAddr) -> impl Future + Send; /// Connects to the address and returns if it was succesful or not. /// /// Note: Kadmium assumes this method calls [`SyncTcpRouter::insert`] and /// [`SyncTcpRouter::set_connected`] appropriately. - async fn connect(&self, addr: SocketAddr) -> bool; + fn connect(&self, addr: SocketAddr) -> impl Future + Send; /// Disconnects the address and returns `true` if it was connected, returns `false` if it wasn't. /// /// Note: Kadmium assumes this method calls [`SyncTcpRouter::set_disconnected`] appropriately. - async fn disconnect(&self, addr: SocketAddr) -> bool; + fn disconnect(&self, addr: SocketAddr) -> impl Future + Send; /// Sends a message to the destination address. - async fn unicast(&self, dst: SocketAddr, message: Message); + fn unicast(&self, dst: SocketAddr, message: Message) -> impl Future + Send; /// Starts the periodic ping task. - async fn ping(&self) { - let self_clone = self.clone(); - - tokio::spawn(async move { - loop { - for addr in self_clone.router().connected_addrs() { - self_clone - .unicast(addr, Message::Ping(self_clone.router().generate_ping())) + + fn ping(&self) -> impl Future + Send { + async { + let self_clone = self.clone(); + + tokio::spawn(async move { + loop { + for addr in self_clone.router().connected_addrs() { + self_clone + .unicast(addr, Message::Ping(self_clone.router().generate_ping())) + .await + } + + tokio::time::sleep(std::time::Duration::from_secs(Self::PING_INTERVAL_SECS)) .await } + }); - tokio::time::sleep(std::time::Duration::from_secs(Self::PING_INTERVAL_SECS)).await - } - }); - - // TODO: consider returning the task handle, or at least track it internally. + // TODO: consider returning the task handle, or at least track it internally. + } } /// Starts the periodic peer discovery task. - async fn peer(&self) { - // TODO: a few current issues to consider: - // - // 1. identifiers are more likely to be in higher index buckets, not necessarily an issue - // so long as bucket size is above the minimum number of peers. - // 2. the above also guaranties a search returning K nodes can indeed return K nodes, so - // long as K is below the minimum number of peers. If K is larger a node will return at - // worst min(min peers, K) and at best min(peers, K). - // - // Therefore: bucket size >= min peers >= K is likely ideal. - - let self_clone = self.clone(); - - tokio::spawn(async move { - loop { - for (_id, addr, is_connected) in - self_clone.router().select_search_peers(Self::ALPHA.into()) - { - let is_connected = match is_connected { - true => self_clone.is_connected(addr).await, - false => self_clone.connect(addr).await, - }; - - if is_connected { - self_clone - .unicast( - addr, - Message::FindKNodes(self_clone.router().generate_find_k_nodes()), - ) - .await; + fn peer(&self) -> impl Future + Send { + async { + // TODO: a few current issues to consider: + // + // 1. identifiers are more likely to be in higher index buckets, not necessarily an issue + // so long as bucket size is above the minimum number of peers. + // 2. the above also guaranties a search returning K nodes can indeed return K nodes, so + // long as K is below the minimum number of peers. If K is larger a node will return at + // worst min(min peers, K) and at best min(peers, K). + // + // Therefore: bucket size >= min peers >= K is likely ideal. + + let self_clone = self.clone(); + + tokio::spawn(async move { + loop { + for (_id, addr, is_connected) in + self_clone.router().select_search_peers(Self::ALPHA.into()) + { + let is_connected = match is_connected { + true => self_clone.is_connected(addr).await, + false => self_clone.connect(addr).await, + }; + + if is_connected { + self_clone + .unicast( + addr, + Message::FindKNodes( + self_clone.router().generate_find_k_nodes(), + ), + ) + .await; + } } - } - let peer_deficit = - Self::PEER_TARGET as i128 - self_clone.router().connected_addrs().len() as i128; + let peer_deficit = Self::PEER_TARGET as i128 + - self_clone.router().connected_addrs().len() as i128; - if peer_deficit < 0 { - let addrs: Vec = { - let mut rng = rand::thread_rng(); + if peer_deficit < 0 { + let addrs: Vec = { + let mut rng = rand::thread_rng(); - self_clone - .router() - .connected_addrs() - .choose_multiple(&mut rng, peer_deficit.unsigned_abs() as usize) - .copied() - .collect() - }; + self_clone + .router() + .connected_addrs() + .choose_multiple(&mut rng, peer_deficit.unsigned_abs() as usize) + .copied() + .collect() + }; - for addr in addrs { - self_clone.disconnect(addr).await; + for addr in addrs { + self_clone.disconnect(addr).await; + } } - } - if peer_deficit > 0 { - let addrs: Vec = { - let mut rng = rand::thread_rng(); - self_clone - .router() - .disconnected_addrs() - .choose_multiple(&mut rng, peer_deficit as usize) - .copied() - .collect() + if peer_deficit > 0 { + let addrs: Vec = { + let mut rng = rand::thread_rng(); + self_clone + .router() + .disconnected_addrs() + .choose_multiple(&mut rng, peer_deficit as usize) + .copied() + .collect() + }; + + for addr in addrs { + self_clone.connect(addr).await; + } + } + + // Check the peer counts again. + let sleep_duration = { + std::time::Duration::from_secs( + if self_clone.router().connected_addrs().len() + < Self::PEER_TARGET.into() + { + Self::BOOTSTRAP_INTERVAL_SECS + } else { + Self::DISCOVERY_INTERVAL_SECS + }, + ) }; - for addr in addrs { - self_clone.connect(addr).await; - } + tokio::time::sleep(sleep_duration).await; } - - // Check the peer counts again. - let sleep_duration = { - std::time::Duration::from_secs( - if self_clone.router().connected_addrs().len() < Self::PEER_TARGET.into() { - Self::BOOTSTRAP_INTERVAL_SECS - } else { - Self::DISCOVERY_INTERVAL_SECS - }, - ) - }; - - tokio::time::sleep(sleep_duration).await; - } - }); + }); + } } // TODO: work out how and if data should be chunked (1 block per-message or multiple smaller // messages). Up to the caller for now. /// Broadcast data to the network, following the kadcast protocol. - async fn kadcast(&self, data: Bytes) -> Nonce { - let peers = self - .router() - .select_broadcast_peers(Id::BITS as u32) - .unwrap(); - - // TODO: record nonce somewhere. - let nonce = { - let mut rng = thread_rng(); - rng.gen() - }; - - for (height, addr) in peers { - let message = Message::Chunk(Chunk { - // Can be used to trace the broadcast. If set differently for each peer here, it will - // be the same within a propagation sub-tree. - nonce, - height, - // Cheap as the backing storage is shared amongst instances. - data: data.clone(), - }); + fn kadcast(&self, data: Bytes) -> impl Future + Send { + async move { + let peers = self + .router() + .select_broadcast_peers(Id::BITS as u32) + .unwrap(); + + // TODO: record nonce somewhere. + let nonce = { + let mut rng = thread_rng(); + rng.gen() + }; + + for (height, addr) in peers { + let message = Message::Chunk(Chunk { + // Can be used to trace the broadcast. If set differently for each peer here, it will + // be the same within a propagation sub-tree. + nonce, + height, + // Cheap as the backing storage is shared amongst instances. + data: data.clone(), + }); + + self.unicast(addr, message).await; + } - self.unicast(addr, message).await; + nonce } - - nonce } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2a6f294..d4b4f7a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -77,7 +77,6 @@ impl ProcessData for Data { } } -#[async_trait::async_trait] impl Kadcast for KadNode { // Shorten the defaults for testing purposes. const PEER_TARGET: u16 = 20; @@ -134,7 +133,7 @@ impl KadNode { pub async fn new(id: Id) -> Self { Self { node: Node::new(Config { - listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + listener_addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)), max_connections: 1024, ..Default::default() }), @@ -163,7 +162,6 @@ impl Pea2Pea for KadNode { } } -#[async_trait::async_trait] impl Reading for KadNode { type Message = Message; type Codec = MessageCodec; @@ -214,7 +212,6 @@ impl Writing for KadNode { } } -#[async_trait::async_trait] impl Handshake for KadNode { async fn perform_handshake(&self, mut conn: Connection) -> io::Result { let local_id = self.router.local_id(); @@ -281,7 +278,6 @@ impl Handshake for KadNode { } } -#[async_trait::async_trait] impl OnDisconnect for KadNode { async fn on_disconnect(&self, addr: SocketAddr) { self.router.set_disconnected(addr);