From c6575cc53a039e8c6141e5788ee7712cd7b92c6d Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Thu, 19 Sep 2019 16:14:38 +0200 Subject: [PATCH 01/14] Cleanup tests --- TODO.md | 2 + src/pharos.rs | 3 +- tests/bounded.rs | 110 +++++++++----------------------------------- tests/common/mod.rs | 7 ++- tests/unbounded.rs | 99 +++++++-------------------------------- 5 files changed, 46 insertions(+), 175 deletions(-) diff --git a/TODO.md b/TODO.md index baaccec..e871cb9 100644 --- a/TODO.md +++ b/TODO.md @@ -11,3 +11,5 @@ - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? + +- scaling? For now we have an ever growing vector of observers diff --git a/src/pharos.rs b/src/pharos.rs index 6e9aba6..cbf733a 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -6,8 +6,6 @@ use crate :: { import::* }; /// /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. -/// -// TODO: why do we require Send? // #[ derive( Clone, Debug ) ] // @@ -116,6 +114,7 @@ impl Pharos // Put back the observers that we "borrowed" + // TODO: compact the vector from time to time? // *observers = fut.await; } diff --git a/tests/bounded.rs b/tests/bounded.rs index 63c4721..533b4c4 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -2,8 +2,6 @@ // // - ✔ basic functionality // - ✔ test closing senders/receivers? -// - ✔ multiple observers + names coming back correctly -// - ✔ same names // - ✔ send events of 2 types from one object + something other than an enum without data // - ✔ accross threads // - TODO: test that sender task blocks when observer falls behind (see comments below) @@ -14,25 +12,16 @@ mod common; use common::{ *, import::* }; -fn run( task: impl Future + 'static ) -{ - let mut pool = LocalPool::new(); - let mut exec = pool.spawner(); - - exec.spawn_local( task ).expect( "Spawn task" ); - pool.run(); -} - #[ test ] // fn basic() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut events: Receiver = isis.observe( 5 ); + let mut events = isis.observe( 5 ); isis.sail().await; isis.sail().await; @@ -52,11 +41,11 @@ fn basic() // fn close_receiver() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut events: Receiver = isis.observe( 5 ); + let mut events = isis.observe( 5 ); isis.sail().await; events.close(); @@ -74,101 +63,48 @@ fn close_receiver() // fn one_receiver_drops() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut egypt_evts: Receiver = isis.observe( 1 ); - let mut shine_evts: Receiver = isis.observe( 2 ); + let mut egypt_evts = isis.observe( 1 ); + let mut shine_evts = isis.observe( 2 ); isis.sail().await; - let shine_evt = shine_evts.next().await.unwrap(); - let egypt_evt = egypt_evts.next().await.unwrap(); - - assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail, egypt_evt ); + assert_eq!( IsisEvent::Sail, shine_evts.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, egypt_evts.next().await.unwrap() ); drop( egypt_evts ); isis.sail().await; isis.sail().await; - let shine_evt = shine_evts.next().await.unwrap(); - assert_eq!( IsisEvent::Sail, shine_evt ); - - let shine_evt = shine_evts.next().await.unwrap(); - assert_eq!( IsisEvent::Sail, shine_evt ); + assert_eq!( IsisEvent::Sail, shine_evts.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, shine_evts.next().await.unwrap() ); }); } -// Have two receivers with different names on the same object and verify that the names are correct on reception. -// -#[ test ] -// -fn names() -{ - run( async move - { - let mut isis = Godess::new(); - - let mut egypt_evts: Receiver = isis.observe( 5 ); - let mut shine_evts: Receiver = isis.observe( 5 ); - - isis.sail().await; - - let shine_evt = shine_evts.next().await.unwrap(); - let egypt_evt = egypt_evts.next().await.unwrap(); - - assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail, egypt_evt ); - }); -} - - - -// Verify that several observers can set the same name. -// -#[ test ] -// -fn same_names() -{ - run( async move - { - let mut isis = Godess::new(); - - let mut egypt_evts: Receiver = isis.observe( 5 ); - let mut shine_evts: Receiver = isis.observe( 5 ); - - isis.sail().await; - - let shine_evt = shine_evts.next().await.unwrap(); - let egypt_evt = egypt_evts.next().await.unwrap(); - - assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail, egypt_evt ); - }); -} - - - -// Send different types of objects, and send a struct with data rather than just an enum +// Send different types of events from same observable, and send a struct with data rather than just an enum // #[ test ] // fn types() { - run( async move + block_on( async move { let mut isis = Godess::new(); + // Note that because of the asserts below type inference works here and we don't have to + // put type annotation, but I do find it quite obscure and better to be explicit. + // + let mut shine_evts: Receiver< NutEvent> = isis.observe( 5 ); let mut egypt_evts: Receiver = isis.observe( 5 ); - let mut shine_evts: Receiver = isis.observe( 5 ); - isis.sail ().await; isis.shine().await; + isis.sail ().await; let shine_evt = shine_evts.next().await.unwrap(); let egypt_evt = egypt_evts.next().await.unwrap(); @@ -186,17 +122,17 @@ fn types() // fn threads() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut egypt_evts: Receiver = isis.observe( 5 ); - let mut shine_evts: Receiver = isis.observe( 5 ); + let mut egypt_evts = isis.observe( 5 ); + let mut shine_evts = isis.observe( 5 ); thread::spawn( move || { - run( async move + block_on( async move { isis.sail ().await; isis.shine().await; @@ -255,6 +191,6 @@ fn block_sender() exec.spawn_local( receiver ).expect( "Spawn receiver" ); exec.spawn_local( remote ).unwrap(); - pool.run(); + pool.block_on(); } */ diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d6038fa..9d7fa92 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,15 +2,14 @@ pub mod import { pub(crate) use { - pharos :: { * } , - std :: { sync::Arc, future::Future, thread } , + pharos :: { * } , + std :: { sync::Arc, thread } , futures :: { channel::mpsc :: Receiver , channel::mpsc :: UnboundedReceiver , - executor :: LocalPool , - task :: LocalSpawnExt , + executor :: block_on , stream :: StreamExt , }, }; diff --git a/tests/unbounded.rs b/tests/unbounded.rs index a5c4c98..ea06ff6 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -2,9 +2,7 @@ // // - ✔ basic functionality // - ✔ test closing senders/receivers? -// - ✔ multiple observers + names coming back correctly // - ✔ multiple observers + one drops, others continue to see messages -// - ✔ same names // - ✔ send events of 2 types from one object + something other than an enum without data // - ✔ accross threads // - ✔ test big number of events @@ -15,25 +13,15 @@ mod common; use common::{ *, import::* }; -fn run( task: impl Future + 'static ) -{ - let mut pool = LocalPool::new(); - let mut exec = pool.spawner(); - - exec.spawn_local( task ).expect( "Spawn task" ); - pool.run(); -} - - #[ test ] // fn basic() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut events: UnboundedReceiver = isis.observe_unbounded(); + let mut events = isis.observe_unbounded(); isis.sail().await; isis.sail().await; @@ -52,11 +40,11 @@ fn basic() // fn close_receiver() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut events: UnboundedReceiver = isis.observe_unbounded(); + let mut events = isis.observe_unbounded(); isis.sail().await; events.close(); @@ -74,12 +62,12 @@ fn close_receiver() // fn one_receiver_drops() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded(); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded(); + let mut egypt_evts = isis.observe_unbounded(); + let mut shine_evts = isis.observe_unbounded(); isis.sail().await; @@ -87,79 +75,26 @@ fn one_receiver_drops() let egypt_evt = egypt_evts.next().await.unwrap(); assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail , egypt_evt ); + assert_eq!( IsisEvent::Sail, egypt_evt ); drop( egypt_evts ); isis.sail().await; isis.sail().await; - let shine_evt = shine_evts.next().await.unwrap(); - assert_eq!( IsisEvent::Sail, shine_evt ); - - let shine_evt = shine_evts.next().await.unwrap(); - assert_eq!( IsisEvent::Sail, shine_evt ); - }); -} - - -// Have two receivers with different names on the same object and verify that the names are correct on reception. -// -#[ test ] -// -fn names() -{ - run( async move - { - let mut isis = Godess::new(); - - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded(); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded(); - - isis.sail().await; - - let shine_evt = shine_evts.next().await.unwrap(); - let egypt_evt = egypt_evts.next().await.unwrap(); - - assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail, egypt_evt ); - }); -} - - - -// Verify that several observers can set the same name. -// -#[ test ] -// -fn same_names() -{ - run( async move - { - let mut isis = Godess::new(); - - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded(); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded(); - - isis.sail().await; - - let shine_evt = shine_evts.next().await.unwrap(); - let egypt_evt = egypt_evts.next().await.unwrap(); - - assert_eq!( IsisEvent::Sail, shine_evt ); - assert_eq!( IsisEvent::Sail, egypt_evt ); + assert_eq!( IsisEvent::Sail, shine_evts.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, shine_evts.next().await.unwrap() ); }); } - // Send different types of objects, and send a struct with data rather than just an enum // #[ test ] // fn types() { - run( async move + block_on( async move { let mut isis = Godess::new(); @@ -184,17 +119,17 @@ fn types() // fn threads() { - run( async move + block_on( async move { let mut isis = Godess::new(); - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded(); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded(); + let mut egypt_evts = isis.observe_unbounded(); + let mut shine_evts = isis.observe_unbounded(); thread::spawn( move || { - run( async move + block_on( async move { isis.sail ().await; isis.shine().await; @@ -218,11 +153,11 @@ fn threads() // fn alot_of_events() { - run( async move + block_on( async move { let mut w = Godess::new(); - let mut events: UnboundedReceiver = w.observe_unbounded(); + let mut events = w.observe_unbounded(); let amount = 1000; From 16fe749d530e5897659894842ad3efa9d1ea8c55 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Thu, 19 Sep 2019 18:59:16 +0200 Subject: [PATCH 02/14] Run bounded and unbounded notify concurrently and add an integration test for that --- Cargo.toml | 1 + Cargo.yml | 2 +- src/lib.rs | 2 ++ src/pharos.rs | 6 ++++-- tests/combined.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 tests/combined.rs diff --git a/Cargo.toml b/Cargo.toml index 86fbef0..866057a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ repository = "najamelan/pharos" [dependencies] [dependencies.futures-preview] +features = ["async-await", "nightly"] version = "^0.3.0-alpha" [features] diff --git a/Cargo.yml b/Cargo.yml index 69f74e7..d1b7733 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -42,5 +42,5 @@ badges: dependencies: - futures-preview: { version: ^0.3.0-alpha } + futures-preview: { version: ^0.3.0-alpha, features: [async-await, nightly] } diff --git a/src/lib.rs b/src/lib.rs index ccba2c5..b82c374 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,8 @@ mod import { futures :: { + join, + future::{ join_all }, Sink, SinkExt, channel::mpsc:: diff --git a/src/pharos.rs b/src/pharos.rs index cbf733a..53029a4 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -60,8 +60,10 @@ impl Pharos // pub async fn notify<'a>( &'a mut self, evt: &'a Event ) { - Self::notify_inner( &mut self.unbounded, &evt ).await; - Self::notify_inner( &mut self.observers, &evt ).await; + let unbound = Self::notify_inner( &mut self.unbounded, &evt ); + let bounded = Self::notify_inner( &mut self.observers, &evt ); + + join!( unbound, bounded ); } diff --git a/tests/combined.rs b/tests/combined.rs new file mode 100644 index 0000000..92151a9 --- /dev/null +++ b/tests/combined.rs @@ -0,0 +1,49 @@ +// Tested: +// +// - ✔ an object which has both bounded and unbounded observers. +// +mod common; + +use common::{ *, import::* }; + + +#[ test ] +// +fn both() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let mut events: Receiver = isis.observe( 5, None ) ; + let mut ubevts: UnboundedReceiver = isis.observe_unbounded( None) ; + + let mut nuevts: Receiver = isis.observe( 5, None ) ; + let mut ubnuts: UnboundedReceiver = isis.observe_unbounded( None) ; + + isis.sail ().await; + isis.shine().await; + isis.sail ().await; + isis.shine().await; + + drop( isis ); + + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + + assert_eq!( IsisEvent::Sail, ubevts.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, ubevts.next().await.unwrap() ); + assert_eq!( None , ubevts.next().await ); + + let nut_event = NutEvent { time: "midnight".into() }; + + assert_eq!( nut_event, nuevts.next().await.unwrap() ); + assert_eq!( nut_event, nuevts.next().await.unwrap() ); + assert_eq!( None , nuevts.next().await ); + + assert_eq!( nut_event, ubnuts.next().await.unwrap() ); + assert_eq!( nut_event, ubnuts.next().await.unwrap() ); + assert_eq!( None , ubnuts.next().await ); + }); +} From b8f79afe09cc9998c775eb35d53ba6962abad14e Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Thu, 19 Sep 2019 22:49:40 +0200 Subject: [PATCH 03/14] Add filter functionality, yipy --- examples/basic.rs | 6 +-- src/lib.rs | 6 +++ src/observable.rs | 9 ++-- src/pharos.rs | 98 +++++++++++++++++++++-------------- tests/bounded.rs | 119 ++++++++++++++++++++++++++++++++++++++---- tests/common/mod.rs | 28 ++++++---- tests/unbounded.rs | 123 ++++++++++++++++++++++++++++++++++++++++---- 7 files changed, 315 insertions(+), 74 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index d619a56..5a23bce 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -53,9 +53,9 @@ enum GodessEvent // impl Observable for Godess { - fn observe( &mut self, queue_size: usize ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option> ) -> Receiver { - self.pharos.observe( queue_size ) + self.pharos.observe( queue_size, predicate ) } } @@ -71,7 +71,7 @@ fn main() // subscribe // - let mut events = isis.observe( 3 ); + let mut events = isis.observe( 3, None ); // trigger an event // diff --git a/src/lib.rs b/src/lib.rs index b82c374..2461b85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,11 +38,17 @@ pub use }; +/// The type of predicates used to filter events. +// +pub type Predicate = Box bool + Send >; + mod import { pub(crate) use { + std:: { fmt }, + futures :: { join, diff --git a/src/observable.rs b/src/observable.rs index 539eddd..cbbbad2 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { import::* }; +use crate :: { import::*, Predicate }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -10,8 +10,11 @@ pub trait Observable /// Add an observer to the observable. This will use a bounded channel of the size of `queue_size`. /// Note that the use of a bounded channel provides backpressure and can slow down the observed /// task. + /// + /// The predicate parameter allows filtering the events that should be send to this observer. + /// It receives a reference to the event. If the predicate returns true, it will be sent. // - fn observe( &mut self, queue_size: usize ) -> Receiver; + fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver; } @@ -25,5 +28,5 @@ pub trait UnboundedObservable /// Add an observer to the observable. This will use an unbounded channel. Beware that if the observable /// outpaces the observer, this will lead to growing memory consumption over time. // - fn observe_unbounded( &mut self ) -> UnboundedReceiver; + fn observe_unbounded( &mut self, predicate: Option> ) -> UnboundedReceiver; } diff --git a/src/pharos.rs b/src/pharos.rs index 53029a4..fc80dec 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,4 +1,4 @@ -use crate :: { import::* }; +use crate :: { import::*, Observable, UnboundedObservable, Predicate }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward @@ -7,51 +7,33 @@ use crate :: { import::* }; /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. // -#[ derive( Clone, Debug ) ] -// pub struct Pharos { - observers: Vec >>, - unbounded: Vec >>, + observers: Vec, Option>) >>, + unbounded: Vec, Option>) >>, } -impl Pharos -{ - /// Create a new Pharos. May it's light guide you to safe harbour. - // - pub fn new() -> Self - { - Self::default() - } - - - /// Add an observer to the pharos. This will use a bounded channel of the size of `queue_size`. - /// Note that the use of a bounded channel provides backpressure and can slow down the observed - /// task. - // - pub fn observe( &mut self, queue_size: usize ) -> Receiver +// TODO: figure out what we really want here... +// +impl fmt::Debug for Pharos +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - let (tx, rx) = mpsc::channel( queue_size ); - - self.observers.push( Some( tx ) ); - - rx + write!( f, "Pharos" ) } +} - /// Add an observer to the pharos. This will use an unbounded channel. Beware that if the observable - /// outpaces the observer, this will lead to growing memory consumption over time. +impl Pharos +{ + /// Create a new Pharos. May it's light guide you to safe harbour. // - pub fn observe_unbounded( &mut self ) -> UnboundedReceiver + pub fn new() -> Self { - let (tx, rx) = mpsc::unbounded(); - - self.unbounded.push( Some( tx ) ); - - rx + Self::default() } @@ -72,7 +54,7 @@ impl Pharos // async fn notify_inner<'a> ( - observers: &'a mut Vec< Option + Unpin + Clone> > , + observers: &'a mut Vec< Option< (impl Sink + Unpin + Clone, Option>) > > , evt: &'a Event ) { @@ -93,18 +75,25 @@ impl Pharos async move { - if let Some( mut tx ) = opt + if let Some( (mut tx, pre_opt) ) = opt { - // It's disconnected, drop it + // If we have a predicate, run it, otherwise return true. + // we return the predicate, since we need to give it back at the end. + // + let (go, pre_opt2) = pre_opt.map_or( (true, None), |pred| (pred( &evt ), Some(pred)) ); + + + // We count on the send not being executed if go is false. + // If an error is returned, it's disconnected, drop it. // - if tx.send( evt ).await.is_err() + if go && tx.send( evt ).await.is_err() { None } // Put it back after use // - else { Some( tx ) } + else { Some( (tx, pre_opt2) ) } } // It was already none @@ -135,3 +124,36 @@ impl Default for Pharos } } } + + +impl Observable for Pharos +{ + /// Add an observer to the pharos. This will use a bounded channel of the size of `queue_size`. + /// Note that the use of a bounded channel provides backpressure and can slow down the observed + /// task. + // + fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver + { + let (tx, rx) = mpsc::channel( queue_size ); + + self.observers.push( Some(( tx, predicate )) ); + + rx + } +} + + +impl UnboundedObservable for Pharos +{ + /// Add an observer to the pharos. This will use an unbounded channel. Beware that if the observable + /// outpaces the observer, this will lead to growing memory consumption over time. + // + fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver + { + let (tx, rx) = mpsc::unbounded(); + + self.unbounded.push( Some(( tx, predicate )) ); + + rx + } +} diff --git a/tests/bounded.rs b/tests/bounded.rs index 533b4c4..f0895e0 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -5,8 +5,10 @@ // - ✔ send events of 2 types from one object + something other than an enum without data // - ✔ accross threads // - TODO: test that sender task blocks when observer falls behind (see comments below) - - +// - ✔ Basic filter usage, only one event type should be returned. +// - ✔ A filter that always returns true should get all events. +// - ✔ A filter that always returns false should not get any events. +// mod common; use common::{ *, import::* }; @@ -21,7 +23,7 @@ fn basic() { let mut isis = Godess::new(); - let mut events = isis.observe( 5 ); + let mut events = isis.observe( 5, None ); isis.sail().await; isis.sail().await; @@ -45,7 +47,7 @@ fn close_receiver() { let mut isis = Godess::new(); - let mut events = isis.observe( 5 ); + let mut events = isis.observe( 5, None ); isis.sail().await; events.close(); @@ -67,8 +69,8 @@ fn one_receiver_drops() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe( 1 ); - let mut shine_evts = isis.observe( 2 ); + let mut egypt_evts = isis.observe( 1, None ); + let mut shine_evts = isis.observe( 2, None ); isis.sail().await; @@ -100,8 +102,8 @@ fn types() // Note that because of the asserts below type inference works here and we don't have to // put type annotation, but I do find it quite obscure and better to be explicit. // - let mut shine_evts: Receiver< NutEvent> = isis.observe( 5 ); - let mut egypt_evts: Receiver = isis.observe( 5 ); + let mut shine_evts: Receiver< NutEvent> = isis.observe( 5, None ); + let mut egypt_evts: Receiver = isis.observe( 5, None ); isis.shine().await; isis.sail ().await; @@ -126,8 +128,8 @@ fn threads() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe( 5 ); - let mut shine_evts = isis.observe( 5 ); + let mut egypt_evts = isis.observe( 5, None ); + let mut shine_evts = isis.observe( 5, None ); thread::spawn( move || @@ -194,3 +196,100 @@ fn block_sender() pool.block_on(); } */ + + +// Basic filter usage, only Dock should be returned. +// +#[ test ] +// +fn filter() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |evt: &IsisEvent| + { + match evt + { + IsisEvent::Sail => false, + IsisEvent::Dock => true , + } + }); + + let mut events: Receiver = isis.observe( 5, Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + + + +// A filter that always returns true should get all events. +// +#[ test ] +// +fn filter_true() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |_: &IsisEvent| true ); + + let mut events: Receiver = isis.observe( 5, Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + + + +// A filter that always returns false should not get any events. +// +#[ test ] +// +fn filter_false() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |_: &IsisEvent| false ); + + let mut events: Receiver = isis.observe( 5, Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( None, events.next().await ); + }); +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 9d7fa92..5b0af3c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,5 +1,9 @@ +#![ allow( dead_code ) ] + pub mod import { + #[ allow( unused_imports )] + // pub(crate) use { pharos :: { * } , @@ -42,6 +46,11 @@ impl Godess self.isis.notify( &IsisEvent::Sail ).await; } + pub async fn dock( &mut self ) + { + self.isis.notify( &IsisEvent::Dock ).await; + } + pub async fn shine( &mut self ) { let evt = NutEvent { time: "midnight".into() }; @@ -56,7 +65,8 @@ impl Godess // pub enum IsisEvent { - Sail + Sail, + Dock, } @@ -71,35 +81,35 @@ pub struct NutEvent impl Observable for Godess { - fn observe( &mut self, queue_size: usize ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver { - self.isis.observe( queue_size ) + self.isis.observe( queue_size, predicate ) } } impl Observable for Godess { - fn observe( &mut self, queue_size: usize ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver { - self.nut.observe( queue_size ) + self.nut.observe( queue_size, predicate ) } } impl UnboundedObservable for Godess { - fn observe_unbounded( &mut self ) -> UnboundedReceiver + fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver { - self.isis.observe_unbounded() + self.isis.observe_unbounded( predicate ) } } impl UnboundedObservable for Godess { - fn observe_unbounded( &mut self ) -> UnboundedReceiver + fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver { - self.nut.observe_unbounded() + self.nut.observe_unbounded( predicate ) } } diff --git a/tests/unbounded.rs b/tests/unbounded.rs index ea06ff6..ba6f7c9 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -6,8 +6,10 @@ // - ✔ send events of 2 types from one object + something other than an enum without data // - ✔ accross threads // - ✔ test big number of events - - +// - ✔ Basic filter usage, only one event type should be returned. +// - ✔ A filter that always returns true should get all events. +// - ✔ A filter that always returns false should not get any events. +// mod common; use common::{ *, import::* }; @@ -21,7 +23,7 @@ fn basic() { let mut isis = Godess::new(); - let mut events = isis.observe_unbounded(); + let mut events = isis.observe_unbounded( None ); isis.sail().await; isis.sail().await; @@ -44,7 +46,7 @@ fn close_receiver() { let mut isis = Godess::new(); - let mut events = isis.observe_unbounded(); + let mut events = isis.observe_unbounded( None ); isis.sail().await; events.close(); @@ -66,8 +68,8 @@ fn one_receiver_drops() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe_unbounded(); - let mut shine_evts = isis.observe_unbounded(); + let mut egypt_evts = isis.observe_unbounded( None ); + let mut shine_evts = isis.observe_unbounded( None ); isis.sail().await; @@ -98,8 +100,8 @@ fn types() { let mut isis = Godess::new(); - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded(); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded(); + let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded( None ); + let mut shine_evts: UnboundedReceiver = isis.observe_unbounded( None ); isis.sail ().await; isis.shine().await; @@ -123,8 +125,8 @@ fn threads() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe_unbounded(); - let mut shine_evts = isis.observe_unbounded(); + let mut egypt_evts = isis.observe_unbounded( None ); + let mut shine_evts = isis.observe_unbounded( None ); thread::spawn( move || @@ -157,7 +159,7 @@ fn alot_of_events() { let mut w = Godess::new(); - let mut events = w.observe_unbounded(); + let mut events = w.observe_unbounded( None ); let amount = 1000; @@ -174,3 +176,102 @@ fn alot_of_events() } }); } + + + + +// Basic filter usage, only Dock should be returned. +// +#[ test ] +// +fn filter() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |evt: &IsisEvent| + { + match evt + { + IsisEvent::Sail => false, + IsisEvent::Dock => true , + } + }); + + let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + + + +// A filter that always returns true should get all events. +// +#[ test ] +// +fn filter_true() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |_: &IsisEvent| true ); + + let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Sail, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + + + +// A filter that always returns false should not get any events. +// +#[ test ] +// +fn filter_false() +{ + block_on( async move + { + let mut isis = Godess::new(); + + let filter = Box::new( |_: &IsisEvent| false ); + + let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( None, events.next().await ); + }); +} From 218c6c23e482040a9adc131c83f6152e8d076ffb Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Sun, 22 Sep 2019 17:01:59 +0200 Subject: [PATCH 04/14] Switch to an enum Filter allowing to take both fn pointer and boxed closure as filter --- examples/basic.rs | 2 +- src/filter.rs | 45 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 6 ++--- src/observable.rs | 6 ++--- src/pharos.rs | 28 ++++++++++++++--------- tests/bounded.rs | 55 +++++++++++++++++++++++++++++++++++++++------ tests/common/mod.rs | 8 +++---- tests/unbounded.rs | 55 +++++++++++++++++++++++++++++++++++++++------ 8 files changed, 169 insertions(+), 36 deletions(-) create mode 100644 src/filter.rs diff --git a/examples/basic.rs b/examples/basic.rs index 5a23bce..fbe997f 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -53,7 +53,7 @@ enum GodessEvent // impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option> ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option> ) -> Receiver { self.pharos.observe( queue_size, predicate ) } diff --git a/src/filter.rs b/src/filter.rs new file mode 100644 index 0000000..0bc622d --- /dev/null +++ b/src/filter.rs @@ -0,0 +1,45 @@ +use crate :: { import::* }; + +/// Filter events +// +pub enum Filter + + where Event: Clone + 'static + Send , + +{ + /// A function pointer to use to filter events + // + Pointer( fn(&Event) -> bool ), + + /// A boxed closure to use to filter events + // + Closure( Box bool + Send> ), +} + + + +impl fmt::Debug for Filter +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + { + write!( f, "pharos::Filter" ) + } +} + + +impl From bool> for Filter +{ + fn from( pointer: fn(&Event) -> bool ) -> Self + { + Filter::Pointer( pointer ) + } +} + + +impl From bool + Send>> for Filter +{ + fn from( closure: Box bool + Send> ) -> Self + { + Filter::Closure( closure ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 2461b85..76c7c0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,20 +28,18 @@ mod observable; mod pharos ; +mod filter ; pub use { self::pharos :: { Pharos } , + filter :: { Filter } , observable :: { Observable, UnboundedObservable } , }; -/// The type of predicates used to filter events. -// -pub type Predicate = Box bool + Send >; - mod import { diff --git a/src/observable.rs b/src/observable.rs index cbbbad2..5db9b40 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Predicate }; +use crate :: { import::*, Filter }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -14,7 +14,7 @@ pub trait Observable /// The predicate parameter allows filtering the events that should be send to this observer. /// It receives a reference to the event. If the predicate returns true, it will be sent. // - fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver; + fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver; } @@ -28,5 +28,5 @@ pub trait UnboundedObservable /// Add an observer to the observable. This will use an unbounded channel. Beware that if the observable /// outpaces the observer, this will lead to growing memory consumption over time. // - fn observe_unbounded( &mut self, predicate: Option> ) -> UnboundedReceiver; + fn observe_unbounded( &mut self, predicate: Option> ) -> UnboundedReceiver; } diff --git a/src/pharos.rs b/src/pharos.rs index fc80dec..7ca00bf 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Observable, UnboundedObservable, Predicate }; +use crate :: { import::*, Observable, UnboundedObservable, Filter }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward @@ -9,8 +9,8 @@ use crate :: { import::*, Observable, UnboundedObservable, Predicate }; // pub struct Pharos { - observers: Vec, Option>) >>, - unbounded: Vec, Option>) >>, + observers: Vec, Option>) >>, + unbounded: Vec, Option>) >>, } @@ -54,7 +54,7 @@ impl Pharos // async fn notify_inner<'a> ( - observers: &'a mut Vec< Option< (impl Sink + Unpin + Clone, Option>) > > , + observers: &'a mut Vec< Option< (impl Sink + Unpin + Clone, Option>) > > , evt: &'a Event ) { @@ -75,13 +75,21 @@ impl Pharos async move { - if let Some( (mut tx, pre_opt) ) = opt + if let Some( (mut tx, filter_opt) ) = opt { - // If we have a predicate, run it, otherwise return true. - // we return the predicate, since we need to give it back at the end. + // If we have a filter, run it, otherwise return true. + // we return the filter, since we need to give it back at the end. // - let (go, pre_opt2) = pre_opt.map_or( (true, None), |pred| (pred( &evt ), Some(pred)) ); + let (go, pre_opt2) = filter_opt.map_or( (true, None), |mut filter| + { + let filtered = match filter + { + Filter::Pointer(ref p) => p( &evt ), + Filter::Closure(ref mut p) => p( &evt ), + }; + (filtered, Some(filter)) + }); // We count on the send not being executed if go is false. // If an error is returned, it's disconnected, drop it. @@ -132,7 +140,7 @@ impl Observable for Pharos /// Note that the use of a bounded channel provides backpressure and can slow down the observed /// task. // - fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver { let (tx, rx) = mpsc::channel( queue_size ); @@ -148,7 +156,7 @@ impl UnboundedObservable for Pharos /// Add an observer to the pharos. This will use an unbounded channel. Beware that if the observable /// outpaces the observer, this will lead to growing memory consumption over time. // - fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver + fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); diff --git a/tests/bounded.rs b/tests/bounded.rs index f0895e0..e21e924 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -208,16 +208,16 @@ fn filter() { let mut isis = Godess::new(); - let filter = Box::new( |evt: &IsisEvent| + let filter = |evt: &IsisEvent| { match evt { IsisEvent::Sail => false, IsisEvent::Dock => true , } - }); + }; - let mut events: Receiver = isis.observe( 5, Some(filter) ); + let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -245,9 +245,9 @@ fn filter_true() { let mut isis = Godess::new(); - let filter = Box::new( |_: &IsisEvent| true ); + let filter = |_: &IsisEvent| true; - let mut events: Receiver = isis.observe( 5, Some(filter) ); + let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -278,9 +278,9 @@ fn filter_false() { let mut isis = Godess::new(); - let filter = Box::new( |_: &IsisEvent| false ); + let filter = |_: &IsisEvent| false; - let mut events: Receiver = isis.observe( 5, Some(filter) ); + let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -293,3 +293,44 @@ fn filter_false() assert_eq!( None, events.next().await ); }); } + + + +// Make sure we can move something into the closure, only Dock should be returned. +// +#[ test ] +// +fn filter_move() +{ + block_on( async move + { + let mut isis = Godess::new(); + let v: Vec = Vec::new(); + + let filter = move |evt: &IsisEvent| + { + match evt + { + IsisEvent::Sail if v.is_empty() => false, + IsisEvent::Dock if v.is_empty() => true , + _ => false, + } + }; + + let mut events: Receiver = isis.observe( 5, Filter::Closure( Box::new(filter) ).into() ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + + diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 5b0af3c..8577f9b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -81,7 +81,7 @@ pub struct NutEvent impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver { self.isis.observe( queue_size, predicate ) } @@ -90,7 +90,7 @@ impl Observable for Godess impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option< Predicate > ) -> Receiver + fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver { self.nut.observe( queue_size, predicate ) } @@ -99,7 +99,7 @@ impl Observable for Godess impl UnboundedObservable for Godess { - fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver + fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver { self.isis.observe_unbounded( predicate ) } @@ -108,7 +108,7 @@ impl UnboundedObservable for Godess impl UnboundedObservable for Godess { - fn observe_unbounded( &mut self, predicate: Option< Predicate > ) -> UnboundedReceiver + fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver { self.nut.observe_unbounded( predicate ) } diff --git a/tests/unbounded.rs b/tests/unbounded.rs index ba6f7c9..84f2ca4 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -180,6 +180,7 @@ fn alot_of_events() + // Basic filter usage, only Dock should be returned. // #[ test ] @@ -190,16 +191,16 @@ fn filter() { let mut isis = Godess::new(); - let filter = Box::new( |evt: &IsisEvent| + let filter = |evt: &IsisEvent| { match evt { IsisEvent::Sail => false, IsisEvent::Dock => true , } - }); + }; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -227,9 +228,9 @@ fn filter_true() { let mut isis = Godess::new(); - let filter = Box::new( |_: &IsisEvent| true ); + let filter = |_: &IsisEvent| true; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -260,9 +261,9 @@ fn filter_false() { let mut isis = Godess::new(); - let filter = Box::new( |_: &IsisEvent| false ); + let filter = |_: &IsisEvent| false; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some(filter) ); + let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); isis.sail().await; isis.sail().await; @@ -275,3 +276,43 @@ fn filter_false() assert_eq!( None, events.next().await ); }); } + + + +// Make sure we can move something into the closure, only Dock should be returned. +// +#[ test ] +// +fn filter_move() +{ + block_on( async move + { + let mut isis = Godess::new(); + let v: Vec = Vec::new(); + + let filter = move |evt: &IsisEvent| + { + match evt + { + IsisEvent::Sail if v.is_empty() => false, + IsisEvent::Dock if v.is_empty() => true , + _ => false, + } + }; + + let mut events: UnboundedReceiver = isis.observe_unbounded( Filter::Closure( Box::new(filter) ).into() ); + + isis.sail().await; + isis.sail().await; + isis.dock().await; + isis.dock().await; + isis.sail().await; + + drop( isis ); + + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( IsisEvent::Dock, events.next().await.unwrap() ); + assert_eq!( None , events.next().await ); + }); +} + From 44c2d563fbcb123ed2f25d41e3e0c948ead45d8e Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 01:08:34 +0200 Subject: [PATCH 05/14] Complete redesign of the API. - cleaner, only one trait with one method - pass channel choices with a config object - allow filtering events --- Cargo.toml | 5 + Cargo.yml | 7 +- README.md | 113 +++++++++++++----- TODO.md | 12 +- examples/basic.rs | 41 +++---- examples/filter.rs | 46 ++++++++ src/error.rs | 100 ++++++++++++++++ src/events.rs | 277 ++++++++++++++++++++++++++++++++++++++++++++ src/filter.rs | 87 +++++++++++--- src/lib.rs | 41 ++++--- src/observable.rs | 136 +++++++++++++++++++--- src/pharos.rs | 111 ++++++------------ tests/bounded.rs | 50 ++++---- tests/combined.rs | 8 +- tests/common/mod.rs | 25 +--- tests/unbounded.rs | 26 ++--- 16 files changed, 847 insertions(+), 238 deletions(-) create mode 100644 examples/filter.rs create mode 100644 src/error.rs create mode 100644 src/events.rs diff --git a/Cargo.toml b/Cargo.toml index 866057a..1eb8221 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,10 +7,15 @@ status = "actively-developed" repository = "najamelan/pharos" [dependencies] +pin-project = "^0.4.0-beta" + [dependencies.futures-preview] features = ["async-await", "nightly"] version = "^0.3.0-alpha" +[dev-dependencies] +assert_matches = "^1" + [features] external_doc = [] diff --git a/Cargo.yml b/Cargo.yml index d1b7733..cacdf01 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -42,5 +42,10 @@ badges: dependencies: - futures-preview: { version: ^0.3.0-alpha, features: [async-await, nightly] } + futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } + pin-project : ^0.4.0-beta + +dev-dependencies: + + assert_matches: ^1 diff --git a/README.md b/README.md index 253c280..9c85f46 100644 --- a/README.md +++ b/README.md @@ -20,16 +20,18 @@ Minimal rustc version: 1.39. - [Upgrade](#upgrade) - [Dependencies](#dependencies) - [Usage](#usage) + - [Filter](#filter) - [API](#api) - [Contributing](#contributing) - [Code of Conduct](#code-of-conduct) - [License](#license) + ## Security -The main issue with this crate right now is the possibility for the observable to outpace the observer. When using bounded form, there is back pressure, which might allow DDOS attacks if using the pattern on arriving network packets. When using the unbounded form, it might lead to excessive memory consumption if observers are outpaced. +The main issue with this crate right now is the posibility for the observable to outpace the observer. When using bounded channels, there is back pressure, which might allow DDOS attacks if using the pattern on arriving network packets. When using the unbounded channels, it might lead to excessive memory consumption if observers are outpaced. -To mitigate these problems effectively, I would like to implement an unbounded drop channel where the stream will only buffer a certain amount events and will overwrite the oldest event instead of blocking the sender when the buffer is full. +TODO: To mitigate these problems effectively, I will add a ring channel where the channel will only buffer a certain amount events and will overwrite the oldest event instead of blocking the sender when the buffer is full. This crate has: `#![ forbid( unsafe_code ) ]` @@ -43,14 +45,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml): ```yaml dependencies: - pharos: ^0.2 + pharos: ^0.3 ``` With raw Cargo.toml ```toml [dependencies] - pharos = "0.2" + pharos = "0.3" ``` ### Upgrade @@ -60,37 +62,31 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master ### Dependencies -This crate only has one dependiency. Cargo will automatically handle it's dependencies for you. +This crate only has two dependiencies. Cargo will automatically handle it's dependencies for you. ```yaml dependencies: - futures-preview: { version: ^0.3.0-alpha } + futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } + pin-project : ^0.4.0-beta ``` ## Usage pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages -fast enough, otherwise they will slow down the observable (bounded form) or cause memory leak (unbounded form). +fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers. -Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size. Eg, there's no point putting an enum without associated data in an Arc. +Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size (eg. there's no point putting an enum without data in an Arc). -Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is a summary of the most basic one: +Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is the most basic one: ```rust use { - pharos :: { * } , - - futures :: - { - channel::mpsc :: Receiver , - executor :: LocalPool , - task :: LocalSpawnExt , - stream :: StreamExt , - }, + pharos :: { * } , + futures :: { executor::block_on, StreamExt } , }; @@ -115,7 +111,6 @@ impl Godess } - // Event types need to implement clone, but you can wrap them in Arc if not. Also they will be // cloned, so if you will have several observers and big event data, putting them in an Arc is // definitely best. It has no benefit to put a simple dataless enum in an Arc though. @@ -135,41 +130,95 @@ enum GodessEvent // impl Observable for Godess { - fn observe( &mut self, queue_size: usize ) -> Receiver + fn observe( &mut self, options: ObserveConfig) -> Events { - self.pharos.observe( queue_size ) + self.pharos.observe( options ) } } fn main() { - let mut pool = LocalPool::new(); - let mut exec = pool.spawner(); - let program = async move { let mut isis = Godess::new(); - // subscribe: bounded channel with 3 + 1 slots + // subscribe, the observe method takes options to let you choose: + // - channel type (bounded/unbounded) + // - a predicate to filter events // - let mut events = isis.observe( 3 ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ); // trigger an event // isis.sail().await; - // read from stream + // read from stream and let's put on the console what the event looks like. // - let from_stream = events.next().await.unwrap(); + let evt = dbg!( events.next().await.unwrap() ); - dbg!( from_stream ); - assert_eq!( GodessEvent::Sailing, from_stream ); + // After this reads on the event stream will return None. + // + drop( isis ); + + assert_eq!( GodessEvent::Sailing, evt ); + assert_eq!( None, events.next().await ); }; - exec.spawn_local( program ).expect( "Spawn program" ); + block_on( program ); +} +``` + +### Filter + +Sometimes you are not interested in all event types an observable can emit. A common use case is only listening for a +close event on a network connection. The observe method takes options which let you set the predicate. You can only +set one predicate for a given observer. + +```rust +use pharos::*; + +#[ derive( Clone, Debug, PartialEq, Copy ) ] +// +enum NetworkEvent +{ + Open , + Error , + Closing , + Closed , +} + +struct Connection { pharos: Pharos } - pool.run(); +impl Observable for Connection +{ + fn observe( &mut self, options: ObserveConfig) -> Events + { + self.pharos.observe( options ) + } +} + +fn main() +{ + let mut conn = Connection{ pharos: Pharos::new() }; + + // We will only get close events. + // + let filter = Filter::from_pointer( |e| e == &NetworkEvent::Closed ); + + // By creating the config object through into, other options will be defaults, notably here + // this will use unbounded channels. + // + let observer = conn.observe( filter.into() ); + + // Combine both options. + // + let filter = Filter::from_pointer( |e| e != &NetworkEvent::Closed ); + let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) ); + + // Get everything but close events over a bounded channel with queue size 5. + // + let bounded_observer = conn.observe( opts ); } ``` diff --git a/TODO.md b/TODO.md index e871cb9..4b425b7 100644 --- a/TODO.md +++ b/TODO.md @@ -1,15 +1,11 @@ # TODO -- Create a filter possibility. Let users provide a closure with a predicate to filter which events they want to receive. - The reasons for doing this in pharos are: - - - It's unwieldy to do this in the client because of the unwieldy type that you need to annotate if you need to store - the stream (FilterMap will include a closure in it's type) - - performance. If filtering happens on the client side, we will clone and send out events they are not interested in. - By doing it in pharos, we can avoid that. - - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? - scaling? For now we have an ever growing vector of observers + + - other data structure than vec? smallvec? + - keep a vector of free indexes, pop one on observe, push on removal, to reuse + - let users set capacity on creation? diff --git a/examples/basic.rs b/examples/basic.rs index fbe997f..28bb05a 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,14 +1,7 @@ use { - pharos :: { * } , - - futures :: - { - channel::mpsc :: Receiver , - executor :: LocalPool , - task :: LocalSpawnExt , - stream :: StreamExt , - }, + pharos :: { * } , + futures :: { executor::block_on, StreamExt } , }; @@ -33,7 +26,6 @@ impl Godess } - // Event types need to implement clone, but you can wrap them in Arc if not. Also they will be // cloned, so if you will have several observers and big event data, putting them in an Arc is // definitely best. It has no benefit to put a simple dataless enum in an Arc though. @@ -53,39 +45,40 @@ enum GodessEvent // impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option> ) -> Receiver + fn observe( &mut self, options: ObserveConfig) -> Events { - self.pharos.observe( queue_size, predicate ) + self.pharos.observe( options ) } } fn main() { - let mut pool = LocalPool::new(); - let mut exec = pool.spawner(); - let program = async move { let mut isis = Godess::new(); - // subscribe + // subscribe, the observe method takes options to let you choose: + // - channel type (bounded/unbounded) + // - a predicate to filter events // - let mut events = isis.observe( 3, None ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ); // trigger an event // isis.sail().await; - // read from stream + // read from stream and let's put on the console what the event looks like. // - let from_stream = events.next().await.unwrap(); + let evt = dbg!( events.next().await.unwrap() ); - dbg!( from_stream ); - assert_eq!( GodessEvent::Sailing, from_stream ); - }; + // After this reads on the event stream will return None. + // + drop( isis ); - exec.spawn_local( program ).expect( "Spawn program" ); + assert_eq!( GodessEvent::Sailing, evt ); + assert_eq!( None, events.next().await ); + }; - pool.run(); + block_on( program ); } diff --git a/examples/filter.rs b/examples/filter.rs new file mode 100644 index 0000000..1fa86ae --- /dev/null +++ b/examples/filter.rs @@ -0,0 +1,46 @@ +#![ allow( unused_variables, dead_code ) ] + +use pharos::*; + +#[ derive( Clone, Debug, PartialEq, Copy ) ] +// +enum NetworkEvent +{ + Open , + Error , + Closing , + Closed , +} + +struct Connection { pharos: Pharos } + +impl Observable for Connection +{ + fn observe( &mut self, options: ObserveConfig) -> Events + { + self.pharos.observe( options ) + } +} + +fn main() +{ + let mut conn = Connection{ pharos: Pharos::new() }; + + // We will only get close events. + // + let filter = Filter::from_pointer( |e| e == &NetworkEvent::Closed ); + + // By creating the config object through into, other options will be defaults, notably here + // this will use unbounded channels. + // + let observer = conn.observe( filter.into() ); + + // Combine both options. + // + let filter = Filter::from_pointer( |e| e != &NetworkEvent::Closed ); + let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) ); + + // Get everything but close events over a bounded channel with queue size 5. + // + let bounded_observer = conn.observe( opts ); +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..4f284e4 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,100 @@ +use crate::{ import::* }; + + +/// The error type for errors happening in `pharos`. +/// +/// Use [`err.kind()`] to know which kind of error happened. +// +#[ derive( Debug ) ] +// +pub struct Error +{ + pub(crate) inner: Option< Box >, + pub(crate) kind : ErrorKind, +} + + + +/// The different kind of errors that can happen when you use the `pharos` API. +// +#[ derive( Debug ) ] +// +pub enum ErrorKind +{ + /// Failed to send on channel, normally means it's closed. + // + SendError, + + #[ doc( hidden ) ] + // + __NonExhaustive__ +} + + + +impl ErrorTrait for Error +{ + fn source( &self ) -> Option< &(dyn ErrorTrait + 'static) > + { + self.inner.as_ref().map( |e| -> &(dyn ErrorTrait + 'static) { e.deref() } ) + } +} + + + + +impl fmt::Display for ErrorKind +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + { + match self + { + Self::SendError => fmt::Display::fmt( "Channel closed.", f ) , + + _ => unreachable!(), + } + } +} + + +impl fmt::Display for Error +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + { + let inner = match self.source() + { + Some(e) => format!( " Caused by: {}", e ), + None => String::new() , + }; + + write!( f, "pharos::Error: {}{}", self.kind, inner ) + } +} + + + +impl Error +{ + /// Allows matching on the error kind + // + pub fn kind( &self ) -> &ErrorKind + { + &self.kind + } +} + +impl From for Error +{ + fn from( kind: ErrorKind ) -> Error + { + Error { inner: None, kind } + } +} + +impl From for Error +{ + fn from( inner: FutSendError ) -> Error + { + Error { inner: Some( Box::new( inner ) ), kind: ErrorKind::SendError } + } +} diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..7fb41f4 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,277 @@ +use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; + +/// A stream of events. +// +#[ derive( Debug ) ] +// +pub struct Events where Event: Clone + 'static + Send +{ + rx: Receiver, +} + + +impl Events where Event: Clone + 'static + Send +{ + pub(crate) fn new( config: ObserveConfig ) -> (Self, Sender) + { + let (tx, rx) = match config.channel + { + Channel::Bounded( queue_size ) => + { + let (tx, rx) = mpsc::channel( queue_size ); + + ( Sender::Bounded{ tx, filter: config.filter }, Receiver::Bounded{ rx } ) + } + + Channel::Unbounded => + { + let (tx, rx) = mpsc::unbounded(); + + ( Sender::Unbounded{ tx, filter: config.filter }, Receiver::Unbounded{ rx } ) + } + + _ => unreachable!(), + }; + + + ( Self{ rx }, tx ) + } + + + /// Close the channel. This way the sender will stop sending new events, and you can still + /// continue to read any events that are still pending in the channel. This avoids data loss + /// compared to just dropping this object. + // + pub fn close( &mut self ) + { + self.rx.close(); + } +} + + + + +impl Stream for Events where Event: Clone + 'static + Send +{ + type Item = Event; + + fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< Option > + { + Pin::new( &mut self.rx ).poll_next( cx ) + } +} + + + +/// The sender of the channel. +// +#[ pin_project ] +// +pub(crate) enum Sender where Event: Clone + 'static + Send +{ + Bounded { #[pin] tx: FutSender , filter: Option> } , + Unbounded{ #[pin] tx: FutUnboundedSender, filter: Option> } , +} + + + + +impl Sender where Event: Clone + 'static + Send +{ + // Notify the observer and return a bool indicating whether this observer is still + // operational. If an error happens on a channel it usually means that the channel + // is closed, in which case we should drop this sender. + // + pub(crate) async fn notify( &mut self, evt: &Event ) -> bool + { + match self + { + Sender::Bounded{ tx, filter } => + { + match tx.is_closed() + { + true => false , + false => Self::notifier( tx, filter, evt ).await , + } + } + + Sender::Unbounded{ tx, filter } => + { + match tx.is_closed() + { + true => false , + false => Self::notifier( tx, filter, evt ).await , + } + } + } + } + + + async fn notifier + ( + mut tx: impl Sink + Unpin , + filter: &mut Option> , + evt : &Event , + ) + + -> bool + + { + let interested = match filter + { + Some(f) => f.call(evt), + None => true , + }; + + match interested + { + true => tx.send( evt.clone() ).await.is_ok(), + + // since we don't try to send, we know nothing about whether they are still + // observing, so assume they do. + // + false => true, + } + } +} + + + +/// The receiver of the channel. +// +#[ pin_project ] +// +enum Receiver where Event: Clone + 'static + Send +{ + Bounded { #[pin] rx: FutReceiver } , + Unbounded{ #[pin] rx: FutUnboundedReceiver } , +} + + +impl Receiver where Event: Clone + 'static + Send +{ + fn close( &mut self ) + { + match self + { + Receiver::Bounded { rx } => rx.close(), + Receiver::Unbounded{ rx } => rx.close(), + }; + } +} + + + +impl fmt::Debug for Receiver where Event: 'static + Clone + Send +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + { + match self + { + Self::Bounded {..} => write!( f, "pharos::events::Receiver::<{}>::Bounded(_)" , type_name::() ), + Self::Unbounded{..} => write!( f, "pharos::events::Receiver::<{}>::Unbounded(_)", type_name::() ), + } + } +} + + + + +impl Stream for Receiver where Event: Clone + 'static + Send +{ + type Item = Event; + + #[ project ] + // + fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< Option > + { + #[ project ] + // + match self.project() + { + Receiver::Bounded { rx } => rx.poll_next( cx ), + Receiver::Unbounded{ rx } => rx.poll_next( cx ), + } + } +} + + + +impl Sink for Sender where Event: Clone + 'static + Send +{ + type Error = Error; + + #[ project ] + // + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + #[ project ] + // + match self.project() + { + Sender::Bounded { tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => tx.poll_ready( cx ).map_err( Into::into ), + } + } + + #[ project ] + // + fn start_send( self: Pin<&mut Self>, item: Event ) -> Result<(), Self::Error> + { + #[ project ] + // + match self.project() + { + Sender::Bounded { tx, .. } => tx.start_send( item ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => tx.start_send( item ).map_err( Into::into ), + } + } + + /// This will do a send under the hood, so the same errors as from start_send can occur here. + // + #[ project ] + // + fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + #[ project ] + // + match self.project() + { + Sender::Bounded { tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => tx.poll_flush( cx ).map_err( Into::into ), + } + } + + #[ project ] + // + fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + #[ project ] + // + match self.project() + { + Sender::Bounded { tx, .. } => tx.poll_close( cx ).map_err( Into::into ), + Sender::Unbounded{ tx, .. } => tx.poll_close( cx ).map_err( Into::into ), + } + } +} + + + + + +#[ cfg( test ) ] +// +mod tests +{ + use super::*; + + #[test] + // + fn debug() + { + let e = Events::::new( ObserveConfig::default() ); + + assert_eq!( "Events { rx: pharos::events::Receiver::::Unbounded(_) }", &format!( "{:?}", e.0 ) ); + } +} diff --git a/src/filter.rs b/src/filter.rs index 0bc622d..17c0a05 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,45 +1,106 @@ use crate :: { import::* }; -/// Filter events +/// Predicate for filtering events. This is an enum because closures that capture variables from +/// their environment need to be boxed. More often than not, an event will be a simple enum and +/// the predicate will just match on the variant, so it would be wasteful to impose boxing in those +/// cases, hence there is a function pointer variant which does not require boxing. This should +/// be preferred where possible. // pub enum Filter where Event: Clone + 'static + Send , { - /// A function pointer to use to filter events + /// A function pointer to use to filter events. // Pointer( fn(&Event) -> bool ), - /// A boxed closure to use to filter events + /// A boxed closure to use to filter events. // Closure( Box bool + Send> ), } - -impl fmt::Debug for Filter +impl Filter where Event: Clone + 'static + Send { - fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + /// Construct a filter from a closure that captures something from it's environment. This will + /// be boxed and stored under the [Filter::Closure] variant. To avoid boxing, do not capture + /// any variables from the environment and use [Filter::from_pointer]. + // + pub fn from_closure( predicate: F ) -> Self where F: FnMut(&Event) -> bool + Send + 'static + { + Self::Closure( Box::new( predicate ) ) + } + + + /// Construct a filter from a function pointer to a predicate. + // + pub fn from_pointer( predicate: fn(&Event) -> bool ) -> Self { - write!( f, "pharos::Filter" ) + Self::Pointer( predicate ) + } + + + /// Invoke the predicate + // + pub(crate) fn call( &mut self, evt: &Event ) -> bool + { + match self + { + Self::Pointer(f) => f(evt), + Self::Closure(f) => f(evt), + } } } -impl From bool> for Filter +impl fmt::Debug for Filter where Event: Clone + 'static + Send { - fn from( pointer: fn(&Event) -> bool ) -> Self + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - Filter::Pointer( pointer ) + match self + { + Self::Pointer(_) => write!( f, "pharos::Filter<{}>::Pointer(_)", type_name::() ), + Self::Closure(_) => write!( f, "pharos::Filter<{}>::Closure(_)", type_name::() ), + } + } } -impl From bool + Send>> for Filter + +#[ cfg( test ) ] +// +mod tests { - fn from( closure: Box bool + Send> ) -> Self + use super::*; + + #[test] + // + fn from_pointer() { - Filter::Closure( closure ) + let f = Filter::from_pointer( |b| *b ); + + assert_matches!( f, Filter::Pointer(_) ); + } + + #[test] + // + fn from_closure() + { + let f = Filter::from_closure( |b| *b ); + + assert_matches!( f, Filter::Closure(_) ); + } + + #[test] + // + fn debug() + { + let f = Filter::from_pointer( |b| *b ); + let g = Filter::from_closure( |b| *b ); + + assert_eq!( "pharos::Filter::Pointer(_)", &format!( "{:?}", f ) ); + assert_eq!( "pharos::Filter::Closure(_)", &format!( "{:?}", g ) ); } } diff --git a/src/lib.rs b/src/lib.rs index 76c7c0d..9baf878 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,17 +26,21 @@ )] -mod observable; -mod pharos ; -mod filter ; +mod error ; +mod events ; +mod observable ; +mod pharos ; +mod filter ; pub use { - self::pharos :: { Pharos } , - filter :: { Filter } , - observable :: { Observable, UnboundedObservable } , + self::pharos :: { Pharos } , + filter :: { Filter } , + observable :: { Observable, ObserveConfig, Channel } , + events :: { Events } , + error :: { Error } , }; @@ -45,20 +49,31 @@ mod import { pub(crate) use { - std:: { fmt }, + std :: { fmt, error::Error as ErrorTrait, ops::Deref, any::type_name } , + std :: { task::{ Poll, Context }, pin::Pin } , + pin_project :: { project, pin_project } , futures :: { - join, - - future::{ join_all }, Sink, SinkExt, + future::{ join_all }, Stream, Sink, SinkExt, channel::mpsc:: { - self , - Sender , Receiver , - UnboundedSender, UnboundedReceiver , + self , + Sender as FutSender , + Receiver as FutReceiver , + UnboundedSender as FutUnboundedSender , + UnboundedReceiver as FutUnboundedReceiver , + SendError as FutSendError , } , }, }; + + + #[ cfg( test ) ] + // + pub(crate) use + { + assert_matches :: { assert_matches } , + }; } diff --git a/src/observable.rs b/src/observable.rs index 5db9b40..a740f2d 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Filter }; +use crate :: { Filter, Events }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -7,26 +7,132 @@ pub trait Observable where Event: Clone + 'static + Send , { - /// Add an observer to the observable. This will use a bounded channel of the size of `queue_size`. - /// Note that the use of a bounded channel provides backpressure and can slow down the observed - /// task. - /// - /// The predicate parameter allows filtering the events that should be send to this observer. - /// It receives a reference to the event. If the predicate returns true, it will be sent. + /// Add an observer to the observable. Options can be in order to choose channel type and + /// to filter events with a predicate. // - fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver; + fn observe( &mut self, options: ObserveConfig ) -> Events; } -/// Indicate that a type is observable through an unbounded stream. You can call [`observe_unbounded`](UnboundedObservable::observe_unbounded) -/// to get a stream of events. + +/// Choose the type of channel that will be used for your event stream. +// +#[ derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord )] // -pub trait UnboundedObservable +pub enum Channel +{ + /// A channel with a limited buffer (the usize parameter). Creates back pressure when the buffer is full. + /// This means that producer tasks may block if consumers can't process fast enough. + // + Bounded(usize), - where Event: Clone + 'static + Send , + /// A channel with unbounded capacity. Note that this may lead to unbouded memory consumption if producers + /// outpace consumers. + // + Unbounded, + + /// This enum might grow in the future, thanks to this that won't be a breaking change. + // + __NonExhaustive__ +} + + +impl Default for Channel +{ + fn default() -> Self + { + Channel::Unbounded + } +} + + + +/// Configuration for your event stream, passed to [Observable::observe] when subscribing. +/// This let's you choose the type of channel (currently Bounded or Unbounded) and let's +/// you set a filter to ignore certain events (see: [Filter]). +// +#[ derive( Debug ) ] +// +pub struct ObserveConfig where Event: Clone + 'static + Send +{ + pub(crate) channel: Channel, + pub(crate) filter : Option>, +} + + + +/// Create a default configuration: +/// - no filter +/// - an unbounded channel +// +impl Default for ObserveConfig where Event: Clone + 'static + Send { - /// Add an observer to the observable. This will use an unbounded channel. Beware that if the observable - /// outpaces the observer, this will lead to growing memory consumption over time. + fn default() -> Self + { + Self + { + channel: Channel::default(), + filter : None , + } + } +} + + + +impl ObserveConfig where Event: Clone + 'static + Send +{ + /// Choose which channel implementation to use for your event stream. + // + pub fn channel( mut self, channel: Channel ) -> Self + { + self.channel = channel; + self + } + + + /// Filter your event stream with a predicate that is a fn pointer. + // + pub fn filter( mut self, filter: fn(&Event) -> bool ) -> Self + { + debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); + + self.filter = Some( Filter::Pointer(filter) ); + self + } + + + /// Filter your event stream with a predicate that is a closure that captures environment. + /// It is preferred to use [filter](ObserveConfig::filter) if you can as this will box the closure. // - fn observe_unbounded( &mut self, predicate: Option> ) -> UnboundedReceiver; + pub fn filter_boxed( mut self, filter: impl FnMut(&Event) -> bool + Send + 'static ) -> Self + { + debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); + + self.filter = Some( Filter::from_closure(filter) ); + self + } +} + + +/// Create a ObserveConfig from a [Channel], getting default values for other options. +// +impl From for ObserveConfig where Event: Clone + 'static + Send +{ + fn from( channel: Channel ) -> Self + { + Self::default().channel( channel ) + } +} + + +/// Create a ObserveConfig from a [Filter], getting default values for other options. +// +impl From> for ObserveConfig where Event: Clone + 'static + Send +{ + fn from( filter: Filter ) -> Self + { + let mut s = Self::default(); + s.filter = Some( filter ); + s + } } diff --git a/src/pharos.rs b/src/pharos.rs index 7ca00bf..9e32124 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,33 +1,30 @@ -use crate :: { import::*, Observable, UnboundedObservable, Filter }; +use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward -/// the [`observe`](Pharos::observe) method to Pharos and call notify on it. +/// the [`observe`](Observable::observe) method to Pharos and call notify on it. /// /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. // -pub struct Pharos +pub struct Pharos where Event: 'static + Clone + Send { - observers: Vec, Option>) >>, - unbounded: Vec, Option>) >>, + observers: Vec >>, } -// TODO: figure out what we really want here... -// -impl fmt::Debug for Pharos +impl fmt::Debug for Pharos where Event: 'static + Clone + Send { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - write!( f, "Pharos" ) + write!( f, "pharos::Pharos<{}>", type_name::() ) } } -impl Pharos +impl Pharos where Event: 'static + Clone + Send { /// Create a new Pharos. May it's light guide you to safe harbour. // @@ -40,74 +37,39 @@ impl Pharos /// Notify all observers of Event evt. // - pub async fn notify<'a>( &'a mut self, evt: &'a Event ) - { - let unbound = Self::notify_inner( &mut self.unbounded, &evt ); - let bounded = Self::notify_inner( &mut self.observers, &evt ); - - join!( unbound, bounded ); - } - - - - // Helper method to abstract out over bounded and unbounded observers. - // - async fn notify_inner<'a> - ( - observers: &'a mut Vec< Option< (impl Sink + Unpin + Clone, Option>) > > , - evt: &'a Event - ) + pub async fn notify( &mut self, evt: &Event ) { // Try to send to all channels in parallel, so they can all start processing this event // even if one of them is blocked on a full queue. // - // We can not have mutable access in parallel, so we destructure our vector. This probably + // We can not have mutable access in parallel, so we take options out and put them back. This // allocates a new vector every time. If you have a better idea, please open an issue! // // The output of the join is a vec of options with the disconnected observers removed. // let fut = join_all ( - ( 0..observers.len() ).map( |i| + ( 0..self.observers.len() ).map( |i| { + let opt = self.observers[i].take(); let evt = evt.clone(); - let opt = observers[i].take(); async move { - if let Some( (mut tx, filter_opt) ) = opt + let mut new = None; + + if let Some( mut s ) = opt { - // If we have a filter, run it, otherwise return true. - // we return the filter, since we need to give it back at the end. - // - let (go, pre_opt2) = filter_opt.map_or( (true, None), |mut filter| + match s.notify( &evt ).await { - let filtered = match filter - { - Filter::Pointer(ref p) => p( &evt ), - Filter::Closure(ref mut p) => p( &evt ), - }; - - (filtered, Some(filter)) - }); - - // We count on the send not being executed if go is false. - // If an error is returned, it's disconnected, drop it. - // - if go && tx.send( evt ).await.is_err() - { - None + true => new = Some( s ), + false => {} } - - // Put it back after use - // - else { Some( (tx, pre_opt2) ) } } - // It was already none - // - else { None } + new } + }) ); @@ -115,53 +77,56 @@ impl Pharos // Put back the observers that we "borrowed" // TODO: compact the vector from time to time? // - *observers = fut.await; + self.observers = fut.await; } } -impl Default for Pharos +impl Default for Pharos where Event: 'static + Clone + Send { fn default() -> Self { Self { observers: Vec::new(), - unbounded: Vec::new(), } } } -impl Observable for Pharos +impl Observable for Pharos where Event: 'static + Clone + Send { /// Add an observer to the pharos. This will use a bounded channel of the size of `queue_size`. /// Note that the use of a bounded channel provides backpressure and can slow down the observed /// task. // - fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver + fn observe( &mut self, options: ObserveConfig ) -> Events { - let (tx, rx) = mpsc::channel( queue_size ); + let (events, sender) = Events::new( options ); - self.observers.push( Some(( tx, predicate )) ); + self.observers.push( Some(sender) ); - rx + events } } -impl UnboundedObservable for Pharos + + + +#[ cfg( test ) ] +// +mod tests { - /// Add an observer to the pharos. This will use an unbounded channel. Beware that if the observable - /// outpaces the observer, this will lead to growing memory consumption over time. + use super::*; + + #[test] // - fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver + fn debug() { - let (tx, rx) = mpsc::unbounded(); - - self.unbounded.push( Some(( tx, predicate )) ); + let lighthouse = Pharos::::new(); - rx + assert_eq!( "pharos::Pharos", &format!( "{:?}", lighthouse ) ); } } diff --git a/tests/bounded.rs b/tests/bounded.rs index e21e924..24e6ec3 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -21,9 +21,8 @@ fn basic() { block_on( async move { - let mut isis = Godess::new(); - - let mut events = isis.observe( 5, None ); + let mut isis = Godess::new(); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ); isis.sail().await; isis.sail().await; @@ -45,9 +44,8 @@ fn close_receiver() { block_on( async move { - let mut isis = Godess::new(); - - let mut events = isis.observe( 5, None ); + let mut isis = Godess::new(); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ); isis.sail().await; events.close(); @@ -67,10 +65,9 @@ fn one_receiver_drops() { block_on( async move { - let mut isis = Godess::new(); - - let mut egypt_evts = isis.observe( 1, None ); - let mut shine_evts = isis.observe( 2, None ); + let mut isis = Godess::new(); + let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ); + let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ); isis.sail().await; @@ -102,8 +99,8 @@ fn types() // Note that because of the asserts below type inference works here and we don't have to // put type annotation, but I do find it quite obscure and better to be explicit. // - let mut shine_evts: Receiver< NutEvent> = isis.observe( 5, None ); - let mut egypt_evts: Receiver = isis.observe( 5, None ); + let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); isis.shine().await; isis.sail ().await; @@ -126,10 +123,9 @@ fn threads() { block_on( async move { - let mut isis = Godess::new(); - - let mut egypt_evts = isis.observe( 5, None ); - let mut shine_evts = isis.observe( 5, None ); + let mut isis = Godess::new(); + let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ); + let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ); thread::spawn( move || @@ -217,7 +213,10 @@ fn filter() } }; - let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); + + let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); + + let mut events = isis.observe( opts ); isis.sail().await; isis.sail().await; @@ -247,7 +246,10 @@ fn filter_true() let filter = |_: &IsisEvent| true; - let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); + let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); + + + let mut events = isis.observe( opts ); isis.sail().await; isis.sail().await; @@ -280,7 +282,10 @@ fn filter_false() let filter = |_: &IsisEvent| false; - let mut events: Receiver = isis.observe( 5, Some( Filter::Pointer(filter) ) ); + let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); + + + let mut events = isis.observe( opts ); isis.sail().await; isis.sail().await; @@ -304,7 +309,7 @@ fn filter_move() { block_on( async move { - let mut isis = Godess::new(); + let mut isis = Godess::new(); let v: Vec = Vec::new(); let filter = move |evt: &IsisEvent| @@ -317,7 +322,10 @@ fn filter_move() } }; - let mut events: Receiver = isis.observe( 5, Filter::Closure( Box::new(filter) ).into() ); + let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter_boxed( filter ); + + + let mut events = isis.observe( opts ); isis.sail().await; isis.sail().await; diff --git a/tests/combined.rs b/tests/combined.rs index 92151a9..221d4ae 100644 --- a/tests/combined.rs +++ b/tests/combined.rs @@ -15,11 +15,11 @@ fn both() { let mut isis = Godess::new(); - let mut events: Receiver = isis.observe( 5, None ) ; - let mut ubevts: UnboundedReceiver = isis.observe_unbounded( None) ; + let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut nuevts: Receiver = isis.observe( 5, None ) ; - let mut ubnuts: UnboundedReceiver = isis.observe_unbounded( None) ; + let mut ubevts: Events = isis.observe( ObserveConfig::default() ); + let mut ubnuts: Events = isis.observe( ObserveConfig::default() ); isis.sail ().await; isis.shine().await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 8577f9b..665f6a3 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -81,35 +81,18 @@ pub struct NutEvent impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver + fn observe( &mut self, options: ObserveConfig ) -> Events { - self.isis.observe( queue_size, predicate ) + self.isis.observe( options ) } } impl Observable for Godess { - fn observe( &mut self, queue_size: usize, predicate: Option< Filter > ) -> Receiver + fn observe( &mut self, options: ObserveConfig ) -> Events { - self.nut.observe( queue_size, predicate ) + self.nut.observe( options ) } } - -impl UnboundedObservable for Godess -{ - fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver - { - self.isis.observe_unbounded( predicate ) - } -} - - -impl UnboundedObservable for Godess -{ - fn observe_unbounded( &mut self, predicate: Option< Filter > ) -> UnboundedReceiver - { - self.nut.observe_unbounded( predicate ) - } -} diff --git a/tests/unbounded.rs b/tests/unbounded.rs index 84f2ca4..f72dabf 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -23,7 +23,7 @@ fn basic() { let mut isis = Godess::new(); - let mut events = isis.observe_unbounded( None ); + let mut events = isis.observe( ObserveConfig::default() ); isis.sail().await; isis.sail().await; @@ -46,7 +46,7 @@ fn close_receiver() { let mut isis = Godess::new(); - let mut events = isis.observe_unbounded( None ); + let mut events = isis.observe( ObserveConfig::default() ); isis.sail().await; events.close(); @@ -68,8 +68,8 @@ fn one_receiver_drops() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe_unbounded( None ); - let mut shine_evts = isis.observe_unbounded( None ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ); + let mut shine_evts = isis.observe( ObserveConfig::default() ); isis.sail().await; @@ -100,8 +100,8 @@ fn types() { let mut isis = Godess::new(); - let mut egypt_evts: UnboundedReceiver = isis.observe_unbounded( None ); - let mut shine_evts: UnboundedReceiver = isis.observe_unbounded( None ); + let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ); + let mut shine_evts: Events = isis.observe( ObserveConfig::default() ); isis.sail ().await; isis.shine().await; @@ -125,8 +125,8 @@ fn threads() { let mut isis = Godess::new(); - let mut egypt_evts = isis.observe_unbounded( None ); - let mut shine_evts = isis.observe_unbounded( None ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ); + let mut shine_evts = isis.observe( ObserveConfig::default() ); thread::spawn( move || @@ -159,7 +159,7 @@ fn alot_of_events() { let mut w = Godess::new(); - let mut events = w.observe_unbounded( None ); + let mut events = w.observe( ObserveConfig::default() ); let amount = 1000; @@ -200,7 +200,7 @@ fn filter() } }; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); isis.sail().await; isis.sail().await; @@ -230,7 +230,7 @@ fn filter_true() let filter = |_: &IsisEvent| true; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); isis.sail().await; isis.sail().await; @@ -263,7 +263,7 @@ fn filter_false() let filter = |_: &IsisEvent| false; - let mut events: UnboundedReceiver = isis.observe_unbounded( Some( Filter::Pointer(filter) ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); isis.sail().await; isis.sail().await; @@ -300,7 +300,7 @@ fn filter_move() } }; - let mut events: UnboundedReceiver = isis.observe_unbounded( Filter::Closure( Box::new(filter) ).into() ); + let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ); isis.sail().await; isis.sail().await; From 69f58cda3d48a58aa00e1762d420e525af87ce01 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 10:37:52 +0200 Subject: [PATCH 06/14] Don't expose error type publicly since we don't use it in the API --- src/error.rs | 9 +++++---- src/lib.rs | 6 +++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/error.rs b/src/error.rs index 4f284e4..afd5c5d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use crate::{ import::* }; // #[ derive( Debug ) ] // -pub struct Error +pub(crate) struct Error { pub(crate) inner: Option< Box >, pub(crate) kind : ErrorKind, @@ -19,9 +19,10 @@ pub struct Error // #[ derive( Debug ) ] // -pub enum ErrorKind +pub(crate) enum ErrorKind { - /// Failed to send on channel, normally means it's closed. + /// Failed to send on channel, normally means it's closed. Pharos does not expose these errors + /// to the user. // SendError, @@ -77,7 +78,7 @@ impl Error { /// Allows matching on the error kind // - pub fn kind( &self ) -> &ErrorKind + pub(crate) fn _kind( &self ) -> &ErrorKind { &self.kind } diff --git a/src/lib.rs b/src/lib.rs index 9baf878..ea16b62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,10 +40,14 @@ pub use filter :: { Filter } , observable :: { Observable, ObserveConfig, Channel } , events :: { Events } , - error :: { Error } , }; +pub(crate) use +{ + error :: { Error } , +}; + mod import { From 9dae7c8e3f10087fa71b4eebdf2546e4f28dcc3d Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 13:24:23 +0200 Subject: [PATCH 07/14] Add docs --- README.md | 6 ++ TODO.md | 2 + src/events.rs | 2 +- src/filter.rs | 32 ++++++ src/observable.rs | 248 ++++++++++++++++++++++++++++++++-------------- src/pharos.rs | 3 + 6 files changed, 217 insertions(+), 76 deletions(-) diff --git a/README.md b/README.md index 9c85f46..90d9153 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Minimal rustc version: 1.39. ## Table of Contents - [Security](#security) +- [Future work](#future-work) - [Install](#install) - [Upgrade](#upgrade) - [Dependencies](#dependencies) @@ -36,6 +37,11 @@ TODO: To mitigate these problems effectively, I will add a ring channel where th This crate has: `#![ forbid( unsafe_code ) ]` +### Future work + +Please check out the [todo](https://github.com/najamelan/pharos/blob/master/TODO.md) for ambitions. + + ## Install With [cargo add](https://github.com/killercup/cargo-edit): diff --git a/TODO.md b/TODO.md index 4b425b7..9531b7f 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,7 @@ # TODO +- make Events clone? means we can only work with broadcast channels + - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? diff --git a/src/events.rs b/src/events.rs index 7fb41f4..a1ab88a 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,6 @@ use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; -/// A stream of events. +/// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). // #[ derive( Debug ) ] // diff --git a/src/filter.rs b/src/filter.rs index 17c0a05..9e6b1ba 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -5,6 +5,38 @@ use crate :: { import::* }; /// the predicate will just match on the variant, so it would be wasteful to impose boxing in those /// cases, hence there is a function pointer variant which does not require boxing. This should /// be preferred where possible. +/// +/// ``` +/// use pharos::*; +/// +/// let a = 5; +/// +/// // This closure captures the a variable from it's environment. +/// // We can still use it as a filter by boxing it with `from_closure`. +/// // +/// // Note: it depends on the circumstances, but often enough, we need to +/// // annotate the type of the event parameter to the predicate. +/// // +/// // For this example we use bool as event type for simplicity, but it works +/// // just the same if that's an enum. +/// // +/// let predicate = move |_: &bool| { a; true }; +/// +/// let filter = Filter::::from_closure( predicate ); +/// +/// // This one does not capture anything, so it can be stored as a function pointer +/// // without boxing. +/// // +/// let predicate = move |_: &bool| { true }; +/// +/// let filter = Filter::::from_pointer( predicate ); +/// +/// // You can also use actual functions as filters. +/// // +/// fn predicate_function( event: &bool ) -> bool { true } +/// +/// let filter = Filter::::from_pointer( predicate_function ); +/// ``` // pub enum Filter diff --git a/src/observable.rs b/src/observable.rs index a740f2d..cd81d93 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -2,61 +2,157 @@ use crate :: { Filter, Events }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. +/// +/// Generally used with a [Pharos](crate::Pharos) object which manages the observers for you. +/// +/// ``` +/// use pharos::*; +/// use futures::stream::StreamExt; +/// +/// // The event we want to broadcast +/// // +/// #[ derive( Debug, Clone ) ] +/// // +/// enum Steps +/// { +/// Step1 , +/// Step2 , +/// Done , +/// +/// // Data is possible, but it has to be clone and will be cloned for each observer +/// // except observers that filter this event out. +/// // +/// Error(u8) , +/// } +/// +/// +/// impl Steps +/// { +/// fn is_err( &self ) -> bool +/// { +/// match self +/// { +/// Self::Error(_) => true , +/// _ => false , +/// } +/// } +/// } +/// +/// +/// // The object we want to be observable +/// // +/// struct Foo { pharos: Pharos }; +/// +/// impl Observable for Foo +/// { +/// // Pharos implements observable, so we just forward the call. +/// // +/// fn observe( &mut self, options: ObserveConfig ) -> Events +/// { +/// self.pharos.observe( options ) +/// } +/// } +/// +/// +/// // use in async context +/// // +/// async fn task() +/// { +/// let mut foo = Foo { pharos: Pharos::new() }; +/// let mut errors = foo.observe( Filter::from_pointer( Steps::is_err ).into() ); +/// +/// // will only be notified on errors now +/// // +/// let next_error = errors.next().await; +/// } +/// ``` // pub trait Observable - where Event: Clone + 'static + Send , + where Event: Clone + 'static + Send , { - /// Add an observer to the observable. Options can be in order to choose channel type and - /// to filter events with a predicate. - // - fn observe( &mut self, options: ObserveConfig ) -> Events; + /// Add an observer to the observable. Options can be in order to choose channel type and + /// to filter events with a predicate. + // + fn observe( &mut self, options: ObserveConfig ) -> Events; } -/// Choose the type of channel that will be used for your event stream. +/// Choose the type of channel that will be used for your event stream. Used in [ObserveConfig]. // #[ derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord )] // pub enum Channel { - /// A channel with a limited buffer (the usize parameter). Creates back pressure when the buffer is full. - /// This means that producer tasks may block if consumers can't process fast enough. - // - Bounded(usize), - - /// A channel with unbounded capacity. Note that this may lead to unbouded memory consumption if producers - /// outpace consumers. - // - Unbounded, - - /// This enum might grow in the future, thanks to this that won't be a breaking change. - // - __NonExhaustive__ + /// A channel with a limited buffer (the usize parameter). Creates back pressure when the buffer is full. + /// This means that producer tasks may block if consumers can't process fast enough. + // + Bounded(usize), + + /// A channel with unbounded capacity. Note that this may lead to unbouded memory consumption if producers + /// outpace consumers. + // + Unbounded, + + /// This enum might grow in the future, thanks to this that won't be a breaking change. + // + __NonExhaustive__ } impl Default for Channel { - fn default() -> Self - { - Channel::Unbounded - } + fn default() -> Self + { + Channel::Unbounded + } } /// Configuration for your event stream, passed to [Observable::observe] when subscribing. -/// This let's you choose the type of channel (currently Bounded or Unbounded) and let's -/// you set a filter to ignore certain events (see: [Filter]). +/// This let's you choose the type of [channel](Channel) and let's +/// you set a [filter](Filter) to ignore certain events. +/// +/// ``` +/// use pharos::*; +/// +/// // We choose event type usize for simplicity. You choose whatever type you want here, +/// // see the bounds on the Event type parameter throughout this library. +/// // +/// let mut pharos = Pharos::::new(); +/// +/// // Use defaults, unbounded channel and no filter. +/// // +/// pharos.observe( ObserveConfig::default() ); +/// +/// // Use bounded channel and defaults for other options. +/// // +/// pharos.observe( Channel::Bounded(5).into() ); +/// +/// // Use a filter and defaults for other options. +/// // Will only receive events if they are bigger than three. +/// // +/// pharos.observe( Filter::from_pointer( |evt| *evt > 3 ).into() ); +/// +/// // Set both channel and filter. Note you can only set one filter per observable. +/// // +/// let opts = ObserveConfig::default() +/// +/// .channel( Channel::Bounded( 5 ) ) +/// .filter ( |evt| *evt > 3 ) +/// ; +/// +/// pharos.observe( opts ); +/// ``` // #[ derive( Debug ) ] // pub struct ObserveConfig where Event: Clone + 'static + Send { - pub(crate) channel: Channel, - pub(crate) filter : Option>, + pub(crate) channel: Channel, + pub(crate) filter : Option>, } @@ -67,50 +163,52 @@ pub struct ObserveConfig where Event: Clone + 'static + Send // impl Default for ObserveConfig where Event: Clone + 'static + Send { - fn default() -> Self - { - Self - { - channel: Channel::default(), - filter : None , - } - } + fn default() -> Self + { + Self + { + channel: Channel::default(), + filter : None , + } + } } impl ObserveConfig where Event: Clone + 'static + Send { - /// Choose which channel implementation to use for your event stream. - // - pub fn channel( mut self, channel: Channel ) -> Self - { - self.channel = channel; - self - } - - - /// Filter your event stream with a predicate that is a fn pointer. - // - pub fn filter( mut self, filter: fn(&Event) -> bool ) -> Self - { - debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); - - self.filter = Some( Filter::Pointer(filter) ); - self - } - - - /// Filter your event stream with a predicate that is a closure that captures environment. - /// It is preferred to use [filter](ObserveConfig::filter) if you can as this will box the closure. - // - pub fn filter_boxed( mut self, filter: impl FnMut(&Event) -> bool + Send + 'static ) -> Self - { - debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); - - self.filter = Some( Filter::from_closure(filter) ); - self - } + /// Choose which channel implementation to use for your event stream. + // + pub fn channel( mut self, channel: Channel ) -> Self + { + self.channel = channel; + self + } + + + /// Filter your event stream with a predicate that is a fn pointer. + /// You can only set one filter per observable. + // + pub fn filter( mut self, filter: fn(&Event) -> bool ) -> Self + { + debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); + + self.filter = Some( Filter::Pointer(filter) ); + self + } + + + /// Filter your event stream with a predicate that is a closure that captures environment. + /// It is preferred to use [filter](ObserveConfig::filter) if you can as this will box the closure. + /// You can only set one filter per observable. + // + pub fn filter_boxed( mut self, filter: impl FnMut(&Event) -> bool + Send + 'static ) -> Self + { + debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); + + self.filter = Some( Filter::from_closure(filter) ); + self + } } @@ -118,10 +216,10 @@ impl ObserveConfig where Event: Clone + 'static + Send // impl From for ObserveConfig where Event: Clone + 'static + Send { - fn from( channel: Channel ) -> Self - { - Self::default().channel( channel ) - } + fn from( channel: Channel ) -> Self + { + Self::default().channel( channel ) + } } @@ -129,10 +227,10 @@ impl From for ObserveConfig where Event: Clone + 'static // impl From> for ObserveConfig where Event: Clone + 'static + Send { - fn from( filter: Filter ) -> Self - { - let mut s = Self::default(); - s.filter = Some( filter ); - s - } + fn from( filter: Filter ) -> Self + { + let mut s = Self::default(); + s.filter = Some( filter ); + s + } } diff --git a/src/pharos.rs b/src/pharos.rs index 9e32124..1834c28 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -6,6 +6,9 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// /// You can of course create several `Pharos` (I know, historical sacrilege) for (different) types /// of events. +/// +/// Please see the docs for [Observable] for an example. Others can be found in the README and +/// the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory of the repository. // pub struct Pharos where Event: 'static + Clone + Send { From c42b7e7a9987b0d7c57cc4f9c522db1f2adc4f6a Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 17:17:00 +0200 Subject: [PATCH 08/14] Let user decide the initial capacity of the vector used to store observers in Pharos --- README.md | 4 ++-- TODO.md | 2 +- examples/basic.rs | 2 +- examples/filter.rs | 2 +- src/observable.rs | 4 ++-- src/pharos.rs | 22 +++++++++++++++------- tests/common/mod.rs | 4 ++-- 7 files changed, 24 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 90d9153..029861e 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ impl Godess { fn new() -> Self { - Self { pharos: Pharos::new() } + Self { pharos: Pharos::default() } } // Send Godess sailing so she can tweet about it! @@ -206,7 +206,7 @@ impl Observable for Connection fn main() { - let mut conn = Connection{ pharos: Pharos::new() }; + let mut conn = Connection{ pharos: Pharos::default() }; // We will only get close events. // diff --git a/TODO.md b/TODO.md index 9531b7f..9a59f97 100644 --- a/TODO.md +++ b/TODO.md @@ -9,5 +9,5 @@ - scaling? For now we have an ever growing vector of observers - other data structure than vec? smallvec? - - keep a vector of free indexes, pop one on observe, push on removal, to reuse + - type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review. - let users set capacity on creation? diff --git a/examples/basic.rs b/examples/basic.rs index 28bb05a..e3b8977 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -14,7 +14,7 @@ impl Godess { fn new() -> Self { - Self { pharos: Pharos::new() } + Self { pharos: Pharos::default() } } // Send Godess sailing so she can tweet about it! diff --git a/examples/filter.rs b/examples/filter.rs index 1fa86ae..02534ac 100644 --- a/examples/filter.rs +++ b/examples/filter.rs @@ -24,7 +24,7 @@ impl Observable for Connection fn main() { - let mut conn = Connection{ pharos: Pharos::new() }; + let mut conn = Connection{ pharos: Pharos::default() }; // We will only get close events. // diff --git a/src/observable.rs b/src/observable.rs index cd81d93..f23fead 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -58,7 +58,7 @@ use crate :: { Filter, Events }; /// // /// async fn task() /// { -/// let mut foo = Foo { pharos: Pharos::new() }; +/// let mut foo = Foo { pharos: Pharos::default() }; /// let mut errors = foo.observe( Filter::from_pointer( Steps::is_err ).into() ); /// /// // will only be notified on errors now @@ -121,7 +121,7 @@ impl Default for Channel /// // We choose event type usize for simplicity. You choose whatever type you want here, /// // see the bounds on the Event type parameter throughout this library. /// // -/// let mut pharos = Pharos::::new(); +/// let mut pharos = Pharos::::default(); /// /// // Use defaults, unbounded channel and no filter. /// // diff --git a/src/pharos.rs b/src/pharos.rs index 1834c28..e779480 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -30,10 +30,18 @@ impl fmt::Debug for Pharos where Event: 'static + Clone + Send impl Pharos where Event: 'static + Clone + Send { /// Create a new Pharos. May it's light guide you to safe harbour. + /// + /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers + /// it will save allocations by setting this to a higher number. + /// + /// For pharos 0.3.0 on x64 linux: std::mem::size_of::>>() == 56 bytes. // - pub fn new() -> Self + pub fn new( capacity: usize ) -> Self { - Self::default() + Self + { + observers: Vec::with_capacity( capacity ), + } } @@ -86,14 +94,14 @@ impl Pharos where Event: 'static + Clone + Send +/// Creates a new pharos, using 10 as the initial capacity of the vector used to store +/// observers. If this number does really not fit your use case, call [Pharos::new]. +// impl Default for Pharos where Event: 'static + Clone + Send { fn default() -> Self { - Self - { - observers: Vec::new(), - } + Self::new( 10 ) } } @@ -128,7 +136,7 @@ mod tests // fn debug() { - let lighthouse = Pharos::::new(); + let lighthouse = Pharos::::default(); assert_eq!( "pharos::Pharos", &format!( "{:?}", lighthouse ) ); } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 665f6a3..cd6492c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -36,8 +36,8 @@ impl Godess { Self { - isis: Pharos::new(), - nut : Pharos::new(), + isis: Pharos::default(), + nut : Pharos::default(), } } From 85a79d8caee0690936295b9028b2778d8868e9b1 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 17:37:44 +0200 Subject: [PATCH 09/14] Add `storage_len` and `num_observers` to Pharos. --- src/events.rs | 33 +++++++++---------- src/pharos.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/src/events.rs b/src/events.rs index a1ab88a..5bb6130 100644 --- a/src/events.rs +++ b/src/events.rs @@ -78,31 +78,30 @@ pub(crate) enum Sender where Event: Clone + 'static + Send impl Sender where Event: Clone + 'static + Send { + // Verify whether this observer is still around + // + pub(crate) fn is_closed( &self ) -> bool + { + match self + { + Sender::Bounded { tx, .. } => tx.is_closed(), + Sender::Unbounded{ tx, .. } => tx.is_closed(), + } + } + + // Notify the observer and return a bool indicating whether this observer is still // operational. If an error happens on a channel it usually means that the channel // is closed, in which case we should drop this sender. // pub(crate) async fn notify( &mut self, evt: &Event ) -> bool { + if self.is_closed() { return false } + match self { - Sender::Bounded{ tx, filter } => - { - match tx.is_closed() - { - true => false , - false => Self::notifier( tx, filter, evt ).await , - } - } - - Sender::Unbounded{ tx, filter } => - { - match tx.is_closed() - { - true => false , - false => Self::notifier( tx, filter, evt ).await , - } - } + Sender::Bounded { tx, filter } => Self::notifier( tx, filter, evt ).await, + Sender::Unbounded{ tx, filter } => Self::notifier( tx, filter, evt ).await, } } diff --git a/src/pharos.rs b/src/pharos.rs index e779480..d85c094 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -90,6 +90,44 @@ impl Pharos where Event: 'static + Clone + Send // self.observers = fut.await; } + + + /// Returns the size of the vector used to store the observers. Useful for debugging and testing if it + /// seems to get to big. + // + pub fn storage_len( &self ) -> usize + { + self.observers.len() + } + + + /// Returns the number of actual observers that are still listening (have not closed or dropped the Events). + /// This will loop and it will verify for each if they are closed, clearing them from the internal storage + /// if they are closed. This is similar to what notify does, but without sending an event. + // + pub fn num_observers( &mut self ) -> usize + { + let mut count = 0; + + for opt in self.observers.iter_mut() + { + if let Some(observer) = opt.take() + { + match observer.is_closed() + { + true => {} + + false => + { + count += 1; + *opt = Some( observer ); + } + } + } + } + + count + } } @@ -140,4 +178,56 @@ mod tests assert_eq!( "pharos::Pharos", &format!( "{:?}", lighthouse ) ); } + + // #[test] + // // + // fn size_of_sender() + // { + // dbg!( std::mem::size_of::>>() ); + // } + + + // verify storage_len and num_observers + // + #[test] + // + fn new() + { + let ph = Pharos::::new( 5 ); + + assert_eq!( ph.observers.capacity(), 5 ); + } + + + // verify storage_len and num_observers + // + #[test] + // + fn storage_len() + { + let mut ph = Pharos::::default(); + + assert_eq!( ph.storage_len (), 0 ); + assert_eq!( ph.num_observers(), 0 ); + + let mut a = ph.observe( ObserveConfig::default() ); + + assert_eq!( ph.storage_len (), 1 ); + assert_eq!( ph.num_observers(), 1 ); + + let b = ph.observe( ObserveConfig::default() ); + + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 2 ); + + a.close(); + + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 1 ); + + drop( b ); + + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 0 ); + } } From e29030711fbf4c58929c6a28dd113b4f81344650 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 17:39:19 +0200 Subject: [PATCH 10/14] Re-use unused slots in vector in Pharos. Add documentation about implementation details --- src/events.rs | 3 ++ src/pharos.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/src/events.rs b/src/events.rs index 5bb6130..82f94a4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,8 @@ use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). +/// +/// For pharos 0.3.0 on x64 linux: std::mem::size_of::>() == 16 // #[ derive( Debug ) ] // @@ -64,6 +66,7 @@ impl Stream for Events where Event: Clone + 'static + Send /// The sender of the channel. +/// For pharos 0.3.0 on x64 linux: std::mem::size_of::>() == 56 // #[ pin_project ] // diff --git a/src/pharos.rs b/src/pharos.rs index d85c094..c2f6248 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -9,6 +9,27 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// /// Please see the docs for [Observable] for an example. Others can be found in the README and /// the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory of the repository. +/// +/// ## Implementation. +/// +/// Currently just holds a `Vec>`. It will stop notifying observers if the channel has +/// returned an error, which usually means it is closed or disconnected. However, we currently don't +/// compact the vector or use a more performant data structure for the observers. +/// +/// In observe, we do loop the vector to find a free spot to re-use before pushing. +/// +/// **Note**: we only detect that observers can be removed when [Pharos::notify] or [Pharos::num_observers] +/// is being called. Otherwise, we won't find out about disconnected observers and the vector of observers +/// will not mark deleted observers and thus their slots can not be reused. +/// +/// Right now, in notify, we use `join_all` from the futures library to notify all observers concurrently. +/// We take all of our senders out of the options in our vector, operate on them and put them back if +/// they did not generate an error. +/// +/// `join_all` will allocate a new vector on every notify from what our concurrent futures return. Ideally +/// we would use a datastructure which allows &mut access to individual elements, so we can work on them +/// concurrently in place without reallocating. I am looking into the partitions crate, but that's for +/// the next release ;). // pub struct Pharos where Event: 'static + Clone + Send { @@ -154,7 +175,25 @@ impl Observable for Pharos where Event: 'static + Clone + { let (events, sender) = Events::new( options ); - self.observers.push( Some(sender) ); + let mut new_observer = Some(sender); + + // Try to find a free slot + // + for option in &mut self.observers + { + if option.is_none() + { + *option = new_observer.take(); + break; + } + } + + // no free slots found + // + if new_observer.is_some() + { + self.observers.push( new_observer ); + } events } @@ -230,4 +269,42 @@ mod tests assert_eq!( ph.storage_len (), 2 ); assert_eq!( ph.num_observers(), 0 ); } + + + // Make sure we are reusing slots + // + #[test] + // + fn reuse() + { + let mut ph = Pharos::::default(); + let _a = ph.observe( ObserveConfig::default() ); + let b = ph.observe( ObserveConfig::default() ); + let _c = ph.observe( ObserveConfig::default() ); + + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers(), 3 ); + + drop( b ); + + // It's important we call num_observers here, to clear the dropped one + // + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers(), 2 ); + + assert!( ph.observers[1].is_none() ); + + + let _d = ph.observe( ObserveConfig::default() ); + + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers(), 3 ); + + let _e = ph.observe( ObserveConfig::default() ); + + // Now we should have pushed again + // + assert_eq!( ph.storage_len (), 4 ); + assert_eq!( ph.num_observers(), 4); + } } From 1b8ca10d789350a55f4e2a08abfaf91c4b477929 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 17:39:24 +0200 Subject: [PATCH 11/14] Clean up algorithm in Pharos::notify --- src/pharos.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/pharos.rs b/src/pharos.rs index c2f6248..e93d961 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -68,31 +68,31 @@ impl Pharos where Event: 'static + Clone + Send /// Notify all observers of Event evt. + /// Currently allocates a new vector for all observers on every run. That will be fixed in future + /// versions. // pub async fn notify( &mut self, evt: &Event ) { // Try to send to all channels in parallel, so they can all start processing this event // even if one of them is blocked on a full queue. // - // We can not have mutable access in parallel, so we take options out and put them back. This - // allocates a new vector every time. If you have a better idea, please open an issue! + // We can not have mutable access in parallel, so we take options out and put them back. // // The output of the join is a vec of options with the disconnected observers removed. // let fut = join_all ( - ( 0..self.observers.len() ).map( |i| + self.observers.iter_mut().map( |opt| { - let opt = self.observers[i].take(); - let evt = evt.clone(); + let opt = opt.take(); - async move + async { let mut new = None; if let Some( mut s ) = opt { - match s.notify( &evt ).await + match s.notify( evt ).await { true => new = Some( s ), false => {} From bbf722cf3e71a805b7f18ca8ebfe09a52529b16c Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 18:01:23 +0200 Subject: [PATCH 12/14] Touch up documentation --- README.md | 13 ++++++++++++- src/events.rs | 4 ++-- src/pharos.rs | 9 +++------ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 029861e..3cefba3 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,14 @@ More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to. -I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. You can now use all stream combinators, forward it into Sinks and so on. +I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on. Minimal rustc version: 1.39. ## Table of Contents - [Security](#security) +- [Limitations](#limitations) - [Future work](#future-work) - [Install](#install) - [Upgrade](#upgrade) @@ -37,6 +38,16 @@ TODO: To mitigate these problems effectively, I will add a ring channel where th This crate has: `#![ forbid( unsafe_code ) ]` +### Limitations + +- only bounded and unbounded channel as backend (for now) +- [`Events`] is not clonable right now (would require support from the channels we use as backends, eg. broadcast type channel) +- performance tweaking still needs to be done +- pharos requires mut access for most operations. This is not intended to change anytime soon. Both on + [notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal + state, and most channels also require mutable access to either read or write. If you need it from non mutable + context, use interior mutability primitives like locks or Cells... + ### Future work Please check out the [todo](https://github.com/najamelan/pharos/blob/master/TODO.md) for ambitions. diff --git a/src/events.rs b/src/events.rs index 82f94a4..7d08f26 100644 --- a/src/events.rs +++ b/src/events.rs @@ -2,7 +2,7 @@ use crate :: { import::*, Filter, ObserveConfig, observable::Channel, Error }; /// A stream of events. This is returned from [Observable::observe](crate::Observable::observe). /// -/// For pharos 0.3.0 on x64 linux: std::mem::size_of::>() == 16 +/// For pharos 0.3.0 on x64 linux: `std::mem::size_of::>() == 16` // #[ derive( Debug ) ] // @@ -66,7 +66,7 @@ impl Stream for Events where Event: Clone + 'static + Send /// The sender of the channel. -/// For pharos 0.3.0 on x64 linux: std::mem::size_of::>() == 56 +/// For pharos 0.3.0 on x64 linux: `std::mem::size_of::>() == 56` // #[ pin_project ] // diff --git a/src/pharos.rs b/src/pharos.rs index e93d961..6b1f9a6 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -55,7 +55,7 @@ impl Pharos where Event: 'static + Clone + Send /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// - /// For pharos 0.3.0 on x64 linux: std::mem::size_of::>>() == 56 bytes. + /// For pharos 0.3.0 on x64 linux: `std::mem::size_of::>>() == 56 bytes`. // pub fn new( capacity: usize ) -> Self { @@ -67,7 +67,8 @@ impl Pharos where Event: 'static + Clone + Send - /// Notify all observers of Event evt. + /// Notify all observers of Event `evt`. + /// /// Currently allocates a new vector for all observers on every run. That will be fixed in future /// versions. // @@ -167,10 +168,6 @@ impl Default for Pharos where Event: 'static + Clone + Send impl Observable for Pharos where Event: 'static + Clone + Send { - /// Add an observer to the pharos. This will use a bounded channel of the size of `queue_size`. - /// Note that the use of a bounded channel provides backpressure and can slow down the observed - /// task. - // fn observe( &mut self, options: ObserveConfig ) -> Events { let (events, sender) = Events::new( options ); From 9816fbb3819451c96ae133b84eff563f3755168e Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 18:58:40 +0200 Subject: [PATCH 13/14] rename Filter::from_pointer and from_closure to pointer and closure --- README.md | 4 ++-- examples/filter.rs | 4 ++-- src/filter.rs | 26 +++++++++++++------------- src/observable.rs | 6 +++--- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 3cefba3..cf29c43 100644 --- a/README.md +++ b/README.md @@ -221,7 +221,7 @@ fn main() // We will only get close events. // - let filter = Filter::from_pointer( |e| e == &NetworkEvent::Closed ); + let filter = Filter::pointer( |e| e == &NetworkEvent::Closed ); // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. @@ -230,7 +230,7 @@ fn main() // Combine both options. // - let filter = Filter::from_pointer( |e| e != &NetworkEvent::Closed ); + let filter = Filter::pointer( |e| e != &NetworkEvent::Closed ); let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) ); // Get everything but close events over a bounded channel with queue size 5. diff --git a/examples/filter.rs b/examples/filter.rs index 02534ac..8fa7e6e 100644 --- a/examples/filter.rs +++ b/examples/filter.rs @@ -28,7 +28,7 @@ fn main() // We will only get close events. // - let filter = Filter::from_pointer( |e| e == &NetworkEvent::Closed ); + let filter = Filter::pointer( |e| e == &NetworkEvent::Closed ); // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. @@ -37,7 +37,7 @@ fn main() // Combine both options. // - let filter = Filter::from_pointer( |e| e != &NetworkEvent::Closed ); + let filter = Filter::pointer( |e| e != &NetworkEvent::Closed ); let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) ); // Get everything but close events over a bounded channel with queue size 5. diff --git a/src/filter.rs b/src/filter.rs index 9e6b1ba..f7a5835 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -12,7 +12,7 @@ use crate :: { import::* }; /// let a = 5; /// /// // This closure captures the a variable from it's environment. -/// // We can still use it as a filter by boxing it with `from_closure`. +/// // We can still use it as a filter by boxing it with `closure`. /// // /// // Note: it depends on the circumstances, but often enough, we need to /// // annotate the type of the event parameter to the predicate. @@ -22,20 +22,20 @@ use crate :: { import::* }; /// // /// let predicate = move |_: &bool| { a; true }; /// -/// let filter = Filter::::from_closure( predicate ); +/// let filter = Filter::::closure( predicate ); /// /// // This one does not capture anything, so it can be stored as a function pointer /// // without boxing. /// // /// let predicate = move |_: &bool| { true }; /// -/// let filter = Filter::::from_pointer( predicate ); +/// let filter = Filter::::pointer( predicate ); /// /// // You can also use actual functions as filters. /// // /// fn predicate_function( event: &bool ) -> bool { true } /// -/// let filter = Filter::::from_pointer( predicate_function ); +/// let filter = Filter::::pointer( predicate_function ); /// ``` // pub enum Filter @@ -57,9 +57,9 @@ impl Filter where Event: Clone + 'static + Send { /// Construct a filter from a closure that captures something from it's environment. This will /// be boxed and stored under the [Filter::Closure] variant. To avoid boxing, do not capture - /// any variables from the environment and use [Filter::from_pointer]. + /// any variables from the environment and use [Filter::pointer]. // - pub fn from_closure( predicate: F ) -> Self where F: FnMut(&Event) -> bool + Send + 'static + pub fn closure( predicate: F ) -> Self where F: FnMut(&Event) -> bool + Send + 'static { Self::Closure( Box::new( predicate ) ) } @@ -67,7 +67,7 @@ impl Filter where Event: Clone + 'static + Send /// Construct a filter from a function pointer to a predicate. // - pub fn from_pointer( predicate: fn(&Event) -> bool ) -> Self + pub fn pointer( predicate: fn(&Event) -> bool ) -> Self { Self::Pointer( predicate ) } @@ -109,18 +109,18 @@ mod tests #[test] // - fn from_pointer() + fn pointer() { - let f = Filter::from_pointer( |b| *b ); + let f = Filter::pointer( |b| *b ); assert_matches!( f, Filter::Pointer(_) ); } #[test] // - fn from_closure() + fn closure() { - let f = Filter::from_closure( |b| *b ); + let f = Filter::closure( |b| *b ); assert_matches!( f, Filter::Closure(_) ); } @@ -129,8 +129,8 @@ mod tests // fn debug() { - let f = Filter::from_pointer( |b| *b ); - let g = Filter::from_closure( |b| *b ); + let f = Filter::pointer( |b| *b ); + let g = Filter::closure( |b| *b ); assert_eq!( "pharos::Filter::Pointer(_)", &format!( "{:?}", f ) ); assert_eq!( "pharos::Filter::Closure(_)", &format!( "{:?}", g ) ); diff --git a/src/observable.rs b/src/observable.rs index f23fead..d8fd76c 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -59,7 +59,7 @@ use crate :: { Filter, Events }; /// async fn task() /// { /// let mut foo = Foo { pharos: Pharos::default() }; -/// let mut errors = foo.observe( Filter::from_pointer( Steps::is_err ).into() ); +/// let mut errors = foo.observe( Filter::pointer( Steps::is_err ).into() ); /// /// // will only be notified on errors now /// // @@ -134,7 +134,7 @@ impl Default for Channel /// // Use a filter and defaults for other options. /// // Will only receive events if they are bigger than three. /// // -/// pharos.observe( Filter::from_pointer( |evt| *evt > 3 ).into() ); +/// pharos.observe( Filter::pointer( |evt| *evt > 3 ).into() ); /// /// // Set both channel and filter. Note you can only set one filter per observable. /// // @@ -206,7 +206,7 @@ impl ObserveConfig where Event: Clone + 'static + Send { debug_assert!( self.filter.is_none(), "You can only set one filter on ObserveConfig" ); - self.filter = Some( Filter::from_closure(filter) ); + self.filter = Some( Filter::closure(filter) ); self } } From c12bc456b8af5847f0f78e0f1120722f442ef630 Mon Sep 17 00:00:00 2001 From: Naja Melan Date: Mon, 23 Sep 2019 19:02:34 +0200 Subject: [PATCH 14/14] Bump version and readme --- CHANGELOG.md | 10 ++++++++++ Cargo.toml | 2 +- Cargo.yml | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbb9e5c..1d1ab36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Pharos Changelog +## 0.3.0 - 2019-09-23 + +**BREAKING CHANGE**: This is an almost complete rewrite with a much improved API, documentation, ... + +- Only have one Observable trait that takes options rather than UboundedObservable. +- Allow filtering events with a predicate. +- Many small improvements. + +Please have a look at the readme and the API docs for more. + ## 0.2.2 - 2019-08-26 - update to futures 0.3.0-alpha.18 diff --git a/Cargo.toml b/Cargo.toml index 1eb8221..81b8183 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ license = "Unlicense" name = "pharos" readme = "README.md" repository = "https://github.com/najamelan/pharos" -version = "0.2.2" +version = "0.3.0" [package.metadata] [package.metadata."docs.rs"] diff --git a/Cargo.yml b/Cargo.yml index cacdf01..69d53fa 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -10,7 +10,7 @@ package: # - merge dev branch into master # - create git tag # - version : 0.2.2 + version : 0.3.0 name : pharos authors : [ Naja Melan ] edition : '2018'