From a3e860a60d9922c877655c62371b0888e98e5814 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 26 Mar 2020 15:06:03 +1100 Subject: [PATCH] Create a separate OutEvent enum for the oneshot_protocol This allows us to change the NetworkBehaviour's OutEvent type to include the PeerId. Otherwise we would loose this information. --- cnd/src/network/oneshot_behaviour.rs | 55 ++++++++++++++-------------- cnd/src/network/oneshot_protocol.rs | 23 +++++++++--- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/cnd/src/network/oneshot_behaviour.rs b/cnd/src/network/oneshot_behaviour.rs index 942828f55a..c542ca4947 100644 --- a/cnd/src/network/oneshot_behaviour.rs +++ b/cnd/src/network/oneshot_behaviour.rs @@ -28,28 +28,13 @@ impl Default for Behaviour { } } -/// Event generated by the NetworkBehaviour and that the swarm will report back. -#[derive(Clone, Copy, Debug)] +/// Events emitted from the NetworkBehaviour up to the swarm. +#[derive(Debug)] pub enum OutEvent { - /// Emitted once we receive a message from the other peer. - Received(M), - /// Emitted once we successfully sent a message to the other peer. - Sent, -} - -impl From for OutEvent -where - M: oneshot_protocol::Message, -{ - fn from(msg: M) -> Self { - OutEvent::Received(msg) - } -} - -impl From<()> for OutEvent { - fn from(_: ()) -> Self { - OutEvent::Sent - } + /// We received the message M from the given peer. + Received { peer: PeerId, message: M }, + /// We sent the message M to given peer. + Sent { peer: PeerId, message: M }, } impl NetworkBehaviour for Behaviour @@ -59,7 +44,7 @@ where type ProtocolsHandler = OneShotHandler< oneshot_protocol::InboundConfig, oneshot_protocol::OutboundConfig, - OutEvent, + oneshot_protocol::OutEvent, >; type OutEvent = OutEvent; @@ -79,23 +64,37 @@ where // Do nothing, announce protocol is going to take care of connections. } - fn inject_node_event(&mut self, peer_id: PeerId, event: OutEvent) { + fn inject_node_event(&mut self, peer: PeerId, event: oneshot_protocol::OutEvent) { match event { - OutEvent::Received(message) => { + oneshot_protocol::OutEvent::Received(message) => { trace!( "Received message from {} on protocol {}: {:?}", - peer_id, + peer, M::INFO, message ); // Add the message to be dispatched to the user. self.events - .push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Received( + .push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Received { + peer, + message, + })); + } + oneshot_protocol::OutEvent::Sent(message) => { + trace!( + "Sent message {:?} to {} on protocol {}", + message, + peer, + M::INFO + ); + + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Sent { + peer, message, - ))); + })); } - OutEvent::Sent => trace!("Sent message to {} on protocol {}", peer_id, M::INFO), } } diff --git a/cnd/src/network/oneshot_protocol.rs b/cnd/src/network/oneshot_protocol.rs index 3bc71a88f6..d5ed475dc7 100644 --- a/cnd/src/network/oneshot_protocol.rs +++ b/cnd/src/network/oneshot_protocol.rs @@ -25,6 +25,19 @@ pub struct OutboundConfig { msg: M, } +/// Events that are produced as part of the connection upgrade. +/// +/// The oneshot protocol is push-based, meaning the outbound upgrade will +/// generate the `Sent` event whereas the inbound upgrade will generate the +/// `Received` event. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum OutEvent { + /// Emitted once we receive a message from the other peer. + Received(M), + /// Emitted once we successfully sent a message to the other peer. + Sent(M), +} + impl UpgradeInfo for OutboundConfig where M: Message, @@ -42,7 +55,7 @@ where C: AsyncWrite + Unpin + Send + 'static, M: Serialize + Message + Send + 'static, { - type Output = (); + type Output = OutEvent; type Error = Error; type Future = BoxFuture<'static, Result>; @@ -55,7 +68,7 @@ where let bytes = serde_json::to_vec(&self.msg)?; upgrade::write_one(&mut socket, &bytes).await?; - Ok(()) + Ok(OutEvent::Sent(self.msg)) }) } } @@ -94,7 +107,7 @@ where C: AsyncRead + Unpin + Send + 'static, M: DeserializeOwned + Message, { - type Output = M; + type Output = OutEvent; type Error = Error; type Future = BoxFuture<'static, Result>; @@ -108,7 +121,7 @@ where let mut de = serde_json::Deserializer::from_slice(&message); let info = M::deserialize(&mut de)?; - Ok(info) + Ok(OutEvent::Received(info)) }) } } @@ -176,6 +189,6 @@ mod tests { let config = InboundConfig::::default(); let received_msg = upgrade::apply_inbound(conn, config).await.unwrap(); - assert_eq!(received_msg, sent_msg) + assert_eq!(received_msg, OutEvent::Received(sent_msg)) } }