diff --git a/CHANGELOG.md b/CHANGELOG.md index ef88a6b..bd5c87f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,3 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +### Added + +`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver}` for `futures::mpsc::{UnboundedSender, UnboundedReceiver}`. \ No newline at end of file diff --git a/serio/src/channel.rs b/serio/src/channel.rs index 12fee88..57525e4 100644 --- a/serio/src/channel.rs +++ b/serio/src/channel.rs @@ -16,7 +16,7 @@ use crate::{Deserialize, Serialize, Sink, Stream}; type Item = Box; /// A memory sink that can be used to send any serializable type to the receiver. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MemorySink(mpsc::Sender); impl Sink for MemorySink { @@ -77,6 +77,68 @@ pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) { (MemorySink(sender), MemoryStream(receiver)) } +/// An unbounded memory sink that can be used to send any serializable type to the receiver. +#[derive(Debug, Clone)] +pub struct UnboundedMemorySink(mpsc::UnboundedSender); + +impl Sink for UnboundedMemorySink { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_ready(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn start_send( + mut self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.0) + .start_send(Box::new(item)) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_flush(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0) + .poll_close(cx) + .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) + } +} + +/// An unbounded memory stream that can be used to receive any deserializable type from the sender. +#[derive(Debug)] +pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver); + +impl Stream for UnboundedMemoryStream { + type Error = Error; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(&mut self.0).poll_next(cx).map(|item| { + item.map(|item| { + item.downcast().map(|item| *item).map_err(|_| { + Error::new(ErrorKind::InvalidData, "sender sent an unexpected type") + }) + }) + }) + } +} + +/// Creates a new unbounded memory channel. +pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) { + let (sender, receiver) = mpsc::unbounded(); + (UnboundedMemorySink(sender), UnboundedMemoryStream(receiver)) +} + /// A memory duplex that can be used to send and receive any serializable types. #[derive(Debug)] pub struct MemoryDuplex { @@ -145,6 +207,74 @@ pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) { ) } +/// An unbounded memory duplex that can be used to send and receive any serializable types. +#[derive(Debug)] +pub struct UnboundedMemoryDuplex { + sink: UnboundedMemorySink, + stream: UnboundedMemoryStream, +} + +impl UnboundedMemoryDuplex { + /// Returns the inner sink and stream. + pub fn into_inner(self) -> (UnboundedMemorySink, UnboundedMemoryStream) { + (self.sink, self.stream) + } + + /// Returns a reference to the inner sink. + pub fn sink_mut(&mut self) -> &mut UnboundedMemorySink { + &mut self.sink + } + + /// Returns a reference to the inner stream. + pub fn stream_mut(&mut self) -> &mut UnboundedMemoryStream { + &mut self.stream + } +} + +impl Sink for UnboundedMemoryDuplex { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_ready(cx) + } + + fn start_send( + mut self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.sink).start_send(item) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink).poll_close(cx) + } +} + +impl Stream for UnboundedMemoryDuplex { + type Error = Error; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +/// Creates a new unbounded memory duplex. +pub fn unbounded_duplex() -> (UnboundedMemoryDuplex, UnboundedMemoryDuplex) { + let (a, b) = unbounded(); + let (c, d) = unbounded(); + ( + UnboundedMemoryDuplex { sink: a, stream: d }, + UnboundedMemoryDuplex { sink: c, stream: b }, + ) +} + #[cfg(test)] mod tests { use super::*;