Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Futures 0.3 #60

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: rust
rust:
- 1.31.1
- 1.39.0
- stable
- beta
- nightly
Expand Down
13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio"
documentation = "http://rust-embedded.github.io/rust-sysfs-gpio/"
description = "Provides access to GPIOs using the Linux sysfs interface."
readme = "README.md"
edition = "2018"

[features]
mio-evented = ["mio"]
use_tokio = ["futures", "tokio", "mio-evented"]

[dependencies]
futures = { version = "0.1", optional = true }
futures = { version = "0.3", optional = true }
nix = "0.14.0"
mio = { version = "0.6", optional = true }
tokio = { version = "0.1", optional = true }
tokio = { version = "0.2", optional = true, features = ["io-driver"] }
pin-project-lite = "0.1.4"

[dev-dependencies]
tokio = { version = "0.2", features = ["macros", "rt-core"] }

[[example]]
name = "tokio"
required-features = ["use_tokio"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ The following features are planned for the library:

## Minimum Supported Rust Version (MSRV)

This crate is guaranteed to compile on stable Rust 1.31 and up. It *might*
This crate is guaranteed to compile on stable Rust 1.39 and up. It *might*
compile with older versions but that may change in any new patch release.

## Cross Compiling
Expand Down
67 changes: 24 additions & 43 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,44 @@
#[cfg(feature = "use_tokio")]
extern crate futures;
#[cfg(feature = "use_tokio")]
extern crate sysfs_gpio;
#[cfg(feature = "use_tokio")]
extern crate tokio;
// Copyright (c) 2020. The sysfs-gpio Authors.

#[cfg(feature = "use_tokio")]
use futures::future::join_all;
use futures::StreamExt;
use std::env;

#[cfg(feature = "use_tokio")]
use futures::{lazy, Future, Stream};

#[cfg(feature = "use_tokio")]
use sysfs_gpio::{Direction, Edge, Pin};

#[cfg(feature = "use_tokio")]
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> {
pin.export()?;
pin.set_direction(Direction::In)?;
pin.set_edge(Edge::BothEdges)?;
let mut gpio_events = pin.get_value_stream()?;
while let Some(evt) = gpio_events.next().await {
let val = evt.unwrap();
println!("Pin {} changed value to {}", pin.get_pin_num(), val);
}
Ok(())
}

async fn stream(pin_nums: Vec<u64>) {
// NOTE: this currently runs forever and as such if
// the app is stopped (Ctrl-C), no cleanup will happen
// and the GPIO will be left exported. Not much
// can be done about this as Rust signal handling isn't
// really present at the moment. Revisit later.
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
let task = lazy(move || {
for &(i, ref pin) in pins.iter() {
pin.export().unwrap();
pin.set_direction(Direction::In).unwrap();
pin.set_edge(Edge::BothEdges).unwrap();
tokio::spawn(
pin.get_value_stream()
.unwrap()
.for_each(move |val| {
println!("Pin {} changed value to {}", i, val);
Ok(())
})
.map_err(|_| ()),
);
}
Ok(())
});
tokio::run(task);

Ok(())
join_all(pin_nums.into_iter().map(|p| {
let pin = Pin::new(p);
tokio::task::spawn(monitor_pin(pin))
}))
.await;
}

#[cfg(feature = "use_tokio")]
fn main() {
#[tokio::main]
async fn main() {
let pins: Vec<u64> = env::args()
.skip(1)
.map(|a| a.parse().expect("Pins must be specified as integers"))
.collect();
if pins.is_empty() {
println!("Usage: ./tokio <pin> [pin ...]");
} else {
stream(pins).unwrap();
stream(pins).await;
}
}

#[cfg(not(feature = "use_tokio"))]
fn main() {
println!("This example requires the `tokio` feature to be enabled.");
}
127 changes: 55 additions & 72 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ extern crate futures;
extern crate mio;
extern crate nix;
#[cfg(feature = "use_tokio")]
extern crate pin_project_lite;
#[cfg(feature = "use_tokio")]
extern crate tokio;

use std::fs;
Expand All @@ -58,18 +60,22 @@ use std::io::prelude::*;
use std::io::SeekFrom;
use std::os::unix::prelude::*;
use std::path::Path;
#[cfg(feature = "use_tokio")]
use std::pin::{Pin as SPin};
benjumanji marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(feature = "use_tokio")]
use futures::{Async, Poll, Stream};
use futures::{Stream, task::{Context, Poll}};
#[cfg(feature = "mio-evented")]
use mio::unix::EventedFd;
#[cfg(feature = "mio-evented")]
use mio::Evented;
use mio::{Evented, Ready};
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::sys::epoll::*;
use nix::unistd::close;
#[cfg(feature = "use_tokio")]
use tokio::reactor::{Handle, PollEvented};
use pin_project_lite::pin_project;
#[cfg(feature = "use_tokio")]
use tokio::io::PollEvented;

pub use error::Error;

Expand Down Expand Up @@ -466,17 +472,6 @@ impl Pin {
AsyncPinPoller::new(self.pin_num)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `use_tokio` crate feature is enabled.
#[cfg(feature = "use_tokio")]
pub fn get_stream_with_handle(&self, handle: &Handle) -> Result<PinStream> {
PinStream::init_with_handle(self.clone(), handle)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -488,25 +483,6 @@ impl Pin {
PinStream::init(self.clone())
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge(Edge::BothEdges)` before using this.
///
/// Note that the values produced are the value of the pin as soon as we get to handling the
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
/// it could end up producing the same value multiple times if the value has changed back
/// between when the interrupt occurred and when the value was read.
///
/// This method is only available when the `use_tokio` crate feature is enabled.
#[cfg(feature = "use_tokio")]
pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result<PinValueStream> {
Ok(PinValueStream(PinStream::init_with_handle(
self.clone(),
handle,
)?))
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -520,7 +496,7 @@ impl Pin {
/// This method is only available when the `use_tokio` crate feature is enabled.
#[cfg(feature = "use_tokio")]
pub fn get_value_stream(&self) -> Result<PinValueStream> {
Ok(PinValueStream(PinStream::init(self.clone())?))
Ok(PinValueStream::new(PinStream::init(self.clone())?))
}
}

Expand Down Expand Up @@ -671,72 +647,79 @@ pub struct PinStream {
skipped_first_event: bool,
}

#[cfg(feature = "use_tokio")]
impl PinStream {
pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, &handle)?,
skipped_first_event: false,
})
}
}

#[cfg(feature = "use_tokio")]
impl PinStream {
pub fn init(pin: Pin) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?,
evented: PollEvented::new(pin.get_async_poller()?)?,
skipped_first_event: false,
})
}
}

#[cfg(feature = "use_tokio")]
impl Stream for PinStream {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(match self.evented.poll_read() {
Async::Ready(()) => {
self.evented.need_read();
if self.skipped_first_event {
Async::Ready(Some(()))
} else {
self.skipped_first_event = true;
Async::NotReady
type Item = Result<()>;

fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.evented.poll_read_ready(cx, Ready::readable()) {
Poll::Ready(res) => {
match res {
Ok(_) => {
let _ = self.evented.clear_read_ready(cx, Ready::readable());
if self.skipped_first_event {
Poll::Ready(Some(Ok(())))
} else {
self.as_mut().skipped_first_event = true;
Poll::Pending
}
}
Err(e) => Poll::Ready(Some(Err(Error::Io(e))))
}
}
Async::NotReady => Async::NotReady,
})
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(feature = "use_tokio")]
pub struct PinValueStream(PinStream);
pin_project! {
pub struct PinValueStream {
#[pin]
inner: PinStream
}
}

#[cfg(feature = "use_tokio")]
impl PinValueStream {

fn new(inner: PinStream) -> Self {
PinValueStream { inner }
}

#[inline]
fn get_value(&mut self) -> Result<u8> {
get_value_from_file(&mut self.0.evented.get_mut().devfile)
get_value_from_file(&mut self.inner.evented.get_mut().devfile)
}
}

#[cfg(feature = "use_tokio")]
impl Stream for PinValueStream {
type Item = u8;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(Some(()))) => {
let value = self.get_value()?;
Ok(Async::Ready(Some(value)))
type Item = Result<u8>;

fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.as_mut().project().inner.poll_next(cx) {
Poll::Ready(Some(res)) => {
match res {
Ok(_) => {
let value = self.get_value();
Poll::Ready(Some(value))
}
Err(e) => Poll::Ready(Some(Err(e)))
}
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}