From 1fed61ee191bcd0c976aba2a8e48532c70e62657 Mon Sep 17 00:00:00 2001 From: n-lebel Date: Wed, 14 Aug 2024 11:56:39 +0200 Subject: [PATCH 1/5] Added unbounded memory channels --- serio/src/channel.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/serio/src/channel.rs b/serio/src/channel.rs index 12fee88..ba45ce6 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -77,6 +77,68 @@ pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) { (MemorySink(sender), MemoryStream(receiver)) } +/// An unbounded memory sink that can be used to send any serializable type to the receiver. +#[derive(Debug)] +pub struct UnboundedMemorySink(mpsc::UnboundedSender); + +impl Sink for UnboundedMemorySink { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_ready(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn start_send( + mut self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.0) + .start_send(Box::new(item)) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_flush(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_close(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } +} + +/// An Unbounded memory stream that can be used to receive any deserializable type from the sender. +#[derive(Debug)] +pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver); + +impl Stream for UnboundedMemoryStream { + type Error = Error; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(&mut self.0).poll_next(cx).map(|item| { + item.map(|item| { + item.downcast().map(|item| *item).map_err(|_| { + Error::new(ErrorKind::InvalidData, "sender sent an unexpected type") + }) + }) + }) + } +} + +/// Creates a new memory channel with the specified buffer size. +pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) { + let (sender, receiver) = mpsc::unbounded(); + (UnboundedMemorySink(sender), UnboundedMemoryStream(receiver)) +} + /// A memory duplex that can be used to send and receive any serializable types. #[derive(Debug)] pub struct MemoryDuplex { From e54faf4d0f1348fb76f44c65596fbc8f3a79eaac Mon Sep 17 00:00:00 2001 From: n-lebel Date: Wed, 14 Aug 2024 12:14:52 +0200 Subject: [PATCH 2/5] Add Clone to channel senders --- serio/src/channel.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/serio/src/channel.rs b/serio/src/channel.rs index ba45ce6..cae7d75 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -16,7 +16,7 @@ use crate::{Deserialize, Serialize, Sink, Stream}; type Item = Box; /// A memory sink that can be used to send any serializable type to the receiver. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MemorySink(mpsc::Sender); impl Sink for MemorySink { @@ -78,7 +78,7 @@ pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) { } /// An unbounded memory sink that can be used to send any serializable type to the receiver. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnboundedMemorySink(mpsc::UnboundedSender); impl Sink for UnboundedMemorySink { From a8a4b141b1e3d0a057abb5a51dca4b7822821598 Mon Sep 17 00:00:00 2001 From: n-lebel Date: Thu, 15 Aug 2024 08:14:16 +0200 Subject: [PATCH 3/5] Changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef88a6b..a83f595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,3 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +### Added + +`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver} for `futures::mpsc::{UnboundedSender, UnboundedReceiver}. \ No newline at end of file From 8d8a54649d5e4b2cf1cf2090bd9babde13289385 Mon Sep 17 00:00:00 2001 From: Nicolas Le Bel <126451245+n-lebel@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:41:10 +0100 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: th4s --- CHANGELOG.md | 2 +- serio/src/channel.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a83f595..bd5c87f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,4 +8,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver} for `futures::mpsc::{UnboundedSender, UnboundedReceiver}. \ No newline at end of file +`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver}` for `futures::mpsc::{UnboundedSender, UnboundedReceiver}`. \ No newline at end of file diff --git a/serio/src/channel.rs b/serio/src/channel.rs index cae7d75..04603ca 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -112,7 +112,7 @@ impl Sink for UnboundedMemorySink { } } -/// An Unbounded memory stream that can be used to receive any deserializable type from the sender. +/// An unbounded memory stream that can be used to receive any deserializable type from the sender. #[derive(Debug)] pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver); From 75ef318bd3e46de979aff7b8bb8bf956a83d7388 Mon Sep 17 00:00:00 2001 From: n-lebel Date: Mon, 25 Nov 2024 16:50:45 +0100 Subject: [PATCH 5/5] UnboundedMemoryDuplex + comments --- serio/src/channel.rs | 70 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/serio/src/channel.rs b/serio/src/channel.rs index 04603ca..57525e4 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -133,7 +133,7 @@ impl Stream for UnboundedMemoryStream { } } -/// Creates a new memory channel with the specified buffer size. +/// Creates a new unbounded memory channel. pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) { let (sender, receiver) = mpsc::unbounded(); (UnboundedMemorySink(sender), UnboundedMemoryStream(receiver)) @@ -207,6 +207,74 @@ pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) { ) } +/// An unbounded memory duplex that can be used to send and receive any serializable types. +#[derive(Debug)] +pub struct UnboundedMemoryDuplex { + sink: UnboundedMemorySink, + stream: UnboundedMemoryStream, +} + +impl UnboundedMemoryDuplex { + /// Returns the inner sink and stream. + pub fn into_inner(self) -> (UnboundedMemorySink, UnboundedMemoryStream) { + (self.sink, self.stream) + } + + /// Returns a reference to the inner sink. + pub fn sink_mut(&mut self) -> &mut UnboundedMemorySink { + &mut self.sink + } + + /// Returns a reference to the inner stream. + pub fn stream_mut(&mut self) -> &mut UnboundedMemoryStream { + &mut self.stream + } +} + +impl Sink for UnboundedMemoryDuplex { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_ready(cx) + } + + fn start_send( + mut self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.sink).start_send(item) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_close(cx) + } +} + +impl Stream for UnboundedMemoryDuplex { + type Error = Error; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +/// Creates a new unbounded memory duplex. +pub fn unbounded_duplex() -> (UnboundedMemoryDuplex, UnboundedMemoryDuplex) { + let (a, b) = unbounded(); + let (c, d) = unbounded(); + ( + UnboundedMemoryDuplex { sink: a, stream: d }, + UnboundedMemoryDuplex { sink: c, stream: b }, + ) +} + #[cfg(test)] mod tests { use super::*;