From db85733b979e8feec70b3fed9ee4fcd2fecc27a4 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Tue, 24 Sep 2019 16:35:21 +0200 Subject: [PATCH 01/13] Actually, we don't need pin_project --- Cargo.toml | 2 -- Cargo.yml | 1 - src/events.rs | 68 +++++++++++++++++---------------------------------- src/lib.rs | 1 - 4 files changed, 23 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 09b3697..036092f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,6 @@ status = "actively-developed" repository = "najamelan/pharos" [dependencies] -pin-project = "^0.4.0-beta" - [dependencies.futures-preview] features = ["async-await", "nightly"] version = "^0.3.0-alpha" diff --git a/Cargo.yml b/Cargo.yml index 8e1169c..8ea4b6e 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -44,5 +44,4 @@ badges: dependencies: futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } - pin-project : ^0.4.0-beta diff --git a/src/events.rs b/src/events.rs index 830edfc..b133980 100644 --- a/src/events.rs +++ b/src/events.rs @@ -68,12 +68,10 @@ impl Stream for Events where Event: Clone + 'static + Send /// The sender of the channel. /// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>() == 56` // -#[ pin_project ] -// pub(crate) enum Sender where Event: Clone + 'static + Send { - Bounded { #[pin] tx: FutSender , filter: Option> } , - Unbounded{ #[pin] tx: FutUnboundedSender, filter: Option> } , + Bounded { tx: FutSender , filter: Option> } , + Unbounded{ tx: FutUnboundedSender, filter: Option> } , } @@ -144,12 +142,10 @@ impl Sender where Event: Clone + 'static + Send /// The receiver of the channel. // -#[ pin_project ] -// enum Receiver where Event: Clone + 'static + Send { - Bounded { #[pin] rx: FutReceiver } , - Unbounded{ #[pin] rx: FutUnboundedReceiver } , + Bounded { rx: FutReceiver } , + Unbounded{ rx: FutUnboundedReceiver } , } @@ -186,16 +182,12 @@ impl Stream for Receiver where Event: Clone + 'static + Send { type Item = Event; - #[ project ] - // fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< Option > { - #[ project ] - // - match self.project() + match self.get_mut() { - Receiver::Bounded { rx } => rx.poll_next( cx ), - Receiver::Unbounded{ rx } => rx.poll_next( cx ), + Receiver::Bounded { rx } => Pin::new( rx ).poll_next( cx ), + Receiver::Unbounded{ rx } => Pin::new( rx ).poll_next( cx ), } } } @@ -206,57 +198,43 @@ impl Sink for Sender where Event: Clone + 'static + Send { type Error = Error; - #[ project ] - // + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).poll_ready( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_ready( cx ).map_err( Into::into ), } } - #[ project ] - // + fn start_send( self: Pin<&mut Self>, item: Event ) -> Result<(), Self::Error> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.start_send( item ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.start_send( item ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).start_send( item ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).start_send( item ).map_err( Into::into ), } } - /// This will do a send under the hood, so the same errors as from start_send can occur here. - // - #[ project ] - // + fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).poll_flush( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_flush( cx ).map_err( Into::into ), } } - #[ project ] - // + fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> { - #[ project ] - // - match self.project() + match self.get_mut() { - Sender::Bounded { tx, .. } => tx.poll_close( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => tx.poll_close( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => Pin::new( tx ).poll_close( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_close( cx ).map_err( Into::into ), } } } diff --git a/src/lib.rs b/src/lib.rs index 48c8972..12504bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,6 @@ mod import { std :: { fmt, error::Error as ErrorTrait, ops::Deref, any::type_name } , std :: { task::{ Poll, Context }, pin::Pin } , - pin_project :: { project, pin_project } , futures :: { From 72d5892ae66a1abb11d72d713e15fa9626f9cb36 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Tue, 24 Sep 2019 19:53:04 +0200 Subject: [PATCH 02/13] Another breaking change to the API. - impl Sink for Pharos instead of a notify method. This makes sure poll methods are available when needing to notify from within other poll fn. - We loop over all observers on every call to poll_ready and poll_flush, which should be improved --- README.md | 33 +++-- TODO.md | 6 - examples/basic.rs | 15 +- examples/filter.rs | 6 +- src/error.rs | 13 +- src/events.rs | 37 +---- src/lib.rs | 9 +- src/observable.rs | 26 +++- src/pharos.rs | 329 ++++++++++++++++++++++++++++++++------------ tests/bounded.rs | 24 ++-- tests/combined.rs | 8 +- tests/common/mod.rs | 16 ++- tests/unbounded.rs | 26 ++-- 13 files changed, 348 insertions(+), 200 deletions(-) diff --git a/README.md b/README.md index c3de095..5997e8c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to. -I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on. +I created it to leverage interoperability we can create by using async [Stream](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_core/stream/trait.Stream.html) and [Sink](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures/sink/trait.Sink.html +) from the futures library. So you can use all stream combinators, forward it into Sinks and so on. Minimal rustc version: 1.39. @@ -44,8 +45,8 @@ This crate has: `#![ forbid( unsafe_code ) ]` - [`Events`] is not clonable right now (would require support from the channels we use as back-ends, eg. broadcast type channel) - performance tweaking still needs to be done - pharos requires mut access for most operations. This is not intended to change anytime soon. Both on - [notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal - state, and most channels also require mutable access to either read or write. If you need it from non mutable + [send](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_util/sink/trait.SinkExt.html#method.send) and [observe](Observable::observe), the two main interfaces, manipulate internal + state, and most channels also require mutable access to either read or write. If you need it from immutable context, use interior mutability primitives like locks or Cells... ### Future work @@ -79,18 +80,17 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master ### Dependencies -This crate only has two dependencies. Cargo will automatically handle it's dependencies for you. +This crate only has one dependencies. Cargo will automatically handle it's dependencies for you. ```yaml dependencies: futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } - pin-project : ^0.4.0-beta ``` ## Usage -pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages +`pharos` only works for async code, as the notify method is asynchronous. Observers must consume the messages fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers. @@ -102,8 +102,8 @@ Examples can be found in the [examples](https://github.com/najamelan/pharos/tree ```rust use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -123,7 +123,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" ); } } @@ -147,7 +150,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -164,7 +169,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // @@ -209,7 +214,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -226,7 +233,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/TODO.md b/TODO.md index 9a59f97..84b3084 100644 --- a/TODO.md +++ b/TODO.md @@ -1,13 +1,7 @@ # TODO - make Events clone? means we can only work with broadcast channels - - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? -- scaling? For now we have an ever growing vector of observers - - other data structure than vec? smallvec? - - type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review. - - let users set capacity on creation? diff --git a/examples/basic.rs b/examples/basic.rs index 519e296..5d6541f 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,7 +1,7 @@ use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -21,7 +21,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + let _ = self.pharos.send( GoddessEvent::Sailing ).await; } } @@ -45,7 +48,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -62,7 +67,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // diff --git a/examples/filter.rs b/examples/filter.rs index 8dda3ce..30a1b56 100644 --- a/examples/filter.rs +++ b/examples/filter.rs @@ -16,7 +16,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -33,7 +35,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/src/error.rs b/src/error.rs index afd5c5d..2c550e4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use crate::{ import::* }; // #[ derive( Debug ) ] // -pub(crate) struct Error +pub struct Error { pub(crate) inner: Option< Box >, pub(crate) kind : ErrorKind, @@ -19,13 +19,18 @@ pub(crate) struct Error // #[ derive( Debug ) ] // -pub(crate) enum ErrorKind +pub enum ErrorKind { - /// Failed to send on channel, normally means it's closed. Pharos does not expose these errors - /// to the user. + #[ doc( hidden ) ] + // + //This variant is only used internally. // SendError, + /// The pharos object is already closed. You can no longer + // + Closed, + #[ doc( hidden ) ] // __NonExhaustive__ diff --git a/src/events.rs b/src/events.rs index b133980..a645f1d 100644 --- a/src/events.rs +++ b/src/events.rs @@ -91,49 +91,24 @@ impl Sender where Event: Clone + 'static + Send } - // Notify the observer and return a bool indicating whether this observer is still - // operational. If an error happens on a channel it usually means that the channel - // is closed, in which case we should drop this sender. + /// Check whether this sender is interested in this event // - pub(crate) async fn notify( &mut self, evt: &Event ) -> bool + pub(crate) fn filter( &mut self, evt: &Event ) -> bool { - if self.is_closed() { return false } - match self { - Sender::Bounded { tx, filter } => Self::notifier( tx, filter, evt ).await, - Sender::Unbounded{ tx, filter } => Self::notifier( tx, filter, evt ).await, + Sender::Bounded { filter, .. } => Self::filter_inner( filter, evt ), + Sender::Unbounded{ filter, .. } => Self::filter_inner( filter, evt ), } } - async fn notifier - ( - mut tx: impl Sink + Unpin , - filter: &mut Option> , - evt : &Event , - ) - - -> bool - + fn filter_inner( filter: &mut Option>, evt: &Event ) -> bool { - let interested = match filter + match filter { Some(f) => f.call(evt), None => true , - }; - - - #[ allow( clippy::match_bool ) ] - // - match interested - { - true => tx.send( evt.clone() ).await.is_ok(), - - // since we don't try to send, we know nothing about whether they are still - // observing, so assume they do. - // - false => true, } } } diff --git a/src/lib.rs b/src/lib.rs index 12504bc..5529e35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,12 +40,7 @@ pub use filter :: { Filter } , observable :: { Observable, ObserveConfig, Channel } , events :: { Events } , -}; - - -pub(crate) use -{ - error :: { Error } , + error :: { Error, ErrorKind } , }; @@ -58,7 +53,7 @@ mod import futures :: { - future::{ join_all }, Stream, Sink, SinkExt, + Stream, Sink, ready, channel::mpsc:: { diff --git a/src/observable.rs b/src/observable.rs index 5a97a08..df87300 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { Filter, Events }; +use crate :: { import::*, Filter, Events }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -28,6 +28,8 @@ use crate :: { Filter, Events }; /// /// impl Steps /// { +/// // We can use this as a predicate to filter events. +/// // /// fn is_err( &self ) -> bool /// { /// match self @@ -39,15 +41,17 @@ use crate :: { Filter, Events }; /// } /// /// -/// // The object we want to be observable +/// // The object we want to be observable. /// // /// struct Foo { pharos: Pharos }; /// /// impl Observable for Foo /// { +/// type Error = pharos::Error; +/// /// // Pharos implements observable, so we just forward the call. /// // -/// fn observe( &mut self, options: ObserveConfig ) -> Events +/// fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > /// { /// self.pharos.observe( options ) /// } @@ -59,9 +63,9 @@ use crate :: { Filter, Events }; /// async fn task() /// { /// let mut foo = Foo { pharos: Pharos::default() }; -/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ); +/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ).expect( "observe" ); /// -/// // will only be notified on errors now +/// // will only be notified on errors thanks to the filter. /// // /// let next_error = errors.next().await; /// } @@ -71,10 +75,20 @@ pub trait Observable where Event: Clone + 'static + Send , { + /// The error type that is returned if observing is not possible. [Pharos](crate::Pharos) implements Sink + /// which has a close method, so observing will no longer be possible after close is called. + /// + /// Other than that, you might want to have moments in your objects lifetime when you don't want to take + /// any more observers. Returning a result from [observe](Observable::observe) enables that. + /// + /// You can of course map the error of pharos to your own error type. + // + type Error: ErrorTrait; + /// Add an observer to the observable. Options can be in order to choose channel type and /// to filter events with a predicate. // - fn observe( &mut self, options: ObserveConfig ) -> Events; + fn observe( &mut self, options: ObserveConfig ) -> Result, Self::Error>; } diff --git a/src/pharos.rs b/src/pharos.rs index 919363b..caa1fbc 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; +use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward @@ -13,27 +13,38 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// ## Implementation. /// /// Currently just holds a `Vec>`. It will stop notifying observers if the channel has -/// returned an error, which usually means it is closed or disconnected. However, we currently don't -/// compact the vector or use a more performant data structure for the observers. +/// returned an error, which means it is closed or disconnected. However, we currently don't +/// compact the vector. Slots are reused for new observers, but the vector never shrinks. /// /// In observe, we do loop the vector to find a free spot to re-use before pushing. /// -/// **Note**: we only detect that observers can be removed when [Pharos::notify] or [Pharos::num_observers] +/// **Note**: we only detect that observers can be removed when [futures::SinkExt::send] or [Pharos::num_observers] /// is being called. Otherwise, we won't find out about disconnected observers and the vector of observers /// will not mark deleted observers and thus their slots can not be reused. /// -/// Right now, in notify, we use `join_all` from the futures library to notify all observers concurrently. -/// We take all of our senders out of the options in our vector, operate on them and put them back if -/// they did not generate an error. +/// The [Sink] impl is not very optimized for the moment. It just loops over all observers in each poll method +/// so it will call `poll_ready` and `poll_flush` again for observers that already returned `Poll::Ready(Ok(()))`. /// -/// `join_all` will allocate a new vector on every notify from what our concurrent futures return. Ideally -/// we would use a data structure which allows &mut access to individual elements, so we can work on them -/// concurrently in place without reallocating. I am looking into the partitions crate, but that's for -/// the next release ;). +/// TODO: I will do some benchmarking and see if this can be improved, eg. by keeping a state which tracks which +/// observers we still have to poll. // pub struct Pharos where Event: 'static + Clone + Send { - observers: Vec >>, + // Observers never get moved. Their index stays stable, so that when we free a slot, + // we can store that in `free_slots`. + // + observers : Vec >>, + free_slots: Vec , + state : State , +} + + +#[ derive( Clone, Debug, PartialEq ) ] +// +enum State +{ + Ready, + Closed, } @@ -55,64 +66,20 @@ impl Pharos where Event: 'static + Clone + Send /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// + /// TODO: update to pharos 0.4.0 /// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. // pub fn new( capacity: usize ) -> Self { Self { - observers: Vec::with_capacity( capacity ), + observers : Vec::with_capacity( capacity ), + free_slots: Vec::with_capacity( capacity ), + state : State::Ready , } } - - /// Notify all observers of Event `evt`. - /// - /// Currently allocates a new vector for all observers on every run. That will be fixed in future - /// versions. - // - pub async fn notify( &mut self, evt: &Event ) - { - // Try to send to all channels in parallel, so they can all start processing this event - // even if one of them is blocked on a full queue. - // - // We can not have mutable access in parallel, so we take options out and put them back. - // - // The output of the join is a vec of options with the disconnected observers removed. - // - let fut = join_all - ( - self.observers.iter_mut().map( |opt| - { - let opt = opt.take(); - - async - { - let mut new = None; - - if let Some( mut s ) = opt - { - if s.notify( evt ).await - { - new = Some( s ) - } - } - - new - } - - }) - ); - - - // Put back the observers that we "borrowed" - // TODO: compact the vector from time to time? - // - self.observers = fut.await; - } - - /// Returns the size of the vector used to store the observers. Useful for debugging and testing if it /// seems to get to big. // @@ -130,14 +97,20 @@ impl Pharos where Event: 'static + Clone + Send { let mut count = 0; - for opt in self.observers.iter_mut() + + for (i, opt) in self.observers.iter_mut().enumerate() { - if let Some(observer) = opt.take() + if let Some(observer) = opt { if !observer.is_closed() { count += 1; - *opt = Some( observer ); + } + + else + { + self.free_slots.push( i ); + *opt = None } } } @@ -162,31 +135,182 @@ impl Default for Pharos where Event: 'static + Clone + Send impl Observable for Pharos where Event: 'static + Clone + Send { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = Error; + + /// Will try to re-use slots in the vector from disconnected observers. + // + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + let (events, sender) = Events::new( options ); - let mut new_observer = Some(sender); - // Try to find a free slot + // Try to reuse a free slot // - for option in &mut self.observers + if let Some( i ) = self.free_slots.pop() { - if option.is_none() + self.observers[i] = Some( sender ); + } + + else + { + self.observers.push( Some( sender ) ); + } + + Ok( events ) + } +} + + + +impl Sink for Pharos where Event: Clone + 'static + Send +{ + type Error = Error; + + + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + for obs in self.get_mut().observers.iter_mut() + { + if let Some( ref mut o ) = obs { - *option = new_observer.take(); - break; + let res = ready!( Pin::new( o ).poll_ready( cx ) ); + + if res.is_err() + { + *obs = None; + } } } - // no free slots found - // - if new_observer.is_some() + Ok(()).into() + } + + + fn start_send( self: Pin<&mut Self>, evt: Event ) -> Result<(), Self::Error> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + // if this spot in the vector has a sender + // + if let Some( obs ) = opt + { + // if it's closed, let's remove it. + // + if obs.is_closed() + { + this.free_slots.push( i ); + + *opt = None; + } + + // else if it is interested in this event + // + else if obs.filter( &evt ) + { + // if sending fails, remove it + // + if Pin::new( obs ).start_send( evt.clone() ).is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + } + + Ok(()).into() + } + + + + fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + if let Some( ref mut obs ) = opt + { + let res = ready!( Pin::new( obs ).poll_flush( cx ) ); + + if res.is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + + Ok(()).into() + } + + + + /// Will close and drop all observers. The pharos object will remain operational however. + /// The main annoyance would be that we'd have to make + // + fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + if self.state == State::Closed + { + return Ok(()).into(); + } + + else { - self.observers.push( new_observer ); + self.state = State::Closed; } - events + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + if let Some( ref mut obs ) = opt + { + let res = ready!( Pin::new( obs ).poll_close( cx ) ); + + if res.is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + + Ok(()).into() } } @@ -199,6 +323,7 @@ impl Observable for Pharos where Event: 'static + Clone + mod tests { use super::*; + use futures::SinkExt; #[test] // @@ -237,28 +362,33 @@ mod tests { let mut ph = Pharos::::default(); - assert_eq!( ph.storage_len (), 0 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 0 ); + assert_eq!( ph.num_observers (), 0 ); + assert_eq!( ph.free_slots.len(), 0 ); - let mut a = ph.observe( ObserveConfig::default() ); + let mut a = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 1 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len (), 1 ); + assert_eq!( ph.num_observers (), 1 ); + assert_eq!( ph.free_slots.len(), 0 ); - let b = ph.observe( ObserveConfig::default() ); + let b = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 2 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers (), 2 ); + assert_eq!( ph.free_slots.len(), 0 ); a.close(); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len () , 2 ); + assert_eq!( ph.num_observers() , 1 ); + assert_eq!( &ph.free_slots , &[0] ); drop( b ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 0 ); + assert_eq!( &ph.free_slots , &[0, 1] ); } @@ -284,18 +414,35 @@ mod tests assert_eq!( ph.num_observers(), 2 ); assert!( ph.observers[1].is_none() ); + assert_eq!( &ph.free_slots, &[1] ); let _d = ph.observe( ObserveConfig::default() ); - assert_eq!( ph.storage_len (), 3 ); - assert_eq!( ph.num_observers(), 3 ); + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers (), 3 ); + assert_eq!( ph.free_slots.len(), 0 ); let _e = ph.observe( ObserveConfig::default() ); // Now we should have pushed again // - assert_eq!( ph.storage_len (), 4 ); - assert_eq!( ph.num_observers(), 4); + assert_eq!( ph.storage_len (), 4 ); + assert_eq!( ph.num_observers (), 4); + assert_eq!( ph.free_slots.len(), 0 ); + } + + + // verify we can no longer observer after calling close + // + #[test] + // + fn observe_after_close() + { + let mut ph = Pharos::::default(); + + futures::executor::block_on( ph.close() ).expect( "close" ); + + assert!( ph.observe( ObserveConfig::default() ).is_err() ); } } diff --git a/tests/bounded.rs b/tests/bounded.rs index 4f5c1ff..e6f0b90 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -22,7 +22,7 @@ fn basic() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -45,7 +45,7 @@ fn close_receiver() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.sail().await; events.close(); @@ -66,8 +66,8 @@ fn one_receiver_drops() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ).expect( "observe" ); isis.sail().await; @@ -99,8 +99,8 @@ fn types() // Note that because of the asserts below type inference works here and we don't have to // put type annotation, but I do find it quite obscure and better to be explicit. // - let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.shine().await; isis.sail ().await; @@ -124,8 +124,8 @@ fn threads() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); thread::spawn( move || @@ -216,7 +216,7 @@ fn filter() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -249,7 +249,7 @@ fn filter_true() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -285,7 +285,7 @@ fn filter_false() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -325,7 +325,7 @@ fn filter_move() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter_boxed( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; diff --git a/tests/combined.rs b/tests/combined.rs index 75553b9..78748ee 100644 --- a/tests/combined.rs +++ b/tests/combined.rs @@ -15,11 +15,11 @@ fn both() { let mut isis = Goddess::new(); - let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); - let mut ubevts: Events = isis.observe( ObserveConfig::default() ); - let mut ubnuts: Events = isis.observe( ObserveConfig::default() ); + let mut ubevts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut ubnuts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ff76870..d6e0e0a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -14,7 +14,7 @@ pub mod import channel::mpsc :: Receiver , channel::mpsc :: UnboundedReceiver , executor :: block_on , - stream :: StreamExt , + stream :: StreamExt, SinkExt, }, }; } @@ -43,19 +43,19 @@ impl Goddess pub async fn sail( &mut self ) { - self.isis.notify( &IsisEvent::Sail ).await; + self.isis.send( IsisEvent::Sail ).await.expect( "send event" ); } pub async fn dock( &mut self ) { - self.isis.notify( &IsisEvent::Dock ).await; + self.isis.send( IsisEvent::Dock ).await.expect( "send event" ); } pub async fn shine( &mut self ) { let evt = NutEvent { time: "midnight".into() }; - self.nut.notify( &evt ).await; + self.nut.send( evt ).await.expect( "send event" ); } } @@ -81,7 +81,9 @@ pub struct NutEvent impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.isis.observe( options ) } @@ -90,7 +92,9 @@ impl Observable for Goddess impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.nut.observe( options ) } diff --git a/tests/unbounded.rs b/tests/unbounded.rs index d103f86..756e3ee 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -23,7 +23,7 @@ fn basic() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -46,7 +46,7 @@ fn close_receiver() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; events.close(); @@ -68,8 +68,8 @@ fn one_receiver_drops() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; @@ -100,8 +100,8 @@ fn types() { let mut isis = Goddess::new(); - let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ); - let mut shine_evts: Events = isis.observe( ObserveConfig::default() ); + let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; @@ -125,8 +125,8 @@ fn threads() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); thread::spawn( move || @@ -159,7 +159,7 @@ fn alot_of_events() { let mut w = Goddess::new(); - let mut events = w.observe( ObserveConfig::default() ); + let mut events = w.observe( ObserveConfig::default() ).expect( "observe" ); let amount = 1000; @@ -200,7 +200,7 @@ fn filter() } }; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -230,7 +230,7 @@ fn filter_true() let filter = |_: &IsisEvent| true; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -263,7 +263,7 @@ fn filter_false() let filter = |_: &IsisEvent| false; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -300,7 +300,7 @@ fn filter_move() } }; - let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; From d81799b08046009481b794c91bb6c2ddeeb2d9bf Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Tue, 24 Sep 2019 19:53:15 +0200 Subject: [PATCH 03/13] ICE in rustc --- src/pharos.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/pharos.rs b/src/pharos.rs index caa1fbc..863b538 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -255,23 +255,29 @@ impl Sink for Pharos where Event: Clone + 'static + Send } - let this = self.get_mut(); + let mut pending = false; + let this = self.get_mut(); for (i, opt) in this.observers.iter_mut().enumerate() { if let Some( ref mut obs ) = opt { - let res = ready!( Pin::new( obs ).poll_flush( cx ) ); - - if res.is_err() + match Pin::new( obs ).poll_flush( cx ) { - this.free_slots.push( i ); + Poll::Pending => pending = true , + Poll::Ready(Ok()) => continue , - *opt = None; + Poll::Ready(Err(_)) => + { + this.free_slots.push( i ); + + *opt = None; + } } } } + Ok(()).into() } From 8567dc9c743b4b1ae5d939af74d66647beac079d Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Tue, 24 Sep 2019 20:37:13 +0200 Subject: [PATCH 04/13] Update some comments --- src/events.rs | 4 ++-- src/pharos.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/events.rs b/src/events.rs index a645f1d..e40fcc4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -2,7 +2,7 @@ use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). /// -/// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>() == 16` +/// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>() == 16` // #[ derive( Debug ) ] // @@ -66,7 +66,7 @@ impl Stream for Events where Event: Clone + 'static + Send /// The sender of the channel. -/// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>() == 56` +/// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>() == 56` // pub(crate) enum Sender where Event: Clone + 'static + Send { diff --git a/src/pharos.rs b/src/pharos.rs index 863b538..75cd738 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -66,8 +66,7 @@ impl Pharos where Event: 'static + Clone + Send /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// - /// TODO: update to pharos 0.4.0 - /// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. + /// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. // pub fn new( capacity: usize ) -> Self { @@ -345,6 +344,7 @@ mod tests // fn size_of_sender() // { // dbg!( std::mem::size_of::>>() ); + // dbg!( std::mem::size_of::>() ); // } From 44c43343b277ba99073b8d70c547b06f848db863 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Wed, 25 Sep 2019 08:45:22 +0200 Subject: [PATCH 05/13] fix visibility of Error::kind() --- src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/error.rs b/src/error.rs index 2c550e4..3c7977a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,7 +9,7 @@ use crate::{ import::* }; // pub struct Error { - pub(crate) inner: Option< Box >, + pub(crate) inner: Option< Box >, pub(crate) kind : ErrorKind, } @@ -83,7 +83,7 @@ impl Error { /// Allows matching on the error kind // - pub(crate) fn _kind( &self ) -> &ErrorKind + pub fn kind( &self ) -> &ErrorKind { &self.kind } From 273490c43feb5efd4eb881d609fb223a5085ee8d Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Thu, 26 Sep 2019 23:13:35 +0200 Subject: [PATCH 06/13] fix clippy warning --- src/pharos.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pharos.rs b/src/pharos.rs index 75cd738..7a1d86b 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -240,7 +240,7 @@ impl Sink for Pharos where Event: Clone + 'static + Send } } - Ok(()).into() + Ok(()) } From 7c677216ce7aae9db241947b2d11226e489b7d92 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Fri, 27 Sep 2019 00:26:16 +0200 Subject: [PATCH 07/13] Finish flush algorithm now ICE is solved --- src/pharos.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/pharos.rs b/src/pharos.rs index 7a1d86b..d50ecfa 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -254,6 +254,9 @@ impl Sink for Pharos where Event: Clone + 'static + Send } + // We loop over all, polling them all. If any return pending, we return pending. + // If any return an error, we drop them. + // let mut pending = false; let this = self.get_mut(); @@ -264,7 +267,7 @@ impl Sink for Pharos where Event: Clone + 'static + Send match Pin::new( obs ).poll_flush( cx ) { Poll::Pending => pending = true , - Poll::Ready(Ok()) => continue , + Poll::Ready(Ok(_)) => continue , Poll::Ready(Err(_)) => { @@ -276,8 +279,8 @@ impl Sink for Pharos where Event: Clone + 'static + Send } } - - Ok(()).into() + if pending { Poll::Pending } + else { Ok(()).into() } } From 51280c77ba8c378c1b12a80cb3d3700621f6cdaa Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 01:17:46 +0200 Subject: [PATCH 08/13] Document the behavior of the futures channels --- src/observable.rs | 19 ++++++++++++++++++- src/pharos.rs | 2 ++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/observable.rs b/src/observable.rs index df87300..cb8c060 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -99,8 +99,25 @@ pub trait Observable // pub enum Channel { - /// A channel with a limited buffer (the usize parameter). Creates back pressure when the buffer is full. + + /// A channel with a limited message queue (the usize parameter). Creates back pressure when the buffer is full. /// This means that producer tasks may block if consumers can't process fast enough. + /// + /// ## Implementation + /// + /// Some background on the bounded channels from the futures library (that we use as a backend): + /// - the `queue_size` is buffer + num senders (pharos is the only sender in our case). That means + /// that you can set a buffer size of 0 and still send a message in, which is somewhat counter + /// intuitive. It would make sense to do -1 on the user supplied queue_size to compensate, but: + /// - `poll_flush` just calls poll_ready. That means that flush will return Pending if the buffer is full, + /// even though in principle all messages are ready for the reader to read, so flush should be a + /// noop for a channel. It also kind of kills an exact buffer size, because it will make `SinkExt::send` + /// block when you send the last message that fits in the buffer. :-( + /// + /// Conclusion, we don't do the minus one. SinkExt::send works as expected, we don't need to validate + /// against users sending in 0 queue_size, because it's now a valid input, since we take a usize, you + /// can't send in negative numbers. + /// // Bounded(usize), diff --git a/src/pharos.rs b/src/pharos.rs index d50ecfa..0318e3b 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -167,6 +167,8 @@ impl Observable for Pharos where Event: 'static + Clone + +// See the documentation on Channel for how poll functions work for the channels we use. +// impl Sink for Pharos where Event: Clone + 'static + Send { type Error = Error; From 7e13af61d1737e406bedf15b13726a048e6d36ed Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 01:19:42 +0200 Subject: [PATCH 09/13] Allow comparing ErrorKind to &ErrorKind --- src/error.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 3c7977a..d251924 100644 --- a/src/error.rs +++ b/src/error.rs @@ -17,7 +17,7 @@ pub struct Error /// The different kind of errors that can happen when you use the `pharos` API. // -#[ derive( Debug ) ] +#[ derive( Debug, Copy, Clone, PartialEq, Eq ) ] // pub enum ErrorKind { @@ -37,6 +37,23 @@ pub enum ErrorKind } +impl PartialEq<&ErrorKind> for ErrorKind +{ + fn eq( &self, other: &&ErrorKind ) -> bool + { + self == *other + } +} + +impl PartialEq for &ErrorKind +{ + fn eq( &self, other: &ErrorKind ) -> bool + { + *self == other + } +} + + impl ErrorTrait for Error { From 1721a3b30a7657ba45877c9eda3c899fb1744e91 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 01:20:40 +0200 Subject: [PATCH 10/13] Update to futures 0.3.0-alpha.19 --- Cargo.toml | 2 +- Cargo.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 036092f..cee0a72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ repository = "najamelan/pharos" [dependencies] [dependencies.futures-preview] -features = ["async-await", "nightly"] +features = ["async-await"] version = "^0.3.0-alpha" [features] diff --git a/Cargo.yml b/Cargo.yml index 8ea4b6e..2228118 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -43,5 +43,5 @@ badges: dependencies: - futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } + futures-preview : { version: ^0.3.0-alpha, features: [async-await] } From bbd2f6b83d3dd6ecde2c7226767b3de6ea50bc70 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 02:36:39 +0200 Subject: [PATCH 11/13] Add unit tests for the Sink impl of pharos --- Cargo.toml | 3 + Cargo.yml | 3 + src/error.rs | 9 +- src/events.rs | 29 ++++-- src/lib.rs | 8 ++ src/observable.rs | 16 +--- src/pharos.rs | 212 ++++++++++++++++++++++++++++++++++++++++++-- tests/bounded.rs | 2 +- tests/common/mod.rs | 14 +-- 9 files changed, 259 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cee0a72..f2c4b02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ repository = "najamelan/pharos" features = ["async-await"] version = "^0.3.0-alpha" +[dev-dependencies] +assert_matches = "^1" + [features] external_doc = [] diff --git a/Cargo.yml b/Cargo.yml index 2228118..173bcea 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -45,3 +45,6 @@ dependencies: futures-preview : { version: ^0.3.0-alpha, features: [async-await] } +dev-dependencies: + + assert_matches : ^1 diff --git a/src/error.rs b/src/error.rs index d251924..0e2849f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -27,10 +27,14 @@ pub enum ErrorKind // SendError, - /// The pharos object is already closed. You can no longer + /// The pharos object is already closed. You can no longer send messages or observe it. // Closed, + /// The minimum valid buffer size for [`Channel::Bounded`] is `1`, you send in `0`. + // + MinChannelSizeOne, + #[ doc( hidden ) ] // __NonExhaustive__ @@ -72,7 +76,8 @@ impl fmt::Display for ErrorKind { match self { - Self::SendError => fmt::Display::fmt( "Channel closed.", f ) , + Self::SendError => fmt::Display::fmt( "Channel closed.", f ) , + Self::MinChannelSizeOne => fmt::Display::fmt( "The minimum valid buffer size for Channel::Bounded is 1, you send in 0.", f ) , _ => unreachable!(), } diff --git a/src/events.rs b/src/events.rs index e40fcc4..3bebc9e 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; +use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error, ErrorKind }; /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). /// @@ -20,7 +20,7 @@ impl Events where Event: Clone + 'static + Send { Channel::Bounded( queue_size ) => { - let (tx, rx) = mpsc::channel( queue_size ); + let (tx, rx) = mpsc::channel( queue_size - 1 ); ( Sender::Bounded{ tx, filter: config.filter }, Receiver::Bounded{ rx } ) } @@ -194,12 +194,31 @@ impl Sink for Sender where Event: Clone + 'static + Send } - fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + // Note that on futures-rs bounded channels poll_flush has a problematic implementation. + // - it just calls poll_ready, which means it will be pending when the buffer is full. So + // it will make SinkExt::send hang, bad! + // - it will swallow disconnected errors, so we don't get feedback allowing us to free slots. + // + // In principle channels are always flushed, because when the message is in the buffer, it's + // ready for the reader to read. So this should just be a noop. + // + // We compensate for the error swallowing by checking `is_closed`. + // + fn poll_flush( self: Pin<&mut Self>, _cx: &mut Context<'_> ) -> Poll> { match self.get_mut() { - Sender::Bounded { tx, .. } => Pin::new( tx ).poll_flush( cx ).map_err( Into::into ), - Sender::Unbounded{ tx, .. } => Pin::new( tx ).poll_flush( cx ).map_err( Into::into ), + Sender::Bounded { tx, .. } => + { + if tx.is_closed() { Poll::Ready(Err( ErrorKind::Closed.into() ))} + else { Poll::Ready(Ok ( () ))} + } + + Sender::Unbounded{ tx, .. } => + { + if tx.is_closed() { Poll::Ready(Err( ErrorKind::Closed.into() ))} + else { Poll::Ready(Ok ( () ))} + } } } diff --git a/src/lib.rs b/src/lib.rs index 5529e35..d33f468 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,4 +66,12 @@ mod import } , }, }; + + #[ cfg( test ) ] + // + pub(crate) use + { + assert_matches :: { assert_matches } , + futures :: { future::poll_fn, executor::block_on, SinkExt } , + }; } diff --git a/src/observable.rs b/src/observable.rs index cb8c060..6267b94 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -103,21 +103,7 @@ pub enum Channel /// A channel with a limited message queue (the usize parameter). Creates back pressure when the buffer is full. /// This means that producer tasks may block if consumers can't process fast enough. /// - /// ## Implementation - /// - /// Some background on the bounded channels from the futures library (that we use as a backend): - /// - the `queue_size` is buffer + num senders (pharos is the only sender in our case). That means - /// that you can set a buffer size of 0 and still send a message in, which is somewhat counter - /// intuitive. It would make sense to do -1 on the user supplied queue_size to compensate, but: - /// - `poll_flush` just calls poll_ready. That means that flush will return Pending if the buffer is full, - /// even though in principle all messages are ready for the reader to read, so flush should be a - /// noop for a channel. It also kind of kills an exact buffer size, because it will make `SinkExt::send` - /// block when you send the last message that fits in the buffer. :-( - /// - /// Conclusion, we don't do the minus one. SinkExt::send works as expected, we don't need to validate - /// against users sending in 0 queue_size, because it's now a valid input, since we take a usize, you - /// can't send in negative numbers. - /// + /// The minimum valid buffer size is 1. // Bounded(usize), diff --git a/src/pharos.rs b/src/pharos.rs index 0318e3b..156cb27 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind }; +use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind, Channel }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward @@ -146,6 +146,20 @@ impl Observable for Pharos where Event: 'static + Clone + } + match options.channel + { + Channel::Bounded(queue_size) => + { + if queue_size < 1 + { + return Err( ErrorKind::MinChannelSizeOne.into() ); + } + } + + _ => {} + } + + let (events, sender) = Events::new( options ); @@ -183,12 +197,16 @@ impl Sink for Pharos where Event: Clone + 'static + Send } + // As soon as any is not ready, we are not ready + // for obs in self.get_mut().observers.iter_mut() { if let Some( ref mut o ) = obs { let res = ready!( Pin::new( o ).poll_ready( cx ) ); + // Errors mean disconnected, so drop. + // if res.is_err() { *obs = None; @@ -200,6 +218,7 @@ impl Sink for Pharos where Event: Clone + 'static + Send } + fn start_send( self: Pin<&mut Self>, evt: Event ) -> Result<(), Self::Error> { @@ -327,13 +346,26 @@ impl Sink for Pharos where Event: Clone + 'static + Send - #[ cfg( test ) ] // mod tests { - use super::*; - use futures::SinkExt; + // Tested: + // + // - ✔ debug impl shows generic type + // - ✔ storage length and free slots bookkeeping + // - ✔ observe: we actually reuse free slots + // - ✔ observe: cannot observe after calling close + // - ✔ observe: refuse Channel::Bounded(0) + // - ✔ poll_ready have a channel that is full, verify we return pending. + // - ✔ poll_ready have a channel that is disconnected, verify we drop it. + // - ✔ poll_ready should return closed if the pharos is closed. + // - ✔ start_send verify message arrives + // - ✔ start_send drop disconnected channel + // - ✔ start_send filter message + // - ✔ poll_flush drop on error + // + use crate :: { *, import::* }; #[test] // @@ -403,7 +435,7 @@ mod tests } - // Make sure we are reusing slots + // observe: Make sure we are reusing slots // #[test] // @@ -444,7 +476,7 @@ mod tests } - // verify we can no longer observer after calling close + // observe: verify we can no longer observe after calling close // #[test] // @@ -452,8 +484,172 @@ mod tests { let mut ph = Pharos::::default(); - futures::executor::block_on( ph.close() ).expect( "close" ); + block_on( ph.close() ).expect( "close" ); + + let res = ph.observe( ObserveConfig::default() ); + + assert! ( res.is_err() ); + assert_eq!( ErrorKind::Closed, res.unwrap_err().kind() ); + } + + + // observe: refuse Channel::Bounded(0) + // + #[test] + // + fn observe_refuse_zero() + { + let mut ph = Pharos::::default(); + + let res = ph.observe( Channel::Bounded(0).into() ); + + assert! ( res.is_err() ); + assert_eq!( ErrorKind::MinChannelSizeOne, res.unwrap_err().kind() ); + } + + + // verify that one observer blocks pharos. + // + #[ test ] + // + fn poll_ready_pending() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + assert!( ph.as_mut().start_send( true ).is_ok() ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Pending ); + + assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(true))); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + + ().into() + })); + } + + + + // pharos drops closed observers. + // + #[ test ] + // + fn poll_ready_drop() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); - assert!( ph.observe( ObserveConfig::default() ).is_err() ); + drop( full ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + + assert!( ph.observers[1].is_none() ); + ().into() + })); + } + + + + // poll_ready should return closed if the pharos is closed. + // + #[ test ] + // + fn poll_ready_closed() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_close( cx ), Poll::Ready(Ok(())) ); + + let res = ph.as_mut().poll_ready( &mut cx ); + + assert_matches!( res, Poll::Ready( Err(_) ) ); + + match res + { + Poll::Ready( Err( e ) ) => assert_eq!( ErrorKind::Closed, e.kind() ), + _ => assert!( false, "wrong result " ), + } + + ().into() + + })); } + + + + // start_send verify message arrives. + // + #[ test ] + // + fn start_send_arrive() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let mut full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + assert_matches!( ph.as_mut().poll_ready( &mut cx ), Poll::Ready( Ok(_) ) ); + assert!( ph.as_mut().start_send( 3 ).is_ok() ); + + assert_eq!( Pin::new( &mut full ).poll_next(cx), Poll::Ready(Some(3))); + + ().into() + })); + } + + + + // pharos drops closed observers. + // + #[ test ] + // + fn poll_flush_drop() + { + block_on( poll_fn( move |mut cx| + { + let mut ph = Pharos::::default(); + + let _open = ph.observe( Channel::Bounded ( 10 ).into() ).expect( "observe" ); + let full = ph.observe( Channel::Bounded ( 1 ).into() ).expect( "observe" ); + let _unbound = ph.observe( Channel::Unbounded .into() ).expect( "observe" ); + + let mut ph = Pin::new( &mut ph ); + + drop( full ); + + assert_matches!( ph.as_mut().poll_flush( &mut cx ), Poll::Ready( Ok(_) ) ); + + assert!( ph.observers[1].is_none() ); + ().into() + })); + } + + + + } diff --git a/tests/bounded.rs b/tests/bounded.rs index e6f0b90..a33b7d9 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -22,7 +22,7 @@ fn basic() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut events = isis.observe( Channel::Bounded( 2 ).into() ).expect( "observe" ); isis.sail().await; isis.sail().await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d6e0e0a..54d711a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,15 +6,17 @@ pub mod import // pub(crate) use { - pharos :: { * } , - std :: { sync::Arc, thread } , + pharos :: { * } , + std :: { sync::Arc, thread, task::{ Context, Poll }, pin::Pin } , futures :: { - channel::mpsc :: Receiver , - channel::mpsc :: UnboundedReceiver , - executor :: block_on , - stream :: StreamExt, SinkExt, + channel::mpsc :: Receiver , + channel::mpsc :: UnboundedReceiver , + executor :: block_on , + stream :: Stream, StreamExt, SinkExt , + sink :: Sink , + future :: poll_fn , }, }; } From 1dfeb11a72d9ac38230dd35ac5db00a985492095 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 11:41:28 +0200 Subject: [PATCH 12/13] update documentation --- Cargo.yml | 4 ++-- README.md | 15 ++++++------ src/error.rs | 61 +++++++++++++++++++++++++---------------------- src/events.rs | 19 ++++++++------- src/filter.rs | 10 ++++---- src/observable.rs | 20 +++++++++------- src/pharos.rs | 22 +++++++++-------- 7 files changed, 82 insertions(+), 69 deletions(-) diff --git a/Cargo.yml b/Cargo.yml index 173bcea..4042e80 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -43,8 +43,8 @@ badges: dependencies: - futures-preview : { version: ^0.3.0-alpha, features: [async-await] } + futures-preview: { version: ^0.3.0-alpha, features: [async-await] } dev-dependencies: - assert_matches : ^1 + assert_matches: ^1 diff --git a/README.md b/README.md index 5997e8c..8a425b0 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,7 @@ More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to. -I created it to leverage interoperability we can create by using async [Stream](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_core/stream/trait.Stream.html) and [Sink](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures/sink/trait.Sink.html -) from the futures library. So you can use all stream combinators, forward it into Sinks and so on. +I created it to leverage interoperability we can create by using async [Stream](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/stream/trait.Stream.html) and [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) from the futures library. So you can use all stream combinators, forward it into Sinks and so on. Minimal rustc version: 1.39. @@ -63,14 +62,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml): ```yaml dependencies: - pharos: ^0.3 + pharos: ^0.4 ``` With raw Cargo.toml ```toml [dependencies] - pharos = "0.3" + pharos = "0.4" ``` ### Upgrade @@ -85,17 +84,17 @@ This crate only has one dependencies. Cargo will automatically handle it's depen ```yaml dependencies: - futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } + futures-preview: { version: ^0.3.0-alpha, features: [async-await, nightly] } ``` ## Usage -`pharos` only works for async code, as the notify method is asynchronous. Observers must consume the messages -fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). +`pharos` only works from async code, implementing Sink to notify observers. You can notify observers from within +`poll_*` methods by calling the poll methods of the [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) impl directly. In async context you can use [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send). Observers must consume the messages fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers. -Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size (eg. there's no point putting an enum without data in an Arc). +Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than 2 pointer sizes (eg. there's no point putting an enum without data in an Arc). Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is the most basic one: diff --git a/src/error.rs b/src/error.rs index 0e2849f..ebfc611 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,7 +3,7 @@ use crate::{ import::* }; /// The error type for errors happening in `pharos`. /// -/// Use [`err.kind()`] to know which kind of error happened. +/// Use [`Error::kind()`] to know which kind of error happened. // #[ derive( Debug ) ] // @@ -15,6 +15,35 @@ pub struct Error +impl Error +{ + /// Identify which error happened. + // + pub fn kind( &self ) -> &ErrorKind + { + &self.kind + } +} + + +impl From for Error +{ + fn from( kind: ErrorKind ) -> Error + { + Error { inner: None, kind } + } +} + +impl From for Error +{ + fn from( inner: FutSendError ) -> Error + { + Error { inner: Some( Box::new( inner ) ), kind: ErrorKind::SendError } + } +} + + + /// The different kind of errors that can happen when you use the `pharos` API. // #[ derive( Debug, Copy, Clone, PartialEq, Eq ) ] @@ -28,10 +57,11 @@ pub enum ErrorKind SendError, /// The pharos object is already closed. You can no longer send messages or observe it. + /// This should only happen if you call [SinkExt::close](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.close) on it. // Closed, - /// The minimum valid buffer size for [`Channel::Bounded`] is `1`, you send in `0`. + /// The minimum valid buffer size for [`Channel::Bounded`](crate::observable::Channel) is `1`, you sent in `0`. // MinChannelSizeOne, @@ -99,30 +129,3 @@ impl fmt::Display for Error } } - - -impl Error -{ - /// Allows matching on the error kind - // - pub fn kind( &self ) -> &ErrorKind - { - &self.kind - } -} - -impl From for Error -{ - fn from( kind: ErrorKind ) -> Error - { - Error { inner: None, kind } - } -} - -impl From for Error -{ - fn from( inner: FutSendError ) -> Error - { - Error { inner: Some( Box::new( inner ) ), kind: ErrorKind::SendError } - } -} diff --git a/src/events.rs b/src/events.rs index 3bebc9e..b5ce49b 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,9 @@ use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error, ErrorKind }; + /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). +/// You will only start receiving events from the moment you call this. Any events in the observed +/// object emitted before will not be delivered. /// /// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>() == 16` // @@ -40,9 +43,8 @@ impl Events where Event: Clone + 'static + Send } - /// Close the channel. This way the sender will stop sending new events, and you can still - /// continue to read any events that are still pending in the channel. This avoids data loss - /// compared to just dropping this object. + /// Disconnect from the observable object. This way the sender will stop sending new events + /// and you can still continue to read any events that are still pending in the channel. // pub fn close( &mut self ) { @@ -52,7 +54,8 @@ impl Events where Event: Clone + 'static + Send - +// Just forward +// impl Stream for Events where Event: Clone + 'static + Send { type Item = Event; @@ -70,7 +73,7 @@ impl Stream for Events where Event: Clone + 'static + Send // pub(crate) enum Sender where Event: Clone + 'static + Send { - Bounded { tx: FutSender , filter: Option> } , + Bounded { tx: FutSender , filter: Option> } , Unbounded{ tx: FutUnboundedSender, filter: Option> } , } @@ -79,7 +82,7 @@ pub(crate) enum Sender where Event: Clone + 'static + Send impl Sender where Event: Clone + 'static + Send { - // Verify whether this observer is still around + // Verify whether this observer is still around. // pub(crate) fn is_closed( &self ) -> bool { @@ -91,7 +94,7 @@ impl Sender where Event: Clone + 'static + Send } - /// Check whether this sender is interested in this event + /// Check whether this sender is interested in this event. // pub(crate) fn filter( &mut self, evt: &Event ) -> bool { @@ -115,7 +118,7 @@ impl Sender where Event: Clone + 'static + Send -/// The receiver of the channel. +/// The receiver of the channel, abstracting over different channel types. // enum Receiver where Event: Clone + 'static + Send { diff --git a/src/filter.rs b/src/filter.rs index 47aef3c..bd488f7 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,6 +1,8 @@ use crate :: { import::* }; -/// Predicate for filtering events. This is an enum because closures that capture variables from +/// Predicate for filtering events. +/// +/// This is an enum because closures that capture variables from /// their environment need to be boxed. More often than not, an event will be a simple enum and /// the predicate will just match on the variant, so it would be wasteful to impose boxing in those /// cases, hence there is a function pointer variant which does not require boxing. This should @@ -43,11 +45,11 @@ pub enum Filter where Event: Clone + 'static + Send , { - /// A function pointer to use to filter events. + /// A function pointer to a predicate to filter events. // Pointer( fn(&Event) -> bool ), - /// A boxed closure to use to filter events. + /// A boxed closure to a predicate to filter events. // Closure( Box bool + Send> ), } @@ -55,7 +57,7 @@ pub enum Filter impl Filter where Event: Clone + 'static + Send { - /// Invoke the predicate + /// Invoke the predicate. // pub(crate) fn call( &mut self, evt: &Event ) -> bool { diff --git a/src/observable.rs b/src/observable.rs index 6267b94..e6d27c3 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Filter, Events }; +use crate :: { Filter, Events }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -75,7 +75,10 @@ pub trait Observable where Event: Clone + 'static + Send , { - /// The error type that is returned if observing is not possible. [Pharos](crate::Pharos) implements Sink + /// The error type that is returned if observing is not possible. + /// + /// [Pharos](crate::Pharos) implements + /// [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) /// which has a close method, so observing will no longer be possible after close is called. /// /// Other than that, you might want to have moments in your objects lifetime when you don't want to take @@ -83,9 +86,9 @@ pub trait Observable /// /// You can of course map the error of pharos to your own error type. // - type Error: ErrorTrait; + type Error: std::error::Error; - /// Add an observer to the observable. Options can be in order to choose channel type and + /// Add an observer to the observable. Options allow chosing the channel type and /// to filter events with a predicate. // fn observe( &mut self, options: ObserveConfig ) -> Result, Self::Error>; @@ -128,8 +131,9 @@ impl Default for Channel -/// Configuration for your event stream, passed to [Observable::observe] when subscribing. -/// This let's you choose the type of [channel](Channel) and let's +/// Configuration for your event stream. +/// +/// Pass to [Observable::observe] when subscribing. This let's you choose the type of [channel](Channel) and let's /// you set a [filter](Filter) to ignore certain events. /// /// ``` @@ -229,7 +233,7 @@ impl ObserveConfig where Event: Clone + 'static + Send } -/// Create a ObserveConfig from a [Channel], getting default values for other options. +/// Create a [ObserveConfig] from a [Channel], getting default values for other options. // impl From for ObserveConfig where Event: Clone + 'static + Send { @@ -240,7 +244,7 @@ impl From for ObserveConfig where Event: Clone + 'static } -/// Create a ObserveConfig from a [Filter], getting default values for other options. +/// Create a [ObserveConfig] from a [Filter], getting default values for other options. // impl From> for ObserveConfig where Event: Clone + 'static + Send { diff --git a/src/pharos.rs b/src/pharos.rs index 156cb27..885ef2e 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,8 +1,8 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind, Channel }; -/// The Pharos lighthouse. When you implement Observable on your type, you can forward -/// the [`observe`](Observable::observe) method to Pharos and call notify on it. +/// The Pharos lighthouse. When you implement [Observable] on your type, you can forward +/// the [`observe`](Observable::observe) method to Pharos and use [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send) to notify observers. /// /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. @@ -12,17 +12,16 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Err /// /// ## Implementation. /// -/// Currently just holds a `Vec>`. It will stop notifying observers if the channel has +/// Currently just holds a `Vec>`. It will drop observers if the channel has /// returned an error, which means it is closed or disconnected. However, we currently don't /// compact the vector. Slots are reused for new observers, but the vector never shrinks. /// -/// In observe, we do loop the vector to find a free spot to re-use before pushing. -/// -/// **Note**: we only detect that observers can be removed when [futures::SinkExt::send] or [Pharos::num_observers] +/// **Note**: we only detect that observers can be removed when [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send) or [Pharos::num_observers] /// is being called. Otherwise, we won't find out about disconnected observers and the vector of observers /// will not mark deleted observers and thus their slots can not be reused. /// -/// The [Sink] impl is not very optimized for the moment. It just loops over all observers in each poll method +/// The [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) impl +/// is not very optimized for the moment. It just loops over all observers in each poll method /// so it will call `poll_ready` and `poll_flush` again for observers that already returned `Poll::Ready(Ok(()))`. /// /// TODO: I will do some benchmarking and see if this can be improved, eg. by keeping a state which tracks which @@ -63,7 +62,7 @@ impl Pharos where Event: 'static + Clone + Send { /// Create a new Pharos. May it's light guide you to safe harbor. /// - /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers + /// You can set the initial capacity of the vector of observers, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// /// For pharos 0.4.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. @@ -88,7 +87,7 @@ impl Pharos where Event: 'static + Clone + Send } - /// Returns the number of actual observers that are still listening (have not closed or dropped the Events). + /// Returns the number of actual observers that are still listening (have not closed or dropped the [Events]). /// This will loop and it will verify for each if they are closed, clearing them from the internal storage /// if they are closed. This is similar to what notify does, but without sending an event. // @@ -136,7 +135,10 @@ impl Observable for Pharos where Event: 'static + Clone + { type Error = Error; - /// Will try to re-use slots in the vector from disconnected observers. + /// Will re-use slots from disconnected observers to avoid growing to much. + /// + /// TODO: provide API for the client to compact the pharos object after reducing the + /// number of observers. // fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { From a886ab394016d9c55e2b36defc5f46a0e480cead Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sat, 28 Sep 2019 11:48:45 +0200 Subject: [PATCH 13/13] bump version and changlog --- CHANGELOG.md | 8 ++++++++ Cargo.toml | 2 +- Cargo.yml | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f904c1..542b359 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Pharos Changelog +## 0.4.0 - 2019-09-28 + +- **BREAKING CHANGE**: The notify function had a sub optimal implemetation and did not allow notifying observers + from within `poll_*` functions. It has been replaced with an implementation of Sink on Pharos. +- got rid of dependency on pin_project. +- as Error::kind returns a reference to the error kind, you can now compare `ErrorKind::SomeVariant == err.kind()` without having to write: `&ErrorKind::SomeVariant == err.kind()`. +- updated to futures-preview 0.3.0-alpha.19 + ## 0.3.2 - 2019-09-23 - check spelling diff --git a/Cargo.toml b/Cargo.toml index f2c4b02..497562c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ license = "Unlicense" name = "pharos" readme = "README.md" repository = "https://github.com/najamelan/pharos" -version = "0.3.2" +version = "0.4.0" [package.metadata] [package.metadata.docs] diff --git a/Cargo.yml b/Cargo.yml index 4042e80..f885989 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -10,7 +10,7 @@ package: # - merge dev branch into master # - create git tag # - version : 0.3.2 + version : 0.4.0 name : pharos authors : [ Naja Melan ] edition : '2018'