Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate to tokio 0.3 #41

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/rodbus-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ crate-type = ["cdylib"]
[dependencies]
rodbus = { path = "../../rodbus" }
log = { version = "0.4", features = ["std"] }
tokio = { version = "^0.2.11", features = ["rt-threaded"]}
tokio = { version = "0.3", features = ["rt-multi-thread"]}

[build-dependencies]
rodbus-schema = { path = "../rodbus-schema" }
Expand Down
15 changes: 11 additions & 4 deletions ffi/rodbus-ffi/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ptr::null_mut;

pub struct Channel {
pub(crate) inner: rodbus::client::channel::Channel,
pub(crate) runtime: tokio::runtime::Handle,
pub(crate) runtime: crate::Runtime,
}

pub(crate) unsafe fn create_tcp_client(
Expand All @@ -25,11 +25,11 @@ pub(crate) unsafe fn create_tcp_client(
rodbus::client::channel::strategy::default(),
);

rt.spawn(task);
rt.inner.spawn(task);

Box::into_raw(Box::new(Channel {
inner: handle,
runtime: rt.handle().clone(),
runtime: rt.clone(),
}))
}

Expand Down Expand Up @@ -66,7 +66,7 @@ pub(crate) unsafe fn channel_read_coils_async(
let mut session = param.build_session(channel);

channel
.runtime
.runtime.inner
.block_on(session.read_coils(range, callback));
}

Expand Down Expand Up @@ -98,6 +98,7 @@ pub(crate) unsafe fn channel_read_discrete_inputs_async(

channel
.runtime
.inner
.block_on(session.read_discrete_inputs(range, callback));
}

Expand Down Expand Up @@ -129,6 +130,7 @@ pub(crate) unsafe fn channel_read_holding_registers_async(

channel
.runtime
.inner
.block_on(session.read_holding_registers(range, callback));
}

Expand Down Expand Up @@ -160,6 +162,7 @@ pub(crate) unsafe fn channel_read_input_registers_async(

channel
.runtime
.inner
.block_on(session.read_input_registers(range, callback));
}

Expand All @@ -181,6 +184,7 @@ pub(crate) unsafe fn channel_write_single_coil_async(

channel
.runtime
.inner
.block_on(session.write_single_coil(bit.into(), callback.convert_to_fn_once()));
}

Expand All @@ -202,6 +206,7 @@ pub(crate) unsafe fn channel_write_single_register_async(

channel
.runtime
.inner
.block_on(session.write_single_register(register.into(), callback.convert_to_fn_once()));
}

Expand Down Expand Up @@ -242,6 +247,7 @@ pub(crate) unsafe fn channel_write_multiple_coils_async(

channel
.runtime
.inner
.block_on(session.write_multiple_coils(argument, callback));
}

Expand Down Expand Up @@ -282,5 +288,6 @@ pub(crate) unsafe fn channel_write_multiple_registers_async(

channel
.runtime
.inner
.block_on(session.write_multiple_registers(argument, callback));
}
17 changes: 11 additions & 6 deletions ffi/rodbus-ffi/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
pub use tokio::runtime::Runtime;
use std::sync::Arc;

#[derive(Clone)]
pub struct Runtime {
pub(crate) inner: Arc<tokio::runtime::Runtime>
}

pub(crate) unsafe fn runtime_new(
config: Option<&crate::ffi::RuntimeConfig>,
) -> *mut tokio::runtime::Runtime {
let mut builder = tokio::runtime::Builder::new();
) -> *mut crate::Runtime {
let mut builder = tokio::runtime::Builder::new_multi_thread();

builder.enable_all().threaded_scheduler();
builder.enable_all();

if let Some(x) = config.as_ref() {
if x.num_core_threads > 0 {
builder.core_threads(x.num_core_threads as usize);
builder.worker_threads(x.num_core_threads as usize);
}
}

match builder.build() {
Ok(r) => Box::into_raw(Box::new(r)),
Ok(r) => Box::into_raw(Box::new(crate::Runtime { inner: Arc::new(r) })),
Err(_) => std::ptr::null_mut(),
}
}
Expand Down
10 changes: 5 additions & 5 deletions ffi/rodbus-ffi/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl RequestHandler for RequestHandlerWrapper {
}

pub struct Server {
runtime: tokio::runtime::Handle,
runtime: crate::Runtime,
// never used but we have to hang onto it otherwise the server shuts down
_server: rodbus::shutdown::TaskHandle,
map: ServerHandlerMap<RequestHandlerWrapper>,
Expand Down Expand Up @@ -190,7 +190,7 @@ pub(crate) unsafe fn create_tcp_server(
};

// at this point, we know that all the arguments are good, so we can go ahead and try to bind a listener
let listener = match runtime.block_on(TcpListener::bind(address)) {
let listener = match runtime.inner.block_on(TcpListener::bind(address)) {
Ok(x) => x,
Err(err) => {
log::error!("error binding listener: {}", err);
Expand All @@ -207,11 +207,11 @@ pub(crate) unsafe fn create_tcp_server(
listener,
handler_map.clone(),
);
let join_handle = runtime.spawn(task);
let join_handle = runtime.inner.spawn(task);

let server_handle = Server {
_server: TaskHandle::new(tx, join_handle),
runtime: runtime.handle().clone(),
runtime: runtime.clone(),
map: handler_map,
};

Expand Down Expand Up @@ -244,7 +244,7 @@ pub(crate) unsafe fn server_update_database(
transaction.callback(&mut lock.database);
};

server.runtime.block_on(transaction);
server.runtime.inner.block_on(transaction);

true
}
Expand Down
2 changes: 1 addition & 1 deletion rodbus-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ path = "src/main.rs"
[dependencies]
rodbus = { path = "../rodbus", version = "0.1.1" }
clap = "2.33"
tokio = { version = "^0.2.11", features = ["macros", "time"] }
tokio = { version = "0.3", features = ["macros", "time"] }
simple_logger = "1.9"
log = "0.4"
4 changes: 2 additions & 2 deletions rodbus-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Args {
}
}

#[tokio::main(basic_scheduler)]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// print log messages to the console
SimpleLogger::new()
Expand All @@ -74,7 +74,7 @@ async fn run() -> Result<(), Error> {
None => run_command(&args.command, &mut session).await,
Some(period) => loop {
run_command(&args.command, &mut session).await?;
tokio::time::delay_for(period).await
tokio::time::sleep(period).await
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions rodbus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ codecov = { repository = "automatak/rodbus", branch = "master", service = "githu
maintenance = { status = "actively-developed" }

[dependencies]
tokio = { version = "^0.2.20", features = ["tcp", "sync", "io-util", "time", "rt-core", "rt-threaded", "macros"]}
tokio = { version = "0.3", features = ["net", "sync", "io-util", "time", "rt", "rt-multi-thread", "macros"]}
log = "0.4"
no-panic = { version = "0.1", optional = true }

[dev-dependencies]
tokio-test = "0.2"
tokio-test = "0.3"
simple_logger = "1.9"
2 changes: 1 addition & 1 deletion rodbus/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use rodbus::prelude::*;

#[tokio::main(basic_scheduler)]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
// Create a channel
let channel = spawn_tcp_client_task("127.0.0.1:502".parse().unwrap(), 1, strategy::default());
Expand Down
2 changes: 1 addition & 1 deletion rodbus/examples/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl RequestHandler for Handler {
}
}

#[tokio::main(threaded_scheduler)]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = std::env::args().collect();

Expand Down
4 changes: 2 additions & 2 deletions rodbus/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl RequestHandler for SimpleHandler {
}
}

#[tokio::main(threaded_scheduler)]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = std::env::args().collect();

Expand Down Expand Up @@ -127,6 +127,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
*c = !*c;
}
}
tokio::time::delay_until(next).await;
tokio::time::sleep_until(next).await;
}
}
8 changes: 4 additions & 4 deletions rodbus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
//!use std::str::FromStr;
//!
//!
//!use tokio::time::delay_for;
//!use tokio::time::sleep;
//!
//!#[tokio::main]
//!async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -70,7 +70,7 @@
//! Err(err) => println!("Error: {:?}", err)
//! }
//!
//! delay_for(std::time::Duration::from_secs(3)).await
//! sleep(std::time::Duration::from_secs(3)).await
//! }
//!}
//! ```
Expand Down Expand Up @@ -102,7 +102,7 @@
//! }
//! }
//!
//! #[tokio::main(threaded_scheduler)]
//! #[tokio::main(flavor = "multi_thread")]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!
//! let handler = CoilsOnlyHandler::new().wrap();
Expand All @@ -129,7 +129,7 @@
//! *c = !*c;
//! }
//! }
//! tokio::time::delay_until(next).await;
//! tokio::time::sleep_until(next).await;
//! }
//!}
//!```
Expand Down
2 changes: 1 addition & 1 deletion rodbus/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,6 @@ async fn test_requests_and_responses() {

#[test]
fn can_read_and_write_values() {
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(test_requests_and_responses())
}