Skip to content

Commit

Permalink
Using futures 0.3.x and tokio 0.2.9
Browse files Browse the repository at this point in the history
- Latest stable version for futures and tokio
- Updated code and tests where applicable
- Removed yamux submodule
- Updated rust to nightly-2020-01-08
  • Loading branch information
sdbondi committed Jan 14, 2020
1 parent 6bab35f commit b19173b
Show file tree
Hide file tree
Showing 117 changed files with 793 additions and 708 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: 2.1

defaults:
rust_image: &rust_image quay.io/tarilabs/rust_tari-build-with-deps:nightly-2019-10-04
rust_image: &rust_image quay.io/tarilabs/rust_tari-build-with-deps:nightly-2020-01-08

jobs:
test-docs:
Expand Down
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ members = [
"comms",
"comms/dht",
"comms/middleware",
# TODO: Remove this once tower filter (0.3.0-alpha.3) is released
"comms/middleware/tower-filter",
"digital_assets_layer/core",
"infrastructure/broadcast_channel",
"infrastructure/crypto",
Expand Down
4 changes: 2 additions & 2 deletions applications/console_text_messenger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ clap = "2.33.0"
config = { version = "0.9.3" }
crossbeam-channel = "0.3.8"
ctrlc = "3.1.3"
futures = { version = "=0.3.0-alpha.19", package = "futures-preview", features =["compat", "std"]}
futures = { version = "^0.3.1", features =["compat", "std"]}
log = { version = "0.4.0", features = ["std"] }
log4rs = {version ="0.8.3",features = ["console_appender", "file_appender", "file", "yaml_format"]}
pnet = "0.22.0"
serde = "1.0.90"
serde_derive = "1.0.90"
simple_logger = "1.2.0"
tokio = "0.2.0-alpha.6"
tokio = "0.2.9"
2 changes: 1 addition & 1 deletion applications/grpc_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ crossbeam-channel = "0.3.8"
bytes = "0.4"
derive-error = "0.0.4"
futures = "0.1"
futures03 = { version = "=0.3.0-alpha.18", package = "futures-preview", features =["compat", "std"]}
futures03 = { version = "^0.3.1", features =["compat", "std"]}
http = "0.1"
log = { version = "0.4.0", features = ["std"] }
prost = "0.5"
Expand Down
5 changes: 2 additions & 3 deletions applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ tari_mmr = { path = "../../base_layer/mmr", version = "^0.0" }
clap = "2.33.0"
config = { version = "0.9.3" }
dirs = "2.0.2"
futures-preview = { version = "=0.3.0-alpha.19", default-features = false, features = ["alloc"]}
futures = { version = "^0.3.1", default-features = false, features = ["alloc"]}
log = { version = "0.4.8", features = ["std"] }
log4rs = { version = "0.8.3", features = ["toml_format"] }
rand = "0.5.5"
serde_json = "1.0"
tokio = { version="=0.2.0-alpha.6", features = ["signal"] }
tokio-executor = { version ="=0.2.0-alpha.6", features = ["threadpool"] }
tokio = { version="0.2.9", features = ["signal"] }
14 changes: 7 additions & 7 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn create_and_save_id(path: &Path, control_addr: &str) -> Result<NodeIdentit
pub fn configure_and_initialize_node(
config: &GlobalConfig,
id: NodeIdentity,
rt: &Runtime,
rt: &mut Runtime,
) -> Result<(CommsNode, NodeType), String>
{
let id = Arc::new(id);
Expand All @@ -181,7 +181,7 @@ pub fn configure_and_initialize_node(
let diff_adj_manager = DiffAdjManager::new(db.clone()).map_err(|e| e.to_string())?;
rules.set_diff_manager(diff_adj_manager).map_err(|e| e.to_string())?;
let (comms, handles) =
setup_comms_services(&rt, id.clone(), peers, &config.peer_db_path, db.clone(), mempool, rules);
setup_comms_services(rt, id.clone(), peers, &config.peer_db_path, db.clone(), mempool, rules);
let outbound_interface = handles.get_handle::<OutboundNodeCommsInterface>().unwrap();
(
comms,
Expand Down Expand Up @@ -209,7 +209,7 @@ pub fn configure_and_initialize_node(
let diff_adj_manager = DiffAdjManager::new(db.clone()).map_err(|e| e.to_string())?;
rules.set_diff_manager(diff_adj_manager).map_err(|e| e.to_string())?;
let (comms, handles) =
setup_comms_services(&rt, id.clone(), peers, &config.peer_db_path, db.clone(), mempool, rules);
setup_comms_services(rt, id.clone(), peers, &config.peer_db_path, db.clone(), mempool, rules);
let outbound_interface = handles.get_handle::<OutboundNodeCommsInterface>().unwrap();
(
comms,
Expand Down Expand Up @@ -281,7 +281,7 @@ fn assign_peers(seeds: &[String]) -> Vec<Peer> {
}

fn setup_comms_services<T>(
rt: &Runtime,
rt: &mut Runtime,
id: Arc<NodeIdentity>,
peers: Vec<Peer>,
peer_db_path: &str,
Expand All @@ -293,7 +293,7 @@ where
T: BlockchainBackend + 'static,
{
let node_config = BaseNodeServiceConfig::default(); // TODO - make this configurable
let (publisher, subscription_factory) = pubsub_connector(rt.executor(), 100);
let (publisher, subscription_factory) = pubsub_connector(rt.handle().clone(), 100);
let subscription_factory = Arc::new(subscription_factory);
let comms_config = CommsConfig {
node_identity: id.clone(),
Expand All @@ -314,14 +314,14 @@ where
dht: Default::default(), // TODO - make this configurable
};

let (comms, dht) = initialize_comms(rt.executor(), comms_config, publisher).unwrap();
let (comms, dht) = initialize_comms(rt.handle().clone(), comms_config, publisher).unwrap();

for p in peers {
debug!(target: LOG_TARGET, "Adding seed peer [{}]", p.node_id);
comms.peer_manager().add_peer(p).unwrap();
}

let fut = StackBuilder::new(rt.executor(), comms.shutdown_signal())
let fut = StackBuilder::new(rt.handle().clone(), comms.shutdown_signal())
.add_initializer(CommsOutboundServiceInitializer::new(dht.outbound_requester()))
.add_initializer(BaseNodeServiceInitializer::new(
subscription_factory,
Expand Down
31 changes: 13 additions & 18 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ mod cli;
mod consts;

use crate::builder::{create_and_save_id, load_identity};
use futures::{future, StreamExt};
use log::*;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tari_common::{load_configuration, GlobalConfig};
use tari_utilities::hex::Hex;
use tokio::{net::signal, runtime::Runtime};
use tokio::{runtime::Runtime, signal, task};

const LOG_TARGET: &str = "base_node::app";

Expand Down Expand Up @@ -111,7 +110,7 @@ fn main() {
};

// Set up the Tokio runtime
let rt = match setup_runtime(&node_config) {
let mut rt = match setup_runtime(&node_config) {
Ok(rt) => rt,
Err(s) => {
error!(target: LOG_TARGET, "{}", s);
Expand All @@ -120,7 +119,7 @@ fn main() {
};

// Build, node, build!
let (comms, node) = match builder::configure_and_initialize_node(&node_config, node_id, &rt) {
let (comms, node) = match builder::configure_and_initialize_node(&node_config, node_id, &mut rt) {
Ok(n) => n,
Err(e) => {
error!(target: LOG_TARGET, "Could not instantiate node instance. {}", e);
Expand All @@ -130,10 +129,7 @@ fn main() {

// Configure the shutdown daemon to listen for CTRL-C
let flag = node.get_flag();
if let Err(e) = handle_ctrl_c(&rt, flag) {
error!(target: LOG_TARGET, "Could not configure Ctrl-C handling. {}", e);
return;
};
handle_ctrl_c(&rt, flag);

// Run, node, run!
let main = async move {
Expand All @@ -151,8 +147,7 @@ fn main() {
),
}
};
rt.spawn(main);
rt.shutdown_on_idle();
rt.block_on(main);
println!("Goodbye!");
}

Expand All @@ -167,23 +162,23 @@ fn setup_runtime(config: &GlobalConfig) -> Result<Runtime, String> {
num_blocking_threads
);
tokio::runtime::Builder::new()
.blocking_threads(num_blocking_threads)
.threaded_scheduler()
.enable_all()
.max_threads(num_core_threads + num_blocking_threads)
.core_threads(num_core_threads)
.build()
.map_err(|e| format!("There was an error while building the node runtime. {}", e.to_string()))
}

/// Set the interrupt flag on the node when Ctrl-C is entered
fn handle_ctrl_c(rt: &Runtime, flag: Arc<AtomicBool>) -> Result<(), String> {
let ctrl_c = signal::ctrl_c().map_err(|e| e.to_string())?;
let s = ctrl_c.take(1).for_each(move |_| {
fn handle_ctrl_c(rt: &Runtime, flag: Arc<AtomicBool>) -> task::JoinHandle<Result<(), String>> {
rt.spawn(async move {
signal::ctrl_c().await.map_err(|e| e.to_string())?;
info!(
target: LOG_TARGET,
"Termination signal received from user. Shutting node down."
);
flag.store(true, Ordering::SeqCst);
future::ready(())
});
rt.spawn(s);
Ok(())
Ok(())
})
}
5 changes: 2 additions & 3 deletions applications/tari_testnet_miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ edition = "2018"
[dependencies]
log = { version = "0.4.8", features = ["std"] }
tonic = "0.1.0-alpha.3"
tokio-executor= "=0.2.0-alpha.6"
bytes = "0.4"
prost = "0.5"
tokio = "=0.2.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.19", default-features = false, features = ["alloc"]}
tokio = "0.2.9"
futures= { version = "^0.3.1", default-features = false, features = ["alloc"]}
async-stream = "0.1.2"
http = "0.1"
tower = "=0.3.0-alpha.2"
Expand Down
6 changes: 3 additions & 3 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ blake2 = "^0.8.0"
bigint = "^4.4.1"
ttl_cache = "0.5.1"
croaring = "=0.3.9"
tokio = { version="^0.2.0-alpha.6" }
tokio-executor = { version ="^0.2.0-alpha.6", features = ["threadpool"] }
futures-preview = {version = "0.3.0-alpha.19", features = ["async-await"] }
tokio = { version="^0.2" }
futures = {version = "^0.3.1", features = ["async-await"] }
lmdb-zero = "0.4.4"
tower-service = { version="0.3.0-alpha.2" }
crossbeam-channel = "0.3.8"
Expand All @@ -60,6 +59,7 @@ tari_p2p = {path = "../../base_layer/p2p", version = "^0.0", features=["test-moc
tari_test_utils = { path = "../../infrastructure/test_utils", version = "^0.0" }
env_logger = "0.7.0"
tempdir = "0.3.7"
tokio-macros = "0.2.3"

[build-dependencies]
tari_protobuf_build = { version = "^0.0", path="../../infrastructure/protobuf_build"}
10 changes: 4 additions & 6 deletions base_layer/core/src/base_node/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

use std::time::{Duration, Instant};
use tokio::timer;
use std::time::Duration;
use tokio::time;

/// A simple back-off strategy. `BackOff` is typically used in situations where you want to retry an operation a
/// number of times, with an increasing delay between attempts
Expand Down Expand Up @@ -105,9 +105,7 @@ impl BackOff {
if self.is_finished() {
return;
}
let deadline = Instant::now() + self.delay;
let delay = timer::delay(deadline);
delay.await;
time::delay_for(self.delay).await;
self.current_attempts += 1;
self.delay = self.delay.mul_f64(self.backoff);
}
Expand All @@ -118,7 +116,7 @@ mod test {
use crate::base_node::BackOff;
use std::time::Duration;

#[tokio::test]
#[tokio_macros::test]
async fn retry() {
let mut retry = BackOff::new(3, Duration::from_millis(100), 1.5);
assert_eq!(retry.attempts(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tari_broadcast_channel as broadcast_channel;
use tari_p2p::services::liveness::LivenessHandle;
use tari_service_framework::{handles::ServiceHandlesFuture, ServiceInitializationError, ServiceInitializer};
use tari_shutdown::ShutdownSignal;
use tokio::runtime::TaskExecutor;
use tokio::runtime;

const BROADCAST_EVENT_BUFFER_SIZE: usize = 10;

Expand All @@ -40,7 +40,7 @@ impl ServiceInitializer for ChainMetadataServiceInitializer {

fn initialize(
&mut self,
executor: TaskExecutor,
executor: runtime::Handle,
handles_fut: ServiceHandlesFuture,
shutdown: ShutdownSignal,
) -> Self::Future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ mod test {
});
}

#[tokio::test]
#[tokio_macros::test]
async fn handle_liveness_event_ok() {
let (liveness_handle, _) = create_p2p_liveness_mock(1);
let mut metadata = Metadata::new();
Expand Down Expand Up @@ -344,7 +344,7 @@ mod test {
);
}

#[tokio::test]
#[tokio_macros::test]
async fn handle_liveness_event_no_metadata() {
let (liveness_handle, _) = create_p2p_liveness_mock(1);
let metadata = Metadata::new();
Expand All @@ -367,7 +367,7 @@ mod test {
assert_eq!(service.peer_chain_metadata.len(), 0);
}

#[tokio::test]
#[tokio_macros::test]
async fn handle_liveness_event_not_neighbour() {
let (liveness_handle, _) = create_p2p_liveness_mock(1);
let metadata = Metadata::new();
Expand All @@ -389,7 +389,7 @@ mod test {
assert_eq!(service.peer_chain_metadata.len(), 0);
}

#[tokio::test]
#[tokio_macros::test]
async fn handle_liveness_event_bad_metadata() {
let (liveness_handle, _) = create_p2p_liveness_mock(1);
let mut metadata = Metadata::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where T: BlockchainBackend
}

impl<T> InboundNodeCommsHandlers<T>
where T: BlockchainBackend
where T: BlockchainBackend + 'static
{
/// Construct a new InboundNodeCommsInterface.
pub fn new(
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime::TaskExecutor;
use tokio::runtime;

const LOG_TARGET: &'static str = "base_node::service::initializer";

Expand Down Expand Up @@ -149,7 +149,7 @@ where T: BlockchainBackend + 'static

fn initialize(
&mut self,
executor: TaskExecutor,
executor: runtime::Handle,
handles_fut: ServiceHandlesFuture,
shutdown: ShutdownSignal,
) -> Self::Future
Expand Down
Loading

0 comments on commit b19173b

Please sign in to comment.