Skip to content
This repository has been archived by the owner on Mar 23, 2021. It is now read-only.

Commit

Permalink
Merge #2323
Browse files Browse the repository at this point in the history
2323: Create a separate OutEvent enum for the oneshot_protocol r=mergify[bot] a=thomaseizinger

This allows us to change the NetworkBehaviour's OutEvent type
to include the PeerId. Otherwise we would loose this information.

Co-authored-by: Thomas Eizinger <[email protected]>
  • Loading branch information
bors[bot] and thomaseizinger authored Mar 27, 2020
2 parents 0a69cf2 + a3e860a commit 2af497e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
55 changes: 27 additions & 28 deletions cnd/src/network/oneshot_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,13 @@ impl<M> Default for Behaviour<M> {
}
}

/// Event generated by the NetworkBehaviour and that the swarm will report back.
#[derive(Clone, Copy, Debug)]
/// Events emitted from the NetworkBehaviour up to the swarm.
#[derive(Debug)]
pub enum OutEvent<M> {
/// Emitted once we receive a message from the other peer.
Received(M),
/// Emitted once we successfully sent a message to the other peer.
Sent,
}

impl<M> From<M> for OutEvent<M>
where
M: oneshot_protocol::Message,
{
fn from(msg: M) -> Self {
OutEvent::Received(msg)
}
}

impl<M> From<()> for OutEvent<M> {
fn from(_: ()) -> Self {
OutEvent::Sent
}
/// We received the message M from the given peer.
Received { peer: PeerId, message: M },
/// We sent the message M to given peer.
Sent { peer: PeerId, message: M },
}

impl<M> NetworkBehaviour for Behaviour<M>
Expand All @@ -59,7 +44,7 @@ where
type ProtocolsHandler = OneShotHandler<
oneshot_protocol::InboundConfig<M>,
oneshot_protocol::OutboundConfig<M>,
OutEvent<M>,
oneshot_protocol::OutEvent<M>,
>;
type OutEvent = OutEvent<M>;

Expand All @@ -79,23 +64,37 @@ where
// Do nothing, announce protocol is going to take care of connections.
}

fn inject_node_event(&mut self, peer_id: PeerId, event: OutEvent<M>) {
fn inject_node_event(&mut self, peer: PeerId, event: oneshot_protocol::OutEvent<M>) {
match event {
OutEvent::Received(message) => {
oneshot_protocol::OutEvent::Received(message) => {
trace!(
"Received message from {} on protocol {}: {:?}",
peer_id,
peer,
M::INFO,
message
);

// Add the message to be dispatched to the user.
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Received(
.push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Received {
peer,
message,
}));
}
oneshot_protocol::OutEvent::Sent(message) => {
trace!(
"Sent message {:?} to {} on protocol {}",
message,
peer,
M::INFO
);

self.events
.push_back(NetworkBehaviourAction::GenerateEvent(OutEvent::Sent {
peer,
message,
)));
}));
}
OutEvent::Sent => trace!("Sent message to {} on protocol {}", peer_id, M::INFO),
}
}

Expand Down
23 changes: 18 additions & 5 deletions cnd/src/network/oneshot_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ pub struct OutboundConfig<M> {
msg: M,
}

/// Events that are produced as part of the connection upgrade.
///
/// The oneshot protocol is push-based, meaning the outbound upgrade will
/// generate the `Sent` event whereas the inbound upgrade will generate the
/// `Received` event.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OutEvent<M> {
/// Emitted once we receive a message from the other peer.
Received(M),
/// Emitted once we successfully sent a message to the other peer.
Sent(M),
}

impl<M> UpgradeInfo for OutboundConfig<M>
where
M: Message,
Expand All @@ -42,7 +55,7 @@ where
C: AsyncWrite + Unpin + Send + 'static,
M: Serialize + Message + Send + 'static,
{
type Output = ();
type Output = OutEvent<M>;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

Expand All @@ -55,7 +68,7 @@ where
let bytes = serde_json::to_vec(&self.msg)?;
upgrade::write_one(&mut socket, &bytes).await?;

Ok(())
Ok(OutEvent::Sent(self.msg))
})
}
}
Expand Down Expand Up @@ -94,7 +107,7 @@ where
C: AsyncRead + Unpin + Send + 'static,
M: DeserializeOwned + Message,
{
type Output = M;
type Output = OutEvent<M>;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

Expand All @@ -108,7 +121,7 @@ where
let mut de = serde_json::Deserializer::from_slice(&message);
let info = M::deserialize(&mut de)?;

Ok(info)
Ok(OutEvent::Received(info))
})
}
}
Expand Down Expand Up @@ -176,6 +189,6 @@ mod tests {
let config = InboundConfig::<DummyMessage>::default();
let received_msg = upgrade::apply_inbound(conn, config).await.unwrap();

assert_eq!(received_msg, sent_msg)
assert_eq!(received_msg, OutEvent::Received(sent_msg))
}
}

0 comments on commit 2af497e

Please sign in to comment.