Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
* dev:
  bump version and changlog
  update documentation
  Add unit tests for the Sink impl of pharos
  Update to futures 0.3.0-alpha.19
  Allow comparing ErrorKind to &ErrorKind
  Document the behavior of the futures channels
  Finish flush algorithm now ICE is solved
  fix clippy warning
  fix visibility of Error::kind()
  Update some comments
  ICE in rustc
  Another breaking change to the API.
  Actually, we don't need pin_project
  • Loading branch information
najamelan committed Sep 28, 2019
2 parents 10dcf85 + a886ab3 commit 4ef7b7f
Show file tree
Hide file tree
Showing 17 changed files with 732 additions and 322 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Pharos Changelog

## 0.4.0 - 2019-09-28

- **BREAKING CHANGE**: The notify function had a sub optimal implemetation and did not allow notifying observers
from within `poll_*` functions. It has been replaced with an implementation of Sink on Pharos.
- got rid of dependency on pin_project.
- as Error::kind returns a reference to the error kind, you can now compare `ErrorKind::SomeVariant == err.kind()` without having to write: `&ErrorKind::SomeVariant == err.kind()`.
- updated to futures-preview 0.3.0-alpha.19

## 0.3.2 - 2019-09-23

- check spelling
Expand Down
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ status = "actively-developed"
repository = "najamelan/pharos"

[dependencies]
pin-project = "^0.4.0-beta"

[dependencies.futures-preview]
features = ["async-await", "nightly"]
features = ["async-await"]
version = "^0.3.0-alpha"

[dev-dependencies]
assert_matches = "^1"

[features]
external_doc = []

Expand All @@ -27,7 +28,7 @@ license = "Unlicense"
name = "pharos"
readme = "README.md"
repository = "https://github.com/najamelan/pharos"
version = "0.3.2"
version = "0.4.0"

[package.metadata]
[package.metadata.docs]
Expand Down
8 changes: 5 additions & 3 deletions Cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package:
# - merge dev branch into master
# - create git tag
#
version : 0.3.2
version : 0.4.0
name : pharos
authors : [ Naja Melan <[email protected]> ]
edition : '2018'
Expand Down Expand Up @@ -43,6 +43,8 @@ badges:

dependencies:

futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] }
pin-project : ^0.4.0-beta
futures-preview: { version: ^0.3.0-alpha, features: [async-await] }

dev-dependencies:

assert_matches: ^1
42 changes: 24 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to.

I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on.
I created it to leverage interoperability we can create by using async [Stream](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/stream/trait.Stream.html) and [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) from the futures library. So you can use all stream combinators, forward it into Sinks and so on.

Minimal rustc version: 1.39.

Expand Down Expand Up @@ -44,8 +44,8 @@ This crate has: `#![ forbid( unsafe_code ) ]`
- [`Events`] is not clonable right now (would require support from the channels we use as back-ends, eg. broadcast type channel)
- performance tweaking still needs to be done
- pharos requires mut access for most operations. This is not intended to change anytime soon. Both on
[notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal
state, and most channels also require mutable access to either read or write. If you need it from non mutable
[send](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_util/sink/trait.SinkExt.html#method.send) and [observe](Observable::observe), the two main interfaces, manipulate internal
state, and most channels also require mutable access to either read or write. If you need it from immutable
context, use interior mutability primitives like locks or Cells...

### Future work
Expand All @@ -62,14 +62,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml):
```yaml
dependencies:

pharos: ^0.3
pharos: ^0.4
```
With raw Cargo.toml
```toml
[dependencies]

pharos = "0.3"
pharos = "0.4"
```

### Upgrade
Expand All @@ -79,31 +79,30 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master

### Dependencies

This crate only has two dependencies. Cargo will automatically handle it's dependencies for you.
This crate only has one dependencies. Cargo will automatically handle it's dependencies for you.

```yaml
dependencies:

futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] }
pin-project : ^0.4.0-beta
futures-preview: { version: ^0.3.0-alpha, features: [async-await, nightly] }
```
## Usage
pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages
fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel).
`pharos` only works from async code, implementing Sink to notify observers. You can notify observers from within
`poll_*` methods by calling the poll methods of the [Sink](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.Sink.html) impl directly. In async context you can use [SinkExt::send](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/sink/trait.SinkExt.html#method.send). Observers must consume the messages fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel).

Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers.

Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than a pointer size (eg. there's no point putting an enum without data in an Arc).
Your event type will be cloned once for each observer, so you might want to put it in an Arc if it's bigger than 2 pointer sizes (eg. there's no point putting an enum without data in an Arc).

Examples can be found in the [examples](https://github.com/najamelan/pharos/tree/master/examples) directory. Here is the most basic one:

```rust
use
{
pharos :: { * } ,
futures :: { executor::block_on, StreamExt } ,
pharos :: { * } ,
futures :: { executor::block_on, StreamExt, SinkExt } ,
};
Expand All @@ -123,7 +122,10 @@ impl Goddess
//
pub async fn sail( &mut self )
{
self.pharos.notify( &GoddessEvent::Sailing ).await;
// It's infallible. Observers that error will be dropped, since the only kind of errors on
// channels are when the channel is closed.
//
self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" );
}
}
Expand All @@ -147,7 +149,9 @@ enum GoddessEvent
//
impl Observable<GoddessEvent> for Goddess
{
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Events<GoddessEvent>
type Error = pharos::Error;
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Result< Events<GoddessEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -164,7 +168,7 @@ fn main()
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( Channel::Bounded( 3 ).into() );
let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" );
// trigger an event
//
Expand Down Expand Up @@ -209,7 +213,9 @@ struct Connection { pharos: Pharos<NetworkEvent> }
impl Observable<NetworkEvent> for Connection
{
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Events<NetworkEvent>
type Error = pharos::Error;
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Result< Events<NetworkEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -226,7 +232,7 @@ fn main()
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() );
let observer = conn.observe( filter.into() ).expect( "observe" );
// Combine both options.
//
Expand Down
6 changes: 0 additions & 6 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
# TODO

- make Events clone? means we can only work with broadcast channels

- switch to more performant channels (crossbeam). Will be easier once they provide an async api.

- allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks?

- scaling? For now we have an ever growing vector of observers

- other data structure than vec? smallvec?
- type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review.
- let users set capacity on creation?
15 changes: 10 additions & 5 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use
{
pharos :: { * } ,
futures :: { executor::block_on, StreamExt } ,
pharos :: { * } ,
futures :: { executor::block_on, StreamExt, SinkExt } ,
};


Expand All @@ -21,7 +21,10 @@ impl Goddess
//
pub async fn sail( &mut self )
{
self.pharos.notify( &GoddessEvent::Sailing ).await;
// It's infallible. Observers that error will be dropped, since the only kind of errors on
// channels are when the channel is closed.
//
let _ = self.pharos.send( GoddessEvent::Sailing ).await;
}
}

Expand All @@ -45,7 +48,9 @@ enum GoddessEvent
//
impl Observable<GoddessEvent> for Goddess
{
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Events<GoddessEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Result< Events<GoddessEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -62,7 +67,7 @@ fn main()
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( Channel::Bounded( 3 ).into() );
let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" );

// trigger an event
//
Expand Down
6 changes: 4 additions & 2 deletions examples/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ struct Connection { pharos: Pharos<NetworkEvent> }

impl Observable<NetworkEvent> for Connection
{
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Events<NetworkEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Result< Events<NetworkEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -33,7 +35,7 @@ fn main()
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() );
let observer = conn.observe( filter.into() ).expect( "observe" );

// Combine both options.
//
Expand Down
Loading

0 comments on commit 4ef7b7f

Please sign in to comment.