Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
* dev:
  Bump version and readme
  rename Filter::from_pointer and from_closure to pointer and closure
  Touch up documentation
  Clean up algorithm in Pharos::notify
  Re-use unused slots in vector in Pharos.
  Add `storage_len` and `num_observers` to Pharos.
  Let user decide the initial capacity of the vector used to store observers in Pharos
  Add docs
  Don't expose error type publicly since we don't use it in the API
  Complete redesign of the API.
  Switch to an enum Filter allowing to take both fn pointer and boxed closure as filter
  Add filter functionality, yipy
  Run bounded and unbounded notify concurrently and add an integration test for that
  Cleanup tests
  • Loading branch information
najamelan committed Sep 23, 2019
2 parents f6d00a0 + c12bc45 commit fc30cbf
Show file tree
Hide file tree
Showing 17 changed files with 1,628 additions and 379 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Pharos Changelog

## 0.3.0 - 2019-09-23

**BREAKING CHANGE**: This is an almost complete rewrite with a much improved API, documentation, ...

- Only have one Observable trait that takes options rather than UboundedObservable.
- Allow filtering events with a predicate.
- Many small improvements.

Please have a look at the readme and the API docs for more.

## 0.2.2 - 2019-08-26

- update to futures 0.3.0-alpha.18
Expand Down
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ status = "actively-developed"
repository = "najamelan/pharos"

[dependencies]
pin-project = "^0.4.0-beta"

[dependencies.futures-preview]
features = ["async-await", "nightly"]
version = "^0.3.0-alpha"

[dev-dependencies]
assert_matches = "^1"

[features]
external_doc = []

Expand All @@ -23,7 +29,7 @@ license = "Unlicense"
name = "pharos"
readme = "README.md"
repository = "https://github.com/najamelan/pharos"
version = "0.2.2"
version = "0.3.0"

[package.metadata]
[package.metadata."docs.rs"]
Expand Down
9 changes: 7 additions & 2 deletions Cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package:
# - merge dev branch into master
# - create git tag
#
version : 0.2.2
version : 0.3.0
name : pharos
authors : [ Naja Melan <[email protected]> ]
edition : '2018'
Expand Down Expand Up @@ -42,5 +42,10 @@ badges:

dependencies:

futures-preview: { version: ^0.3.0-alpha }
futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] }
pin-project : ^0.4.0-beta


dev-dependencies:

assert_matches: ^1
134 changes: 100 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,50 @@
More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to.

I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. You can now use all stream combinators, forward it into Sinks and so on.
I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on.

Minimal rustc version: 1.39.

## Table of Contents

- [Security](#security)
- [Limitations](#limitations)
- [Future work](#future-work)
- [Install](#install)
- [Upgrade](#upgrade)
- [Dependencies](#dependencies)
- [Usage](#usage)
- [Filter](#filter)
- [API](#api)
- [Contributing](#contributing)
- [Code of Conduct](#code-of-conduct)
- [License](#license)


## Security

The main issue with this crate right now is the possibility for the observable to outpace the observer. When using bounded form, there is back pressure, which might allow DDOS attacks if using the pattern on arriving network packets. When using the unbounded form, it might lead to excessive memory consumption if observers are outpaced.
The main issue with this crate right now is the posibility for the observable to outpace the observer. When using bounded channels, there is back pressure, which might allow DDOS attacks if using the pattern on arriving network packets. When using the unbounded channels, it might lead to excessive memory consumption if observers are outpaced.

To mitigate these problems effectively, I would like to implement an unbounded drop channel where the stream will only buffer a certain amount events and will overwrite the oldest event instead of blocking the sender when the buffer is full.
TODO: To mitigate these problems effectively, I will add a ring channel where the channel will only buffer a certain amount events and will overwrite the oldest event instead of blocking the sender when the buffer is full.

This crate has: `#![ forbid( unsafe_code ) ]`


### Limitations

- only bounded and unbounded channel as backend (for now)
- [`Events`] is not clonable right now (would require support from the channels we use as backends, eg. broadcast type channel)
- performance tweaking still needs to be done
- pharos requires mut access for most operations. This is not intended to change anytime soon. Both on
[notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal
state, and most channels also require mutable access to either read or write. If you need it from non mutable
context, use interior mutability primitives like locks or Cells...

### Future work

Please check out the [todo](https://github.com/najamelan/pharos/blob/master/TODO.md) for ambitions.


## Install

With [cargo add](https://github.com/killercup/cargo-edit):
Expand All @@ -43,14 +62,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml):
```yaml
dependencies:

pharos: ^0.2
pharos: ^0.3
```
With raw Cargo.toml
```toml
[dependencies]

pharos = "0.2"
pharos = "0.3"
```

### Upgrade
Expand All @@ -60,37 +79,31 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master

### Dependencies

This crate only has one dependiency. Cargo will automatically handle it's dependencies for you.
This crate only has two dependiencies. Cargo will automatically handle it's dependencies for you.

```yaml
dependencies:

futures-preview: { version: ^0.3.0-alpha }
futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] }
pin-project : ^0.4.0-beta
```
## Usage
pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages
fast enough, otherwise they will slow down the observable (bounded form) or cause memory leak (unbounded form).
fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel).
Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers.

Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size. Eg, there's no point putting an enum without associated data in an Arc.
Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size (eg. there's no point putting an enum without data in an Arc).

Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is a summary of the most basic one:
Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is the most basic one:

```rust
use
{
pharos :: { * } ,
futures ::
{
channel::mpsc :: Receiver ,
executor :: LocalPool ,
task :: LocalSpawnExt ,
stream :: StreamExt ,
},
pharos :: { * } ,
futures :: { executor::block_on, StreamExt } ,
};
Expand All @@ -103,7 +116,7 @@ impl Godess
{
fn new() -> Self
{
Self { pharos: Pharos::new() }
Self { pharos: Pharos::default() }
}
// Send Godess sailing so she can tweet about it!
Expand All @@ -115,7 +128,6 @@ impl Godess
}
// Event types need to implement clone, but you can wrap them in Arc if not. Also they will be
// cloned, so if you will have several observers and big event data, putting them in an Arc is
// definitely best. It has no benefit to put a simple dataless enum in an Arc though.
Expand All @@ -135,41 +147,95 @@ enum GodessEvent
//
impl Observable<GodessEvent> for Godess
{
fn observe( &mut self, queue_size: usize ) -> Receiver<GodessEvent>
fn observe( &mut self, options: ObserveConfig<GodessEvent>) -> Events<GodessEvent>
{
self.pharos.observe( queue_size )
self.pharos.observe( options )
}
}
fn main()
{
let mut pool = LocalPool::new();
let mut exec = pool.spawner();
let program = async move
{
let mut isis = Godess::new();
// subscribe: bounded channel with 3 + 1 slots
// subscribe, the observe method takes options to let you choose:
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( 3 );
let mut events = isis.observe( Channel::Bounded( 3 ).into() );
// trigger an event
//
isis.sail().await;
// read from stream
// read from stream and let's put on the console what the event looks like.
//
let from_stream = events.next().await.unwrap();
let evt = dbg!( events.next().await.unwrap() );
dbg!( from_stream );
assert_eq!( GodessEvent::Sailing, from_stream );
// After this reads on the event stream will return None.
//
drop( isis );
assert_eq!( GodessEvent::Sailing, evt );
assert_eq!( None, events.next().await );
};
exec.spawn_local( program ).expect( "Spawn program" );
block_on( program );
}
```

### Filter

pool.run();
Sometimes you are not interested in all event types an observable can emit. A common use case is only listening for a
close event on a network connection. The observe method takes options which let you set the predicate. You can only
set one predicate for a given observer.

```rust
use pharos::*;
#[ derive( Clone, Debug, PartialEq, Copy ) ]
//
enum NetworkEvent
{
Open ,
Error ,
Closing ,
Closed ,
}
struct Connection { pharos: Pharos<NetworkEvent> }
impl Observable<NetworkEvent> for Connection
{
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Events<NetworkEvent>
{
self.pharos.observe( options )
}
}
fn main()
{
let mut conn = Connection{ pharos: Pharos::default() };
// We will only get close events.
//
let filter = Filter::pointer( |e| e == &NetworkEvent::Closed );
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() );
// Combine both options.
//
let filter = Filter::pointer( |e| e != &NetworkEvent::Closed );
let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) );
// Get everything but close events over a bounded channel with queue size 5.
//
let bounded_observer = conn.observe( opts );
}
```

Expand Down
14 changes: 7 additions & 7 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# TODO

- Create a filter possibility. Let users provide a closure with a predicate to filter which events they want to receive.
The reasons for doing this in pharos are:

- It's unwieldy to do this in the client because of the unwieldy type that you need to annotate if you need to store
the stream (FilterMap will include a closure in it's type)
- performance. If filtering happens on the client side, we will clone and send out events they are not interested in.
By doing it in pharos, we can avoid that.
- make Events clone? means we can only work with broadcast channels

- switch to more performant channels (crossbeam). Will be easier once they provide an async api.

- allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks?

- scaling? For now we have an ever growing vector of observers

- other data structure than vec? smallvec?
- type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review.
- let users set capacity on creation?
Loading

0 comments on commit fc30cbf

Please sign in to comment.