From 6c01b0d2efce08aa07f69ccc36cf941535601221 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Mon, 2 Nov 2020 09:12:19 -0800 Subject: [PATCH] migrate to tokio 0.3 --- ffi/rodbus-ffi/Cargo.toml | 2 +- ffi/rodbus-ffi/src/channel.rs | 15 +++++++++++---- ffi/rodbus-ffi/src/runtime.rs | 17 +++++++++++------ ffi/rodbus-ffi/src/server.rs | 10 +++++----- rodbus-client/Cargo.toml | 2 +- rodbus-client/src/main.rs | 4 ++-- rodbus/Cargo.toml | 4 ++-- rodbus/examples/client.rs | 2 +- rodbus/examples/perf.rs | 2 +- rodbus/examples/server.rs | 4 ++-- rodbus/src/lib.rs | 8 ++++---- rodbus/tests/integration_test.rs | 2 +- 12 files changed, 42 insertions(+), 30 deletions(-) diff --git a/ffi/rodbus-ffi/Cargo.toml b/ffi/rodbus-ffi/Cargo.toml index 04aebc47..83410c15 100644 --- a/ffi/rodbus-ffi/Cargo.toml +++ b/ffi/rodbus-ffi/Cargo.toml @@ -16,7 +16,7 @@ crate-type = ["cdylib"] [dependencies] rodbus = { path = "../../rodbus" } log = { version = "0.4", features = ["std"] } -tokio = { version = "^0.2.11", features = ["rt-threaded"]} +tokio = { version = "0.3", features = ["rt-multi-thread"]} [build-dependencies] rodbus-schema = { path = "../rodbus-schema" } diff --git a/ffi/rodbus-ffi/src/channel.rs b/ffi/rodbus-ffi/src/channel.rs index c5d7e959..74cf80e9 100644 --- a/ffi/rodbus-ffi/src/channel.rs +++ b/ffi/rodbus-ffi/src/channel.rs @@ -3,7 +3,7 @@ use std::ptr::null_mut; pub struct Channel { pub(crate) inner: rodbus::client::channel::Channel, - pub(crate) runtime: tokio::runtime::Handle, + pub(crate) runtime: crate::Runtime, } pub(crate) unsafe fn create_tcp_client( @@ -25,11 +25,11 @@ pub(crate) unsafe fn create_tcp_client( rodbus::client::channel::strategy::default(), ); - rt.spawn(task); + rt.inner.spawn(task); Box::into_raw(Box::new(Channel { inner: handle, - runtime: rt.handle().clone(), + runtime: rt.clone(), })) } @@ -66,7 +66,7 @@ pub(crate) unsafe fn channel_read_coils_async( let mut session = param.build_session(channel); channel - .runtime + .runtime.inner .block_on(session.read_coils(range, callback)); } @@ -98,6 +98,7 @@ pub(crate) unsafe fn channel_read_discrete_inputs_async( channel .runtime + .inner .block_on(session.read_discrete_inputs(range, callback)); } @@ -129,6 +130,7 @@ pub(crate) unsafe fn channel_read_holding_registers_async( channel .runtime + .inner .block_on(session.read_holding_registers(range, callback)); } @@ -160,6 +162,7 @@ pub(crate) unsafe fn channel_read_input_registers_async( channel .runtime + .inner .block_on(session.read_input_registers(range, callback)); } @@ -181,6 +184,7 @@ pub(crate) unsafe fn channel_write_single_coil_async( channel .runtime + .inner .block_on(session.write_single_coil(bit.into(), callback.convert_to_fn_once())); } @@ -202,6 +206,7 @@ pub(crate) unsafe fn channel_write_single_register_async( channel .runtime + .inner .block_on(session.write_single_register(register.into(), callback.convert_to_fn_once())); } @@ -242,6 +247,7 @@ pub(crate) unsafe fn channel_write_multiple_coils_async( channel .runtime + .inner .block_on(session.write_multiple_coils(argument, callback)); } @@ -282,5 +288,6 @@ pub(crate) unsafe fn channel_write_multiple_registers_async( channel .runtime + .inner .block_on(session.write_multiple_registers(argument, callback)); } diff --git a/ffi/rodbus-ffi/src/runtime.rs b/ffi/rodbus-ffi/src/runtime.rs index d3a425cd..113a5873 100644 --- a/ffi/rodbus-ffi/src/runtime.rs +++ b/ffi/rodbus-ffi/src/runtime.rs @@ -1,20 +1,25 @@ -pub use tokio::runtime::Runtime; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Runtime { + pub(crate) inner: Arc +} pub(crate) unsafe fn runtime_new( config: Option<&crate::ffi::RuntimeConfig>, -) -> *mut tokio::runtime::Runtime { - let mut builder = tokio::runtime::Builder::new(); +) -> *mut crate::Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); - builder.enable_all().threaded_scheduler(); + builder.enable_all(); if let Some(x) = config.as_ref() { if x.num_core_threads > 0 { - builder.core_threads(x.num_core_threads as usize); + builder.worker_threads(x.num_core_threads as usize); } } match builder.build() { - Ok(r) => Box::into_raw(Box::new(r)), + Ok(r) => Box::into_raw(Box::new(crate::Runtime { inner: Arc::new(r) })), Err(_) => std::ptr::null_mut(), } } diff --git a/ffi/rodbus-ffi/src/server.rs b/ffi/rodbus-ffi/src/server.rs index 6714bc21..fb2b306a 100644 --- a/ffi/rodbus-ffi/src/server.rs +++ b/ffi/rodbus-ffi/src/server.rs @@ -120,7 +120,7 @@ impl RequestHandler for RequestHandlerWrapper { } pub struct Server { - runtime: tokio::runtime::Handle, + runtime: crate::Runtime, // never used but we have to hang onto it otherwise the server shuts down _server: rodbus::shutdown::TaskHandle, map: ServerHandlerMap, @@ -190,7 +190,7 @@ pub(crate) unsafe fn create_tcp_server( }; // at this point, we know that all the arguments are good, so we can go ahead and try to bind a listener - let listener = match runtime.block_on(TcpListener::bind(address)) { + let listener = match runtime.inner.block_on(TcpListener::bind(address)) { Ok(x) => x, Err(err) => { log::error!("error binding listener: {}", err); @@ -207,11 +207,11 @@ pub(crate) unsafe fn create_tcp_server( listener, handler_map.clone(), ); - let join_handle = runtime.spawn(task); + let join_handle = runtime.inner.spawn(task); let server_handle = Server { _server: TaskHandle::new(tx, join_handle), - runtime: runtime.handle().clone(), + runtime: runtime.clone(), map: handler_map, }; @@ -244,7 +244,7 @@ pub(crate) unsafe fn server_update_database( transaction.callback(&mut lock.database); }; - server.runtime.block_on(transaction); + server.runtime.inner.block_on(transaction); true } diff --git a/rodbus-client/Cargo.toml b/rodbus-client/Cargo.toml index 9cab75af..2503b052 100644 --- a/rodbus-client/Cargo.toml +++ b/rodbus-client/Cargo.toml @@ -17,6 +17,6 @@ path = "src/main.rs" [dependencies] rodbus = { path = "../rodbus", version = "0.1.1" } clap = "2.33" -tokio = { version = "^0.2.11", features = ["macros", "time"] } +tokio = { version = "0.3", features = ["macros", "time"] } simple_logger = "1.9" log = "0.4" diff --git a/rodbus-client/src/main.rs b/rodbus-client/src/main.rs index e450db7c..70e99089 100644 --- a/rodbus-client/src/main.rs +++ b/rodbus-client/src/main.rs @@ -50,7 +50,7 @@ impl Args { } } -#[tokio::main(basic_scheduler)] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { // print log messages to the console SimpleLogger::new() @@ -74,7 +74,7 @@ async fn run() -> Result<(), Error> { None => run_command(&args.command, &mut session).await, Some(period) => loop { run_command(&args.command, &mut session).await?; - tokio::time::delay_for(period).await + tokio::time::sleep(period).await }, } } diff --git a/rodbus/Cargo.toml b/rodbus/Cargo.toml index 3917589c..504a8d85 100644 --- a/rodbus/Cargo.toml +++ b/rodbus/Cargo.toml @@ -15,10 +15,10 @@ codecov = { repository = "automatak/rodbus", branch = "master", service = "githu maintenance = { status = "actively-developed" } [dependencies] -tokio = { version = "^0.2.20", features = ["tcp", "sync", "io-util", "time", "rt-core", "rt-threaded", "macros"]} +tokio = { version = "0.3", features = ["net", "sync", "io-util", "time", "rt", "rt-multi-thread", "macros"]} log = "0.4" no-panic = { version = "0.1", optional = true } [dev-dependencies] -tokio-test = "0.2" +tokio-test = "0.3" simple_logger = "1.9" \ No newline at end of file diff --git a/rodbus/examples/client.rs b/rodbus/examples/client.rs index d7969995..16805377 100644 --- a/rodbus/examples/client.rs +++ b/rodbus/examples/client.rs @@ -3,7 +3,7 @@ use std::time::Duration; use rodbus::prelude::*; -#[tokio::main(basic_scheduler)] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { // Create a channel let channel = spawn_tcp_client_task("127.0.0.1:502".parse().unwrap(), 1, strategy::default()); diff --git a/rodbus/examples/perf.rs b/rodbus/examples/perf.rs index d2758cf0..2037e4cd 100644 --- a/rodbus/examples/perf.rs +++ b/rodbus/examples/perf.rs @@ -20,7 +20,7 @@ impl RequestHandler for Handler { } } -#[tokio::main(threaded_scheduler)] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { let args: Vec = std::env::args().collect(); diff --git a/rodbus/examples/server.rs b/rodbus/examples/server.rs index 4e8f53fd..dda48655 100644 --- a/rodbus/examples/server.rs +++ b/rodbus/examples/server.rs @@ -82,7 +82,7 @@ impl RequestHandler for SimpleHandler { } } -#[tokio::main(threaded_scheduler)] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { let args: Vec = std::env::args().collect(); @@ -127,6 +127,6 @@ async fn main() -> Result<(), Box> { *c = !*c; } } - tokio::time::delay_until(next).await; + tokio::time::sleep_until(next).await; } } diff --git a/rodbus/src/lib.rs b/rodbus/src/lib.rs index f181a889..a827d72e 100644 --- a/rodbus/src/lib.rs +++ b/rodbus/src/lib.rs @@ -43,7 +43,7 @@ //!use std::str::FromStr; //! //! -//!use tokio::time::delay_for; +//!use tokio::time::sleep; //! //!#[tokio::main] //!async fn main() -> Result<(), Box> { @@ -70,7 +70,7 @@ //! Err(err) => println!("Error: {:?}", err) //! } //! -//! delay_for(std::time::Duration::from_secs(3)).await +//! sleep(std::time::Duration::from_secs(3)).await //! } //!} //! ``` @@ -102,7 +102,7 @@ //! } //! } //! -//! #[tokio::main(threaded_scheduler)] +//! #[tokio::main(flavor = "multi_thread")] //! async fn main() -> Result<(), Box> { //! //! let handler = CoilsOnlyHandler::new().wrap(); @@ -129,7 +129,7 @@ //! *c = !*c; //! } //! } -//! tokio::time::delay_until(next).await; +//! tokio::time::sleep_until(next).await; //! } //!} //!``` diff --git a/rodbus/tests/integration_test.rs b/rodbus/tests/integration_test.rs index 25024261..2fda9cdb 100644 --- a/rodbus/tests/integration_test.rs +++ b/rodbus/tests/integration_test.rs @@ -212,6 +212,6 @@ async fn test_requests_and_responses() { #[test] fn can_read_and_write_values() { - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); rt.block_on(test_requests_and_responses()) }