From fcafe621701d89c56af538e9de548a409db37002 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Tue, 10 Mar 2020 21:53:11 +0000 Subject: [PATCH 1/6] Futures 0.3 --- Cargo.toml | 5 ++- src/lib.rs | 110 +++++++++++++++++++++++++---------------------------- 2 files changed, 55 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9cb83025d..b634f07f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,8 @@ mio-evented = ["mio"] use_tokio = ["futures", "tokio", "mio-evented"] [dependencies] -futures = { version = "0.1", optional = true } +futures = { version = "0.3", optional = true } nix = "0.14.0" mio = { version = "0.6", optional = true } -tokio = { version = "0.1", optional = true } +tokio = { version = "0.2", optional = true, features = ["io-driver"] } +pin-utils = "0.1.0-alpha.4" diff --git a/src/lib.rs b/src/lib.rs index 0da996842..4328caf1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,8 @@ extern crate futures; extern crate mio; extern crate nix; #[cfg(feature = "use_tokio")] +extern crate pin_utils; +#[cfg(feature = "use_tokio")] extern crate tokio; use std::fs; @@ -58,18 +60,21 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::os::unix::prelude::*; use std::path::Path; +use std::pin::{Pin as SPin}; #[cfg(feature = "use_tokio")] -use futures::{Async, Poll, Stream}; +use futures::{Stream, task::{Context, Poll}}; #[cfg(feature = "mio-evented")] use mio::unix::EventedFd; #[cfg(feature = "mio-evented")] -use mio::Evented; +use mio::{Evented, Ready}; #[cfg(any(target_os = "linux", target_os = "android"))] use nix::sys::epoll::*; use nix::unistd::close; #[cfg(feature = "use_tokio")] -use tokio::reactor::{Handle, PollEvented}; +use pin_utils::unsafe_pinned; +#[cfg(feature = "use_tokio")] +use tokio::{io::PollEvented, runtime::Handle}; pub use error::Error; @@ -466,17 +471,6 @@ impl Pin { AsyncPinPoller::new(self.pin_num) } - /// Get a Stream of pin interrupts for this pin - /// - /// The PinStream object can be used with the `tokio` crate. You should probably call - /// `set_edge()` before using this. - /// - /// This method is only available when the `use_tokio` crate feature is enabled. - #[cfg(feature = "use_tokio")] - pub fn get_stream_with_handle(&self, handle: &Handle) -> Result { - PinStream::init_with_handle(self.clone(), handle) - } - /// Get a Stream of pin interrupts for this pin /// /// The PinStream object can be used with the `tokio` crate. You should probably call @@ -500,11 +494,8 @@ impl Pin { /// /// This method is only available when the `use_tokio` crate feature is enabled. #[cfg(feature = "use_tokio")] - pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result { - Ok(PinValueStream(PinStream::init_with_handle( - self.clone(), - handle, - )?)) + pub fn get_value_stream_with_handle(&self, _handle: &Handle) -> Result { + Ok(PinValueStream::new(PinStream::init(self.clone())?)) } /// Get a Stream of pin values for this pin @@ -520,7 +511,7 @@ impl Pin { /// This method is only available when the `use_tokio` crate feature is enabled. #[cfg(feature = "use_tokio")] pub fn get_value_stream(&self) -> Result { - Ok(PinValueStream(PinStream::init(self.clone())?)) + Ok(PinValueStream::new(PinStream::init(self.clone())?)) } } @@ -671,21 +662,11 @@ pub struct PinStream { skipped_first_event: bool, } -#[cfg(feature = "use_tokio")] -impl PinStream { - pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result { - Ok(PinStream { - evented: PollEvented::new(pin.get_async_poller()?, &handle)?, - skipped_first_event: false, - }) - } -} - #[cfg(feature = "use_tokio")] impl PinStream { pub fn init(pin: Pin) -> Result { Ok(PinStream { - evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?, + evented: PollEvented::new(pin.get_async_poller()?)?, skipped_first_event: false, }) } @@ -693,50 +674,63 @@ impl PinStream { #[cfg(feature = "use_tokio")] impl Stream for PinStream { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - Ok(match self.evented.poll_read() { - Async::Ready(()) => { - self.evented.need_read(); - if self.skipped_first_event { - Async::Ready(Some(())) - } else { - self.skipped_first_event = true; - Async::NotReady + type Item = Result<()>; + + fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll> { + match self.evented.poll_read_ready(cx, Ready::readable()) { + Poll::Ready(res) => { + match res { + Ok(_) => { + let _ = self.evented.clear_read_ready(cx, Ready::readable()); + if self.skipped_first_event { + Poll::Ready(Some(Ok(()))) + } else { + self.as_mut().skipped_first_event = true; + Poll::Pending + } + } + Err(e) => Poll::Ready(Some(Err(Error::Io(e)))) } } - Async::NotReady => Async::NotReady, - }) + Poll::Pending => Poll::Pending, + } } } #[cfg(feature = "use_tokio")] -pub struct PinValueStream(PinStream); +pub struct PinValueStream { inner: PinStream } #[cfg(feature = "use_tokio")] impl PinValueStream { + unsafe_pinned!(inner: PinStream); + + fn new(inner: PinStream) -> Self { + PinValueStream { inner } + } + #[inline] fn get_value(&mut self) -> Result { - get_value_from_file(&mut self.0.evented.get_mut().devfile) + get_value_from_file(&mut self.inner.evented.get_mut().devfile) } } #[cfg(feature = "use_tokio")] impl Stream for PinValueStream { - type Item = u8; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.0.poll() { - Ok(Async::Ready(Some(()))) => { - let value = self.get_value()?; - Ok(Async::Ready(Some(value))) + type Item = Result; + + fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll> { + match self.as_mut().inner().poll_next(cx) { + Poll::Ready(Some(res)) => { + match res { + Ok(_) => { + let value = self.get_value(); + Poll::Ready(Some(value)) + } + Err(e) => Poll::Ready(Some(Err(e))) + } } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } } From d6f3d85739bef6395f6929891d08791cb298dea2 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Tue, 10 Mar 2020 22:43:21 +0000 Subject: [PATCH 2/6] Strip dead code --- src/lib.rs | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4328caf1f..9e992d735 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,7 @@ use nix::unistd::close; #[cfg(feature = "use_tokio")] use pin_utils::unsafe_pinned; #[cfg(feature = "use_tokio")] -use tokio::{io::PollEvented, runtime::Handle}; +use tokio::io::PollEvented; pub use error::Error; @@ -482,22 +482,6 @@ impl Pin { PinStream::init(self.clone()) } - /// Get a Stream of pin values for this pin - /// - /// The PinStream object can be used with the `tokio` crate. You should probably call - /// `set_edge(Edge::BothEdges)` before using this. - /// - /// Note that the values produced are the value of the pin as soon as we get to handling the - /// interrupt in userspace. Each time this stream produces a value, a change has occurred, but - /// it could end up producing the same value multiple times if the value has changed back - /// between when the interrupt occurred and when the value was read. - /// - /// This method is only available when the `use_tokio` crate feature is enabled. - #[cfg(feature = "use_tokio")] - pub fn get_value_stream_with_handle(&self, _handle: &Handle) -> Result { - Ok(PinValueStream::new(PinStream::init(self.clone())?)) - } - /// Get a Stream of pin values for this pin /// /// The PinStream object can be used with the `tokio` crate. You should probably call From 9f33272dfe8fbd4e498688b97c7f3a3316b72699 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Wed, 11 Mar 2020 10:28:16 +0000 Subject: [PATCH 3/6] Guard pin behind tokio feature --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 9e992d735..39b748d9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::os::unix::prelude::*; use std::path::Path; +#[cfg(feature = "use_tokio")] use std::pin::{Pin as SPin}; #[cfg(feature = "use_tokio")] From c412bdc6d18910db8d66cea3db2dbad96af780b2 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Fri, 13 Mar 2020 15:56:16 +0000 Subject: [PATCH 4/6] switch to pin-project-lite --- Cargo.toml | 2 +- src/lib.rs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b634f07f6..1eb6501db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,4 +18,4 @@ futures = { version = "0.3", optional = true } nix = "0.14.0" mio = { version = "0.6", optional = true } tokio = { version = "0.2", optional = true, features = ["io-driver"] } -pin-utils = "0.1.0-alpha.4" +pin-project-lite = "0.1.4" diff --git a/src/lib.rs b/src/lib.rs index 39b748d9f..92787fef9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ extern crate futures; extern crate mio; extern crate nix; #[cfg(feature = "use_tokio")] -extern crate pin_utils; +extern crate pin_project_lite; #[cfg(feature = "use_tokio")] extern crate tokio; @@ -73,7 +73,7 @@ use mio::{Evented, Ready}; use nix::sys::epoll::*; use nix::unistd::close; #[cfg(feature = "use_tokio")] -use pin_utils::unsafe_pinned; +use pin_project_lite::pin_project; #[cfg(feature = "use_tokio")] use tokio::io::PollEvented; @@ -683,12 +683,16 @@ impl Stream for PinStream { } #[cfg(feature = "use_tokio")] -pub struct PinValueStream { inner: PinStream } +pin_project! { + pub struct PinValueStream { + #[pin] + inner: PinStream + } +} #[cfg(feature = "use_tokio")] impl PinValueStream { - unsafe_pinned!(inner: PinStream); - + fn new(inner: PinStream) -> Self { PinValueStream { inner } } @@ -704,7 +708,7 @@ impl Stream for PinValueStream { type Item = Result; fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll> { - match self.as_mut().inner().poll_next(cx) { + match self.as_mut().project().inner.poll_next(cx) { Poll::Ready(Some(res)) => { match res { Ok(_) => { From ed8ba13a792e04c6a1b003a8e7f489fb585c1fbd Mon Sep 17 00:00:00 2001 From: Paul Osborne Date: Sun, 2 Aug 2020 22:56:18 -0500 Subject: [PATCH 5/6] Bump MSRV to 1.39.0 for newer tokio stuff Signed-off-by: Paul Osborne --- .travis.yml | 2 +- Cargo.toml | 1 + README.md | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f5991441d..b3d154c7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: rust rust: - - 1.31.1 + - 1.39.0 - stable - beta - nightly diff --git a/Cargo.toml b/Cargo.toml index 1eb6501db..34548e813 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio" documentation = "http://rust-embedded.github.io/rust-sysfs-gpio/" description = "Provides access to GPIOs using the Linux sysfs interface." readme = "README.md" +edition = "2018" [features] mio-evented = ["mio"] diff --git a/README.md b/README.md index 1ae30cb13..b3b9600e4 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,7 @@ The following features are planned for the library: ## Minimum Supported Rust Version (MSRV) -This crate is guaranteed to compile on stable Rust 1.31 and up. It *might* +This crate is guaranteed to compile on stable Rust 1.39 and up. It *might* compile with older versions but that may change in any new patch release. ## Cross Compiling From 29c45ca88ab2ef6be6be21340aed239d12ffa98a Mon Sep 17 00:00:00 2001 From: Paul Osborne Date: Sun, 2 Aug 2020 22:56:35 -0500 Subject: [PATCH 6/6] Fix and cleanup tokio example using async/await Signed-off-by: Paul Osborne --- Cargo.toml | 7 +++++ examples/tokio.rs | 67 +++++++++++++++++------------------------------ 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 34548e813..c56e85625 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,10 @@ nix = "0.14.0" mio = { version = "0.6", optional = true } tokio = { version = "0.2", optional = true, features = ["io-driver"] } pin-project-lite = "0.1.4" + +[dev-dependencies] +tokio = { version = "0.2", features = ["macros", "rt-core"] } + +[[example]] +name = "tokio" +required-features = ["use_tokio"] diff --git a/examples/tokio.rs b/examples/tokio.rs index 9395e9c51..94d80cee2 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -1,51 +1,37 @@ -#[cfg(feature = "use_tokio")] -extern crate futures; -#[cfg(feature = "use_tokio")] -extern crate sysfs_gpio; -#[cfg(feature = "use_tokio")] -extern crate tokio; +// Copyright (c) 2020. The sysfs-gpio Authors. -#[cfg(feature = "use_tokio")] +use futures::future::join_all; +use futures::StreamExt; use std::env; - -#[cfg(feature = "use_tokio")] -use futures::{lazy, Future, Stream}; - -#[cfg(feature = "use_tokio")] use sysfs_gpio::{Direction, Edge, Pin}; -#[cfg(feature = "use_tokio")] -fn stream(pin_nums: Vec) -> sysfs_gpio::Result<()> { +async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> { + pin.export()?; + pin.set_direction(Direction::In)?; + pin.set_edge(Edge::BothEdges)?; + let mut gpio_events = pin.get_value_stream()?; + while let Some(evt) = gpio_events.next().await { + let val = evt.unwrap(); + println!("Pin {} changed value to {}", pin.get_pin_num(), val); + } + Ok(()) +} + +async fn stream(pin_nums: Vec) { // NOTE: this currently runs forever and as such if // the app is stopped (Ctrl-C), no cleanup will happen // and the GPIO will be left exported. Not much // can be done about this as Rust signal handling isn't // really present at the moment. Revisit later. - let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect(); - let task = lazy(move || { - for &(i, ref pin) in pins.iter() { - pin.export().unwrap(); - pin.set_direction(Direction::In).unwrap(); - pin.set_edge(Edge::BothEdges).unwrap(); - tokio::spawn( - pin.get_value_stream() - .unwrap() - .for_each(move |val| { - println!("Pin {} changed value to {}", i, val); - Ok(()) - }) - .map_err(|_| ()), - ); - } - Ok(()) - }); - tokio::run(task); - - Ok(()) + join_all(pin_nums.into_iter().map(|p| { + let pin = Pin::new(p); + tokio::task::spawn(monitor_pin(pin)) + })) + .await; } -#[cfg(feature = "use_tokio")] -fn main() { +#[tokio::main] +async fn main() { let pins: Vec = env::args() .skip(1) .map(|a| a.parse().expect("Pins must be specified as integers")) @@ -53,11 +39,6 @@ fn main() { if pins.is_empty() { println!("Usage: ./tokio [pin ...]"); } else { - stream(pins).unwrap(); + stream(pins).await; } } - -#[cfg(not(feature = "use_tokio"))] -fn main() { - println!("This example requires the `tokio` feature to be enabled."); -}