diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f904c1..542b359 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Pharos Changelog +## 0.4.0 - 2019-09-28 + +- **BREAKING CHANGE**: The notify function had a sub optimal implemetation and did not allow notifying observers + from within `poll_*` functions. It has been replaced with an implementation of Sink on Pharos. +- got rid of dependency on pin_project. +- as Error::kind returns a reference to the error kind, you can now compare `ErrorKind::SomeVariant == err.kind()` without having to write: `&ErrorKind::SomeVariant == err.kind()`. +- updated to futures-preview 0.3.0-alpha.19 + ## 0.3.2 - 2019-09-23 - check spelling diff --git a/Cargo.toml b/Cargo.toml index 09b3697..497562c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,13 @@ status = "actively-developed" repository = "najamelan/pharos" [dependencies] -pin-project = "^0.4.0-beta" - [dependencies.futures-preview] -features = ["async-await", "nightly"] +features = ["async-await"] version = "^0.3.0-alpha" +[dev-dependencies] +assert_matches = "^1" + [features] external_doc = [] @@ -27,7 +28,7 @@ license = "Unlicense" name = "pharos" readme = "README.md" repository = "https://github.com/najamelan/pharos" -version = "0.3.2" +version = "0.4.0" [package.metadata] [package.metadata.docs] diff --git a/Cargo.yml b/Cargo.yml index 8e1169c..f885989 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -10,7 +10,7 @@ package: # - merge dev branch into master # - create git tag # - version : 0.3.2 + version : 0.4.0 name : pharos authors : [ Naja Melan ] edition : '2018' @@ -43,6 +43,8 @@ badges: dependencies: - futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } - pin-project : ^0.4.0-beta + futures-preview: { version: ^0.3.0-alpha, features: [async-await] } +dev-dependencies: + + assert_matches: ^1 diff --git a/README.md b/README.md index c3de095..8a425b0 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to. -I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on. +I created it to leverage interoperability we can create by using async [Stream](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/stream/trait.Stream.html) and [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) from the futures library. So you can use all stream combinators, forward it into Sinks and so on. Minimal rustc version: 1.39. @@ -44,8 +44,8 @@ This crate has: `#![ forbid( unsafe_code ) ]` - [`Events`] is not clonable right now (would require support from the channels we use as back-ends, eg. broadcast type channel) - performance tweaking still needs to be done - pharos requires mut access for most operations. This is not intended to change anytime soon. Both on - [notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal - state, and most channels also require mutable access to either read or write. If you need it from non mutable + [send](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_util/sink/trait.SinkExt.html#method.send) and [observe](Observable::observe), the two main interfaces, manipulate internal + state, and most channels also require mutable access to either read or write. If you need it from immutable context, use interior mutability primitives like locks or Cells... ### Future work @@ -62,14 +62,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml): ```yaml dependencies: - pharos: ^0.3 + pharos: ^0.4 ``` With raw Cargo.toml ```toml [dependencies] - pharos = "0.3" + pharos = "0.4" ``` ### Upgrade @@ -79,31 +79,30 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master ### Dependencies -This crate only has two dependencies. Cargo will automatically handle it's dependencies for you. +This crate only has one dependencies. Cargo will automatically handle it's dependencies for you. ```yaml dependencies: - futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } - pin-project : ^0.4.0-beta + futures-preview: { version: ^0.3.0-alpha, features: [async-await, nightly] } ``` ## Usage -pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages -fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). +`pharos` only works from async code, implementing Sink to notify observers. You can notify observers from within +`poll_*` methods by calling the poll methods of the [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) impl directly. In async context you can use [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send). Observers must consume the messages fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers. -Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size (eg. there's no point putting an enum without data in an Arc). +Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than 2 pointer sizes (eg. there's no point putting an enum without data in an Arc). Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is the most basic one: ```rust use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -123,7 +122,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" ); } } @@ -147,7 +149,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -164,7 +168,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // @@ -209,7 +213,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -226,7 +232,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/TODO.md b/TODO.md index 9a59f97..84b3084 100644 --- a/TODO.md +++ b/TODO.md @@ -1,13 +1,7 @@ # TODO - make Events clone? means we can only work with broadcast channels - - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? -- scaling? For now we have an ever growing vector of observers - - other data structure than vec? smallvec? - - type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review. - - let users set capacity on creation? diff --git a/examples/basic.rs b/examples/basic.rs index 519e296..5d6541f 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,7 +1,7 @@ use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -21,7 +21,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + let _ = self.pharos.send( GoddessEvent::Sailing ).await; } } @@ -45,7 +48,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -62,7 +67,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // diff --git a/examples/filter.rs b/examples/filter.rs index 8dda3ce..30a1b56 100644 --- a/examples/filter.rs +++ b/examples/filter.rs @@ -16,7 +16,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -33,7 +35,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/src/error.rs b/src/error.rs index afd5c5d..ebfc611 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,35 +3,91 @@ use crate::{ import::* }; /// The error type for errors happening in `pharos`. /// -/// Use [`err.kind()`] to know which kind of error happened. +/// Use [`Error::kind()`] to know which kind of error happened. // #[ derive( Debug ) ] // -pub(crate) struct Error +pub struct Error { - pub(crate) inner: Option< Box >, + pub(crate) inner: Option< Box >, pub(crate) kind : ErrorKind, } +impl Error +{ + /// Identify which error happened. + // + pub fn kind( &self ) -> &ErrorKind + { + &self.kind + } +} + + +impl From for Error +{ + fn from( kind: ErrorKind ) -> Error + { + Error { inner: None, kind } + } +} + +impl From for Error +{ + fn from( inner: FutSendError ) -> Error + { + Error { inner: Some( Box::new( inner ) ), kind: ErrorKind::SendError } + } +} + + + /// The different kind of errors that can happen when you use the `pharos` API. // -#[ derive( Debug ) ] +#[ derive( Debug, Copy, Clone, PartialEq, Eq ) ] // -pub(crate) enum ErrorKind +pub enum ErrorKind { - /// Failed to send on channel, normally means it's closed. Pharos does not expose these errors - /// to the user. + #[ doc( hidden ) ] + // + //This variant is only used internally. // SendError, + /// The pharos object is already closed. You can no longer send messages or observe it. + /// This should only happen if you call [SinkExt::close](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.close) on it. + // + Closed, + + /// The minimum valid buffer size for [`Channel::Bounded`](crate::observable::Channel) is `1`, you sent in `0`. + // + MinChannelSizeOne, + #[ doc( hidden ) ] // __NonExhaustive__ } +impl PartialEq<&ErrorKind> for ErrorKind +{ + fn eq( &self, other: &&ErrorKind ) -> bool + { + self == *other + } +} + +impl PartialEq for &ErrorKind +{ + fn eq( &self, other: &ErrorKind ) -> bool + { + *self == other + } +} + + impl ErrorTrait for Error { @@ -50,7 +106,8 @@ impl fmt::Display for ErrorKind { match self { - Self::SendError => fmt::Display::fmt( "Channel closed.", f ) , + Self::SendError => fmt::Display::fmt( "Channel closed.", f ) , + Self::MinChannelSizeOne => fmt::Display::fmt( "The minimum valid buffer size for Channel::Bounded is 1, you send in 0.", f ) , _ => unreachable!(), } @@ -72,30 +129,3 @@ impl fmt::Display for Error } } - - -impl Error -{ - /// Allows matching on the error kind - // - pub(crate) fn _kind( &self ) -> &ErrorKind - { - &self.kind - } -} - -impl From for Error -{ - fn from( kind: ErrorKind ) -> Error - { - Error { inner: None, kind } - } -} - -impl From for Error -{ - fn from( inner: FutSendError ) -> Error - { - Error { inner: Some( Box::new( inner ) ), kind: ErrorKind::SendError } - } -} diff --git a/src/events.rs b/src/events.rs index 830edfc..b5ce49b 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,8 +1,11 @@ -use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; +use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error, ErrorKind }; + /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). +/// You will only start receiving events from the moment you call this. Any events in the observed +/// object emitted before will not be delivered. /// -/// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>() == 16` +/// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>() == 16` // #[ derive( Debug ) ] // @@ -20,7 +23,7 @@ impl Events where Event: Clone + 'static + Send { Channel::Bounded( queue_size ) => { - let (tx, rx) = mpsc::channel( queue_size ); + let (tx, rx) = mpsc::channel( queue_size - 1 ); ( Sender::Bounded{ tx, filter: config.filter }, Receiver::Bounded{ rx } ) } @@ -40,9 +43,8 @@ impl Events where Event: Clone + 'static + Send } - /// Close the channel. This way the sender will stop sending new events, and you can still - /// continue to read any events that are still pending in the channel. This avoids data loss - /// compared to just dropping this object. + /// Disconnect from the observable object. This way the sender will stop sending new events + /// and you can still continue to read any events that are still pending in the channel. // pub fn close( &mut self ) { @@ -52,7 +54,8 @@ impl Events where Event: Clone + 'static + Send - +// Just forward +// impl Stream for Events where Event: Clone + 'static + Send { type Item = Event; @@ -66,14 +69,12 @@ impl Stream for Events where Event: Clone + 'static + Send /// The sender of the channel. -/// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>() == 56` -// -#[ pin_project ] +/// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>() == 56` // pub(crate) enum Sender where Event: Clone + 'static + Send { - Bounded { #[pin] tx: FutSender , filter: Option> } , - Unbounded{ #[pin] tx: FutUnboundedSender, filter: Option> } , + Bounded { tx: FutSender , filter: Option> } , + Unbounded{ tx: FutUnboundedSender, filter: Option> } , } @@ -81,7 +82,7 @@ pub(crate) enum Sender where Event: Clone + 'static + Send impl Sender where Event: Clone + 'static + Send { - // Verify whether this observer is still around + // Verify whether this observer is still around. // pub(crate) fn is_closed( &self ) -> bool { @@ -93,63 +94,36 @@ impl Sender where Event: Clone + 'static + Send } - // Notify the observer and return a bool indicating whether this observer is still - // operational. If an error happens on a channel it usually means that the channel - // is closed, in which case we should drop this sender. + /// Check whether this sender is interested in this event. // - pub(crate) async fn notify( &mut self, evt: &Event ) -> bool + pub(crate) fn filter( &mut self, evt: &Event ) -> bool { - if self.is_closed() { return false } - match self { - Sender::Bounded { tx, filter } => Self::notifier( tx, filter, evt ).await, - Sender::Unbounded{ tx, filter } => Self::notifier( tx, filter, evt ).await, + Sender::Bounded { filter, .. } => Self::filter_inner( filter, evt ), + Sender::Unbounded{ filter, .. } => Self::filter_inner( filter, evt ), } } - async fn notifier - ( - mut tx: impl Sink + Unpin , - filter: &mut Option> , - evt : &Event , - ) - - -> bool - + fn filter_inner( filter: &mut Option>, evt: &Event ) -> bool { - let interested = match filter + match filter { Some(f) => f.call(evt), None => true , - }; - - - #[ allow( clippy::match_bool ) ] - // - match interested - { - true => tx.send( evt.clone() ).await.is_ok(), - - // since we don't try to send, we know nothing about whether they are still - // observing, so assume they do. - // - false => true, } } } -/// The receiver of the channel. -// -#[ pin_project ] +/// The receiver of the channel, abstracting over different channel types. // enum Receiver where Event: Clone + 'static + Send { - Bounded { #[pin] rx: FutReceiver } , - Unbounded{ #[pin] rx: FutUnboundedReceiver } , + Bounded { rx: FutReceiver } , + Unbounded{ rx: FutUnboundedReceiver } , } @@ -186,16 +160,12 @@ impl Stream for Receiver where Event: Clone + 'static + Send { type Item = Event; - #[ project ] - // fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< Option > { - #[ project ] - // - match self.project() + match self.get_mut() { - Receiver::Bounded { rx } => rx.poll_next( cx ), - Receiver::Unbounded{ rx } => rx.poll_next( cx ), + Receiver::Bounded { rx } => Pin::new( rx ).poll_next( cx ), + Receiver::Unbounded{ rx } => Pin::new( rx ).poll_next( cx ), } } } @@ -206,57 +176,62 @@ impl Sink for Sender where Event: Clone + 'static + Send { type Error = Error; - #[ project ] - // + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).poll_ready( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_ready( cx ).map_err( Into::into ), } } - #[ project ] - // + fn start_send( self: Pin<&mut Self>, item: Event ) -> Result<(), Self::Error> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.start_send( item ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.start_send( item ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).start_send( item ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).start_send( item ).map_err( Into::into ), } } - /// This will do a send under the hood, so the same errors as from start_send can occur here. + + // Note that on futures-rs bounded channels poll_flush has a problematic implementation. + // - it just calls poll_ready, which means it will be pending when the buffer is full. So + // it will make SinkExt::send hang, bad! + // - it will swallow disconnected errors, so we don't get feedback allowing us to free slots. + // + // In principle channels are always flushed, because when the message is in the buffer, it's + // ready for the reader to read. So this should just be a noop. // - #[ project ] + // We compensate for the error swallowing by checking `is_closed`. // - fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + fn poll_flush( self: Pin<&mut Self>, _cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => + { + if tx.is_closed() { Poll::Ready(Err( ErrorKind::Closed.into() ))} + else { Poll::Ready(Ok ( () ))} + } + + Sender::Unbounded{ tx, .. } => + { + if tx.is_closed() { Poll::Ready(Err( ErrorKind::Closed.into() ))} + else { Poll::Ready(Ok ( () ))} + } } } - #[ project ] - // + fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_close( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_close( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).poll_close( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_close( cx ).map_err( Into::into ), } } } diff --git a/src/filter.rs b/src/filter.rs index 47aef3c..bd488f7 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,6 +1,8 @@ use crate :: { import::* }; -/// Predicate for filtering events. This is an enum because closures that capture variables from +/// Predicate for filtering events. +/// +/// This is an enum because closures that capture variables from /// their environment need to be boxed. More often than not, an event will be a simple enum and /// the predicate will just match on the variant, so it would be wasteful to impose boxing in those /// cases, hence there is a function pointer variant which does not require boxing. This should @@ -43,11 +45,11 @@ pub enum Filter where Event: Clone + 'static + Send , { - /// A function pointer to use to filter events. + /// A function pointer to a predicate to filter events. // Pointer( fn(&Event) -> bool ), - /// A boxed closure to use to filter events. + /// A boxed closure to a predicate to filter events. // Closure( Box bool + Send> ), } @@ -55,7 +57,7 @@ pub enum Filter impl Filter where Event: Clone + 'static + Send { - /// Invoke the predicate + /// Invoke the predicate. // pub(crate) fn call( &mut self, evt: &Event ) -> bool { diff --git a/src/lib.rs b/src/lib.rs index 48c8972..d33f468 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,12 +40,7 @@ pub use filter :: { Filter } , observable :: { Observable, ObserveConfig, Channel } , events :: { Events } , -}; - - -pub(crate) use -{ - error :: { Error } , + error :: { Error, ErrorKind } , }; @@ -55,11 +50,10 @@ mod import { std :: { fmt, error::Error as ErrorTrait, ops::Deref, any::type_name } , std :: { task::{ Poll, Context }, pin::Pin } , - pin_project :: { project, pin_project } , futures :: { - future::{ join_all }, Stream, Sink, SinkExt, + Stream, Sink, ready, channel::mpsc:: { @@ -72,4 +66,12 @@ mod import } , }, }; + + #[ cfg( test ) ] + // + pub(crate) use + { + assert_matches :: { assert_matches } , + futures :: { future::poll_fn, executor::block_on, SinkExt } , + }; } diff --git a/src/observable.rs b/src/observable.rs index 5a97a08..e6d27c3 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -28,6 +28,8 @@ use crate :: { Filter, Events }; /// /// impl Steps /// { +/// // We can use this as a predicate to filter events. +/// // /// fn is_err( &self ) -> bool /// { /// match self @@ -39,15 +41,17 @@ use crate :: { Filter, Events }; /// } /// /// -/// // The object we want to be observable +/// // The object we want to be observable. /// // /// struct Foo { pharos: Pharos }; /// /// impl Observable for Foo /// { +/// type Error = pharos::Error; +/// /// // Pharos implements observable, so we just forward the call. /// // -/// fn observe( &mut self, options: ObserveConfig ) -> Events +/// fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > /// { /// self.pharos.observe( options ) /// } @@ -59,9 +63,9 @@ use crate :: { Filter, Events }; /// async fn task() /// { /// let mut foo = Foo { pharos: Pharos::default() }; -/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ); +/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ).expect( "observe" ); /// -/// // will only be notified on errors now +/// // will only be notified on errors thanks to the filter. /// // /// let next_error = errors.next().await; /// } @@ -71,10 +75,23 @@ pub trait Observable where Event: Clone + 'static + Send , { - /// Add an observer to the observable. Options can be in order to choose channel type and + /// The error type that is returned if observing is not possible. + /// + /// [Pharos](crate::Pharos) implements + /// [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) + /// which has a close method, so observing will no longer be possible after close is called. + /// + /// Other than that, you might want to have moments in your objects lifetime when you don't want to take + /// any more observers. Returning a result from [observe](Observable::observe) enables that. + /// + /// You can of course map the error of pharos to your own error type. + // + type Error: std::error::Error; + + /// Add an observer to the observable. Options allow chosing the channel type and /// to filter events with a predicate. // - fn observe( &mut self, options: ObserveConfig ) -> Events; + fn observe( &mut self, options: ObserveConfig ) -> Result, Self::Error>; } @@ -85,8 +102,11 @@ pub trait Observable // pub enum Channel { - /// A channel with a limited buffer (the usize parameter). Creates back pressure when the buffer is full. + + /// A channel with a limited message queue (the usize parameter). Creates back pressure when the buffer is full. /// This means that producer tasks may block if consumers can't process fast enough. + /// + /// The minimum valid buffer size is 1. // Bounded(usize), @@ -111,8 +131,9 @@ impl Default for Channel -/// Configuration for your event stream, passed to [Observable::observe] when subscribing. -/// This let's you choose the type of [channel](Channel) and let's +/// Configuration for your event stream. +/// +/// Pass to [Observable::observe] when subscribing. This let's you choose the type of [channel](Channel) and let's /// you set a [filter](Filter) to ignore certain events. /// /// ``` @@ -212,7 +233,7 @@ impl ObserveConfig where Event: Clone + 'static + Send } -/// Create a ObserveConfig from a [Channel], getting default values for other options. +/// Create a [ObserveConfig] from a [Channel], getting default values for other options. // impl From for ObserveConfig where Event: Clone + 'static + Send { @@ -223,7 +244,7 @@ impl From for ObserveConfig where Event: Clone + 'static } -/// Create a ObserveConfig from a [Filter], getting default values for other options. +/// Create a [ObserveConfig] from a [Filter], getting default values for other options. // impl From> for ObserveConfig where Event: Clone + 'static + Send { diff --git a/src/pharos.rs b/src/pharos.rs index 919363b..885ef2e 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,8 +1,8 @@ -use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; +use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind, Channel }; -/// The Pharos lighthouse. When you implement Observable on your type, you can forward -/// the [`observe`](Observable::observe) method to Pharos and call notify on it. +/// The Pharos lighthouse. When you implement [Observable] on your type, you can forward +/// the [`observe`](Observable::observe) method to Pharos and use [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send) to notify observers. /// /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. @@ -12,28 +12,38 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// /// ## Implementation. /// -/// Currently just holds a `Vec>`. It will stop notifying observers if the channel has -/// returned an error, which usually means it is closed or disconnected. However, we currently don't -/// compact the vector or use a more performant data structure for the observers. +/// Currently just holds a `Vec>`. It will drop observers if the channel has +/// returned an error, which means it is closed or disconnected. However, we currently don't +/// compact the vector. Slots are reused for new observers, but the vector never shrinks. /// -/// In observe, we do loop the vector to find a free spot to re-use before pushing. -/// -/// **Note**: we only detect that observers can be removed when [Pharos::notify] or [Pharos::num_observers] +/// **Note**: we only detect that observers can be removed when [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send) or [Pharos::num_observers] /// is being called. Otherwise, we won't find out about disconnected observers and the vector of observers /// will not mark deleted observers and thus their slots can not be reused. /// -/// Right now, in notify, we use `join_all` from the futures library to notify all observers concurrently. -/// We take all of our senders out of the options in our vector, operate on them and put them back if -/// they did not generate an error. +/// The [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) impl +/// is not very optimized for the moment. It just loops over all observers in each poll method +/// so it will call `poll_ready` and `poll_flush` again for observers that already returned `Poll::Ready(Ok(()))`. /// -/// `join_all` will allocate a new vector on every notify from what our concurrent futures return. Ideally -/// we would use a data structure which allows &mut access to individual elements, so we can work on them -/// concurrently in place without reallocating. I am looking into the partitions crate, but that's for -/// the next release ;). +/// TODO: I will do some benchmarking and see if this can be improved, eg. by keeping a state which tracks which +/// observers we still have to poll. // pub struct Pharos where Event: 'static + Clone + Send { - observers: Vec >>, + // Observers never get moved. Their index stays stable, so that when we free a slot, + // we can store that in `free_slots`. + // + observers : Vec >>, + free_slots: Vec , + state : State , +} + + +#[ derive( Clone, Debug, PartialEq ) ] +// +enum State +{ + Ready, + Closed, } @@ -52,67 +62,22 @@ impl Pharos where Event: 'static + Clone + Send { /// Create a new Pharos. May it's light guide you to safe harbor. /// - /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers + /// You can set the initial capacity of the vector of observers, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// - /// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. + /// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. // pub fn new( capacity: usize ) -> Self { Self { - observers: Vec::with_capacity( capacity ), + observers : Vec::with_capacity( capacity ), + free_slots: Vec::with_capacity( capacity ), + state : State::Ready , } } - - /// Notify all observers of Event `evt`. - /// - /// Currently allocates a new vector for all observers on every run. That will be fixed in future - /// versions. - // - pub async fn notify( &mut self, evt: &Event ) - { - // Try to send to all channels in parallel, so they can all start processing this event - // even if one of them is blocked on a full queue. - // - // We can not have mutable access in parallel, so we take options out and put them back. - // - // The output of the join is a vec of options with the disconnected observers removed. - // - let fut = join_all - ( - self.observers.iter_mut().map( |opt| - { - let opt = opt.take(); - - async - { - let mut new = None; - - if let Some( mut s ) = opt - { - if s.notify( evt ).await - { - new = Some( s ) - } - } - - new - } - - }) - ); - - - // Put back the observers that we "borrowed" - // TODO: compact the vector from time to time? - // - self.observers = fut.await; - } - - /// Returns the size of the vector used to store the observers. Useful for debugging and testing if it /// seems to get to big. // @@ -122,7 +87,7 @@ impl Pharos where Event: 'static + Clone + Send } - /// Returns the number of actual observers that are still listening (have not closed or dropped the Events). + /// Returns the number of actual observers that are still listening (have not closed or dropped the [Events]). /// This will loop and it will verify for each if they are closed, clearing them from the internal storage /// if they are closed. This is similar to what notify does, but without sending an event. // @@ -130,14 +95,20 @@ impl Pharos where Event: 'static + Clone + Send { let mut count = 0; - for opt in self.observers.iter_mut() + + for (i, opt) in self.observers.iter_mut().enumerate() { - if let Some(observer) = opt.take() + if let Some(observer) = opt { if !observer.is_closed() { count += 1; - *opt = Some( observer ); + } + + else + { + self.free_slots.push( i ); + *opt = None } } } @@ -162,43 +133,241 @@ impl Default for Pharos where Event: 'static + Clone + Send impl Observable for Pharos where Event: 'static + Clone + Send { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = Error; + + /// Will re-use slots from disconnected observers to avoid growing to much. + /// + /// TODO: provide API for the client to compact the pharos object after reducing the + /// number of observers. + // + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + + match options.channel + { + Channel::Bounded(queue_size) => + { + if queue_size < 1 + { + return Err( ErrorKind::MinChannelSizeOne.into() ); + } + } + + _ => {} + } + + let (events, sender) = Events::new( options ); - let mut new_observer = Some(sender); - // Try to find a free slot + // Try to reuse a free slot // - for option in &mut self.observers + if let Some( i ) = self.free_slots.pop() + { + self.observers[i] = Some( sender ); + } + + else { - if option.is_none() + self.observers.push( Some( sender ) ); + } + + Ok( events ) + } +} + + + +// See the documentation on Channel for how poll functions work for the channels we use. +// +impl Sink for Pharos where Event: Clone + 'static + Send +{ + type Error = Error; + + + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + // As soon as any is not ready, we are not ready + // + for obs in self.get_mut().observers.iter_mut() + { + if let Some( ref mut o ) = obs { - *option = new_observer.take(); - break; + let res = ready!( Pin::new( o ).poll_ready( cx ) ); + + // Errors mean disconnected, so drop. + // + if res.is_err() + { + *obs = None; + } } } - // no free slots found + Ok(()).into() + } + + + + fn start_send( self: Pin<&mut Self>, evt: Event ) -> Result<(), Self::Error> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + // if this spot in the vector has a sender + // + if let Some( obs ) = opt + { + // if it's closed, let's remove it. + // + if obs.is_closed() + { + this.free_slots.push( i ); + + *opt = None; + } + + // else if it is interested in this event + // + else if obs.filter( &evt ) + { + // if sending fails, remove it + // + if Pin::new( obs ).start_send( evt.clone() ).is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + } + + Ok(()) + } + + + + fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + // We loop over all, polling them all. If any return pending, we return pending. + // If any return an error, we drop them. // - if new_observer.is_some() + let mut pending = false; + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() { - self.observers.push( new_observer ); + if let Some( ref mut obs ) = opt + { + match Pin::new( obs ).poll_flush( cx ) + { + Poll::Pending => pending = true , + Poll::Ready(Ok(_)) => continue , + + Poll::Ready(Err(_)) => + { + this.free_slots.push( i ); + + *opt = None; + } + } + } } - events + if pending { Poll::Pending } + else { Ok(()).into() } } -} + /// Will close and drop all observers. The pharos object will remain operational however. + /// The main annoyance would be that we'd have to make + // + fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + if self.state == State::Closed + { + return Ok(()).into(); + } + + else + { + self.state = State::Closed; + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + if let Some( ref mut obs ) = opt + { + let res = ready!( Pin::new( obs ).poll_close( cx ) ); + + if res.is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + + Ok(()).into() + } +} + + #[ cfg( test ) ] // mod tests { - use super::*; + // Tested: + // + // - ✔ debug impl shows generic type + // - ✔ storage length and free slots bookkeeping + // - ✔ observe: we actually reuse free slots + // - ✔ observe: cannot observe after calling close + // - ✔ observe: refuse Channel::Bounded(0) + // - ✔ poll_ready have a channel that is full, verify we return pending. + // - ✔ poll_ready have a channel that is disconnected, verify we drop it. + // - ✔ poll_ready should return closed if the pharos is closed. + // - ✔ start_send verify message arrives + // - ✔ start_send drop disconnected channel + // - ✔ start_send filter message + // - ✔ poll_flush drop on error + // + use crate :: { *, import::* }; #[test] // @@ -214,6 +383,7 @@ mod tests // fn size_of_sender() // { // dbg!( std::mem::size_of::>>() ); + // dbg!( std::mem::size_of::>() ); // } @@ -237,32 +407,37 @@ mod tests { let mut ph = Pharos::::default(); - assert_eq!( ph.storage_len (), 0 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 0 ); + assert_eq!( ph.num_observers (), 0 ); + assert_eq!( ph.free_slots.len(), 0 ); - let mut a = ph.observe( ObserveConfig::default() ); + let mut a = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 1 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len (), 1 ); + assert_eq!( ph.num_observers (), 1 ); + assert_eq!( ph.free_slots.len(), 0 ); - let b = ph.observe( ObserveConfig::default() ); + let b = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 2 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers (), 2 ); + assert_eq!( ph.free_slots.len(), 0 ); a.close(); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len () , 2 ); + assert_eq!( ph.num_observers() , 1 ); + assert_eq!( &ph.free_slots , &[0] ); drop( b ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 0 ); + assert_eq!( &ph.free_slots , &[0, 1] ); } - // Make sure we are reusing slots + // observe: Make sure we are reusing slots // #[test] // @@ -284,18 +459,199 @@ mod tests assert_eq!( ph.num_observers(), 2 ); assert!( ph.observers[1].is_none() ); + assert_eq!( &ph.free_slots, &[1] ); let _d = ph.observe( ObserveConfig::default() ); - assert_eq!( ph.storage_len (), 3 ); - assert_eq!( ph.num_observers(), 3 ); + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers (), 3 ); + assert_eq!( ph.free_slots.len(), 0 ); let _e = ph.observe( ObserveConfig::default() ); // Now we should have pushed again // - assert_eq!( ph.storage_len (), 4 ); - assert_eq!( ph.num_observers(), 4); + assert_eq!( ph.storage_len (), 4 ); + assert_eq!( ph.num_observers (), 4); + assert_eq!( ph.free_slots.len(), 0 ); + } + + + // observe: verify we can no longer observe after calling close + // + #[test] + // + fn observe_after_close() + { + let mut ph = Pharos::::default(); + + block_on( ph.close() ).expect( "close" ); + + let res = ph.observe( ObserveConfig::default() ); + + assert! ( res.is_err() ); + assert_eq!( ErrorKind::Closed, res.unwrap_err().kind() ); + } + + + // observe: refuse Channel::Bounded(0) + // + #[test] + // + fn observe_refuse_zero() + { + let mut ph = Pharos::::default(); + + let res = ph.observe( Channel::Bounded(0).into() ); + + assert! ( res.is_err() ); + assert_eq!( ErrorKind::MinChannelSizeOne, res.unwrap_err().kind() ); + } + + + // verify that one observer blocks pharos. + // + #[ test ] + // + fn poll_ready_pending() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + assert!( ph.as_mut().start_send( true ).is_ok() ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Pending ); + + assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(true))); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + + ().into() + })); + } + + + + // pharos drops closed observers. + // + #[ test ] + // + fn poll_ready_drop() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + drop( full ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + + assert!( ph.observers[1].is_none() ); + ().into() + })); } + + + + // poll_ready should return closed if the pharos is closed. + // + #[ test ] + // + fn poll_ready_closed() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_close( cx ), Poll::Ready(Ok(())) ); + + let res = ph.as_mut().poll_ready( &mut cx ); + + assert_matches!( res, Poll::Ready( Err(_) ) ); + + match res + { + Poll::Ready( Err( e ) ) => assert_eq!( ErrorKind::Closed, e.kind() ), + _ => assert!( false, "wrong result " ), + } + + ().into() + + })); + } + + + + // start_send verify message arrives. + // + #[ test ] + // + fn start_send_arrive() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + assert!( ph.as_mut().start_send( 3 ).is_ok() ); + + assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(3))); + + ().into() + })); + } + + + + // pharos drops closed observers. + // + #[ test ] + // + fn poll_flush_drop() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + drop( full ); + + assert_matches!( ph.as_mut().poll_flush( &mut cx ), Poll::Ready( Ok(_) ) ); + + assert!( ph.observers[1].is_none() ); + ().into() + })); + } + + + + } diff --git a/tests/bounded.rs b/tests/bounded.rs index 4f5c1ff..a33b7d9 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -22,7 +22,7 @@ fn basic() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 2 ).into() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -45,7 +45,7 @@ fn close_receiver() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.sail().await; events.close(); @@ -66,8 +66,8 @@ fn one_receiver_drops() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ).expect( "observe" ); isis.sail().await; @@ -99,8 +99,8 @@ fn types() // Note that because of the asserts below type inference works here and we don't have to // put type annotation, but I do find it quite obscure and better to be explicit. // - let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.shine().await; isis.sail ().await; @@ -124,8 +124,8 @@ fn threads() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); thread::spawn( move || @@ -216,7 +216,7 @@ fn filter() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -249,7 +249,7 @@ fn filter_true() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -285,7 +285,7 @@ fn filter_false() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -325,7 +325,7 @@ fn filter_move() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter_boxed( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; diff --git a/tests/combined.rs b/tests/combined.rs index 75553b9..78748ee 100644 --- a/tests/combined.rs +++ b/tests/combined.rs @@ -15,11 +15,11 @@ fn both() { let mut isis = Goddess::new(); - let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); - let mut ubevts: Events = isis.observe( ObserveConfig::default() ); - let mut ubnuts: Events = isis.observe( ObserveConfig::default() ); + let mut ubevts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut ubnuts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ff76870..54d711a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,15 +6,17 @@ pub mod import // pub(crate) use { - pharos :: { * } , - std :: { sync::Arc, thread } , + pharos :: { * } , + std :: { sync::Arc, thread, task::{ Context, Poll }, pin::Pin } , futures :: { - channel::mpsc :: Receiver , - channel::mpsc :: UnboundedReceiver , - executor :: block_on , - stream :: StreamExt , + channel::mpsc :: Receiver , + channel::mpsc :: UnboundedReceiver , + executor :: block_on , + stream :: Stream, StreamExt, SinkExt , + sink :: Sink , + future :: poll_fn , }, }; } @@ -43,19 +45,19 @@ impl Goddess pub async fn sail( &mut self ) { - self.isis.notify( &IsisEvent::Sail ).await; + self.isis.send( IsisEvent::Sail ).await.expect( "send event" ); } pub async fn dock( &mut self ) { - self.isis.notify( &IsisEvent::Dock ).await; + self.isis.send( IsisEvent::Dock ).await.expect( "send event" ); } pub async fn shine( &mut self ) { let evt = NutEvent { time: "midnight".into() }; - self.nut.notify( &evt ).await; + self.nut.send( evt ).await.expect( "send event" ); } } @@ -81,7 +83,9 @@ pub struct NutEvent impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.isis.observe( options ) } @@ -90,7 +94,9 @@ impl Observable for Goddess impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.nut.observe( options ) } diff --git a/tests/unbounded.rs b/tests/unbounded.rs index d103f86..756e3ee 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -23,7 +23,7 @@ fn basic() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -46,7 +46,7 @@ fn close_receiver() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; events.close(); @@ -68,8 +68,8 @@ fn one_receiver_drops() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; @@ -100,8 +100,8 @@ fn types() { let mut isis = Goddess::new(); - let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ); - let mut shine_evts: Events = isis.observe( ObserveConfig::default() ); + let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; @@ -125,8 +125,8 @@ fn threads() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); thread::spawn( move || @@ -159,7 +159,7 @@ fn alot_of_events() { let mut w = Goddess::new(); - let mut events = w.observe( ObserveConfig::default() ); + let mut events = w.observe( ObserveConfig::default() ).expect( "observe" ); let amount = 1000; @@ -200,7 +200,7 @@ fn filter() } }; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -230,7 +230,7 @@ fn filter_true() let filter = |_: &IsisEvent| true; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -263,7 +263,7 @@ fn filter_false() let filter = |_: &IsisEvent| false; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -300,7 +300,7 @@ fn filter_move() } }; - let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await;