Skip to content

Commit

Permalink
feat(serio): unbounded memory channels (#37)
Browse files Browse the repository at this point in the history
* Added unbounded memory channels

* Add Clone to channel senders

* Changelog

* Apply suggestions from code review

Co-authored-by: th4s <[email protected]>

* UnboundedMemoryDuplex + comments

---------

Co-authored-by: th4s <[email protected]>
  • Loading branch information
n-lebel and th4s authored Nov 25, 2024
1 parent 09f1abe commit 280bb38
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver}` for `futures::mpsc::{UnboundedSender, UnboundedReceiver}`.
132 changes: 131 additions & 1 deletion serio/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{Deserialize, Serialize, Sink, Stream};
type Item = Box<dyn Any + Send + Sync + 'static>;

/// A memory sink that can be used to send any serializable type to the receiver.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemorySink(mpsc::Sender<Item>);

impl Sink for MemorySink {
Expand Down Expand Up @@ -77,6 +77,68 @@ pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) {
(MemorySink(sender), MemoryStream(receiver))
}

/// An unbounded memory sink that can be used to send any serializable type to the receiver.
#[derive(Debug, Clone)]
pub struct UnboundedMemorySink(mpsc::UnboundedSender<Item>);

impl Sink for UnboundedMemorySink {
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_ready(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.0)
.start_send(Box::new(item))
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_flush(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_close(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
}

/// An unbounded memory stream that can be used to receive any deserializable type from the sender.
#[derive(Debug)]
pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver<Item>);

impl Stream for UnboundedMemoryStream {
type Error = Error;

fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.0).poll_next(cx).map(|item| {
item.map(|item| {
item.downcast().map(|item| *item).map_err(|_| {
Error::new(ErrorKind::InvalidData, "sender sent an unexpected type")
})
})
})
}
}

/// Creates a new unbounded memory channel.
pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) {
let (sender, receiver) = mpsc::unbounded();
(UnboundedMemorySink(sender), UnboundedMemoryStream(receiver))
}

/// A memory duplex that can be used to send and receive any serializable types.
#[derive(Debug)]
pub struct MemoryDuplex {
Expand Down Expand Up @@ -145,6 +207,74 @@ pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) {
)
}

/// An unbounded memory duplex that can be used to send and receive any serializable types.
#[derive(Debug)]
pub struct UnboundedMemoryDuplex {
sink: UnboundedMemorySink,
stream: UnboundedMemoryStream,
}

impl UnboundedMemoryDuplex {
/// Returns the inner sink and stream.
pub fn into_inner(self) -> (UnboundedMemorySink, UnboundedMemoryStream) {
(self.sink, self.stream)
}

/// Returns a reference to the inner sink.
pub fn sink_mut(&mut self) -> &mut UnboundedMemorySink {
&mut self.sink
}

/// Returns a reference to the inner stream.
pub fn stream_mut(&mut self) -> &mut UnboundedMemoryStream {
&mut self.stream
}
}

impl Sink for UnboundedMemoryDuplex {
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_ready(cx)
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_close(cx)
}
}

impl Stream for UnboundedMemoryDuplex {
type Error = Error;

fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}

/// Creates a new unbounded memory duplex.
pub fn unbounded_duplex() -> (UnboundedMemoryDuplex, UnboundedMemoryDuplex) {
let (a, b) = unbounded();
let (c, d) = unbounded();
(
UnboundedMemoryDuplex { sink: a, stream: d },
UnboundedMemoryDuplex { sink: c, stream: b },
)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 280bb38

Please sign in to comment.