diff --git a/Cargo.lock b/Cargo.lock index fb57e72b0..a730d805f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2739,18 +2739,23 @@ dependencies = [ "kaspa-consensusmanager", "kaspa-core", "kaspa-database", + "kaspa-grpc-client", "kaspa-hashes", "kaspa-index-processor", "kaspa-math", "kaspa-merkle", "kaspa-muhash", "kaspa-pow", + "kaspa-rpc-core", "kaspa-txscript", "kaspa-txscript-errors", "kaspa-utils", "kaspa-utxoindex", + "kaspa-wrpc-server", + "kaspad", "log", "parking_lot", + "port-selector", "rand 0.8.5", "rand_distr 0.4.3", "rayon", @@ -3899,6 +3904,15 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "port-selector" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd119ef551a50cd8939f0ff93bd062891f7b0dbb771b4a05df8a9c13aebaff68" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "portable-atomic" version = "1.4.2" diff --git a/Cargo.toml b/Cargo.toml index 539569ba6..d788e8ce5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "2" members = [ "daemon", "cli", @@ -49,7 +50,7 @@ members = [ "testing/integration", "utils", "rothschild", - "metrics/perf_monitor" + "metrics/perf_monitor", ] [workspace.package] @@ -188,9 +189,13 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"] } rand_core = { version = "0.6", features = ["std"] } bs58 = { version = "0.4", features = ["check"], default-features = false } hmac = { version = "0.12", default-features = false } -secp256k1 = { version = "0.24", features = ["global-context", "rand-std", "serde"] } +secp256k1 = { version = "0.24", features = [ + "global-context", + "rand-std", + "serde", +] } #sha2 = {version = "0.10", default-features = false} -zeroize = { version = "1", default-features = false, features=["alloc"] } +zeroize = { version = "1", default-features = false, features = ["alloc"] } ripemd = { version = "0.1", default-features = false } subtle = { version = "2", default-features = false } once_cell = { version = "1" } diff --git a/cli/src/modules/wallet.rs b/cli/src/modules/wallet.rs index 214a57231..a5b00b27a 100644 --- a/cli/src/modules/wallet.rs +++ b/cli/src/modules/wallet.rs @@ -50,7 +50,7 @@ impl Wallet { } "hint" => { if !argv.is_empty() { - let re = regex::Regex::new(r#"wallet\s+hint\s+"#).unwrap(); + let re = regex::Regex::new(r"wallet\s+hint\s+").unwrap(); let hint = re.replace(cmd, ""); let hint = hint.trim(); let store = ctx.store(); diff --git a/cli/src/notifier.rs b/cli/src/notifier.rs index 71fc4f6f8..51bd8b87b 100644 --- a/cli/src/notifier.rs +++ b/cli/src/notifier.rs @@ -20,6 +20,9 @@ struct Inner { current: Mutex>, } +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + #[derive(Clone)] pub struct Notifier { inner: Arc, diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs index b966745e8..9131ab593 100644 --- a/consensus/core/src/config/params.rs +++ b/consensus/core/src/config/params.rs @@ -268,8 +268,6 @@ impl From for Params { pub const MAINNET_PARAMS: Params = Params { dns_seeders: &[ - // This DNS seeder is run by Wolfie - "mainnet-dnsseed.kas.pa", // This DNS seeder is run by Denis Mashkevich "mainnet-dnsseed-1.kaspanet.org", // This DNS seeder is run by Denis Mashkevich @@ -342,7 +340,6 @@ pub const MAINNET_PARAMS: Params = Params { pub const TESTNET_PARAMS: Params = Params { dns_seeders: &[ - "testnet-10-dnsseed.kas.pa", // This DNS seeder is run by Tiram "seeder1-testnet.kaspad.net", ], diff --git a/consensus/core/src/tx/script_public_key.rs b/consensus/core/src/tx/script_public_key.rs index 2cd5c6209..f25cc007c 100644 --- a/consensus/core/src/tx/script_public_key.rs +++ b/consensus/core/src/tx/script_public_key.rs @@ -221,9 +221,7 @@ impl<'de: 'a, 'a> Deserialize<'de> for ScriptPublicKey { } impl From> for u16 { fn from(value: Value<'_>) -> Self { - let Value::U16(v) = value else { - panic!("unexpected conversion: {value:?}") - }; + let Value::U16(v) = value else { panic!("unexpected conversion: {value:?}") }; v } } diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index aea4a958a..cab615e8f 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -382,8 +382,12 @@ impl ConsensusApi for Consensus { fn get_virtual_merge_depth_root(&self) -> Option { // TODO: consider saving the merge depth root as part of virtual state // TODO: unwrap on pruning_point and virtual state reads when staging consensus is implemented - let Some(pruning_point) = self.pruning_point_store.read().pruning_point().unwrap_option() else { return None; }; - let Some(virtual_state) = self.virtual_stores.read().state.get().unwrap_option() else { return None; }; + let Some(pruning_point) = self.pruning_point_store.read().pruning_point().unwrap_option() else { + return None; + }; + let Some(virtual_state) = self.virtual_stores.read().state.get().unwrap_option() else { + return None; + }; let virtual_ghostdag_data = &virtual_state.ghostdag_data; let root = self.services.depth_manager.calc_merge_depth_root(virtual_ghostdag_data, pruning_point); if root.is_origin() { @@ -685,7 +689,9 @@ impl ConsensusApi for Consensus { // to k blocks back and then we would be able to safely unwrap here. For now we // just break the loop, since if the data was truly missing we wouldn't accept // the staging consensus in the first place - let Some(parent) = self.ghostdag_primary_store.get_selected_parent(current).unwrap_option() else { break; }; + let Some(parent) = self.ghostdag_primary_store.get_selected_parent(current).unwrap_option() else { + break; + }; current = parent; } Ok(hashes) diff --git a/consensus/src/model/stores/relations.rs b/consensus/src/model/stores/relations.rs index 640aa7780..69b99a1ff 100644 --- a/consensus/src/model/stores/relations.rs +++ b/consensus/src/model/stores/relations.rs @@ -53,8 +53,8 @@ impl DbRelationsStore { } pub fn with_prefix(db: Arc, prefix: &[u8], cache_size: u64) -> Self { - let parents_prefix = prefix.iter().copied().chain(DatabaseStorePrefixes::RelationsParents.into_iter()).collect_vec(); - let children_prefix = prefix.iter().copied().chain(DatabaseStorePrefixes::RelationsChildren.into_iter()).collect_vec(); + let parents_prefix = prefix.iter().copied().chain(DatabaseStorePrefixes::RelationsParents).collect_vec(); + let children_prefix = prefix.iter().copied().chain(DatabaseStorePrefixes::RelationsChildren).collect_vec(); Self { db: Arc::clone(&db), parents_access: CachedDbAccess::new(Arc::clone(&db), cache_size, parents_prefix), diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs index f51d05a5e..80637f4ca 100644 --- a/consensus/src/pipeline/header_processor/processor.rs +++ b/consensus/src/pipeline/header_processor/processor.rs @@ -212,7 +212,9 @@ impl HeaderProcessor { pub fn worker(self: &Arc) { while let Ok(msg) = self.receiver.recv() { match msg { - BlockProcessingMessage::Exit => break, + BlockProcessingMessage::Exit => { + break; + } BlockProcessingMessage::Process(task, result_transmitter) => { if let Some(task_id) = self.task_manager.register(task, result_transmitter) { let processor = self.clone(); diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs index a004451e0..27ba5c26b 100644 --- a/consensus/src/pipeline/pruning_processor/processor.rs +++ b/consensus/src/pipeline/pruning_processor/processor.rs @@ -108,7 +108,9 @@ impl PruningProcessor { } pub fn worker(self: &Arc) { - let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() else { return; }; + let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() else { + return; + }; // On start-up, check if any pruning workflows require recovery. We wait for the first processing message to arrive // in order to make sure the node is already connected and receiving blocks before we start background recovery operations @@ -258,7 +260,9 @@ impl PruningProcessor { let mut counter = 0; let mut batch = WriteBatch::default(); for kept in keep_relations.iter().copied() { - let Some(ghostdag) = self.ghostdag_primary_store.get_data(kept).unwrap_option() else { continue; }; + let Some(ghostdag) = self.ghostdag_primary_store.get_data(kept).unwrap_option() else { + continue; + }; if ghostdag.unordered_mergeset().any(|h| !keep_relations.contains(&h)) { let mut mutable_ghostdag: ExternalGhostdagData = ghostdag.as_ref().into(); mutable_ghostdag.mergeset_blues.retain(|h| keep_relations.contains(h)); diff --git a/consensus/src/processes/sync/mod.rs b/consensus/src/processes/sync/mod.rs index 9650360ec..7b8480111 100644 --- a/consensus/src/processes/sync/mod.rs +++ b/consensus/src/processes/sync/mod.rs @@ -170,7 +170,9 @@ impl< let mut backward_iterator = self.reachability_service.backward_chain_iterator(high, pp, true); loop { // We loop from both directions in parallel in order to use the shorter path - let Some((parent, current)) = forward_iterator.next() else { break; }; + let Some((parent, current)) = forward_iterator.next() else { + break; + }; let status = self.statuses_store.read().get(current).unwrap(); if status.is_header_only() { // Going up, the first parent which has a header-only child is our target @@ -178,7 +180,9 @@ impl< break; } - let Some(backward_current) = backward_iterator.next() else { break; }; + let Some(backward_current) = backward_iterator.next() else { + break; + }; let status = self.statuses_store.read().get(backward_current).unwrap(); if status.has_block_body() { // Since this iterator is going down, current must be the highest with body diff --git a/consensus/src/processes/transaction_validator/transaction_validator_populated.rs b/consensus/src/processes/transaction_validator/transaction_validator_populated.rs index fbaf04687..5e0443164 100644 --- a/consensus/src/processes/transaction_validator/transaction_validator_populated.rs +++ b/consensus/src/processes/transaction_validator/transaction_validator_populated.rs @@ -568,7 +568,7 @@ mod tests { let secp = Secp256k1::new(); let (secret_key, public_key) = secp.generate_keypair(&mut rand::thread_rng()); let (public_key, _) = public_key.x_only_public_key(); - let script_pub_key = once(0x20).chain(public_key.serialize().into_iter()).chain(once(0xac)).collect_vec(); + let script_pub_key = once(0x20).chain(public_key.serialize()).chain(once(0xac)).collect_vec(); let script_pub_key = ScriptVec::from_slice(&script_pub_key); let prev_tx_id = TransactionId::from_str("880eb9819a31821d9d2399e2f35e2433b72637e393d71ecc9b8d0250f49153c3").unwrap(); diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml index 6551e7438..422fbc919 100644 --- a/kaspad/Cargo.toml +++ b/kaspad/Cargo.toml @@ -7,6 +7,9 @@ authors.workspace = true include.workspace = true license.workspace = true +[lib] +crate-type = ["cdylib", "lib"] + [dependencies] kaspa-hashes.workspace = true kaspa-utils.workspace = true @@ -42,7 +45,7 @@ workflow-log.workspace = true dirs = "4.0" num_cpus.workspace = true -dhat = {version = "0.3.2", optional = true} +dhat = { version = "0.3.2", optional = true } [features] heap = ["dhat"] diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index f27c54584..506e08561 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -1,16 +1,32 @@ use clap::ArgAction; #[allow(unused)] use clap::{arg, command, Arg, Command}; -use kaspa_consensus::config::Config; + +use kaspa_consensus_core::{ + config::Config, + networktype::{NetworkId, NetworkType}, +}; use kaspa_core::kaspad_env::version; use kaspa_utils::networking::{ContextualNetAddress, IpAddress}; use kaspa_wrpc_server::address::WrpcNetAddress; -pub struct Defaults { - pub appdir: &'static str, +#[derive(Debug)] +pub struct Args { + // NOTE: it is best if property names match config file fields + pub appdir: Option, + pub logdir: Option, pub no_log_files: bool, + pub rpclisten: Option, + pub rpclisten_borsh: Option, + pub rpclisten_json: Option, pub unsafe_rpc: bool, + pub wrpc_verbose: bool, + pub log_level: String, pub async_threads: usize, + pub connect_peers: Vec, + pub add_peers: Vec, + pub listen: Option, + pub user_agent_comments: Vec, pub utxoindex: bool, pub reset_db: bool, pub outbound_target: usize, @@ -25,15 +41,18 @@ pub struct Defaults { pub archival: bool, pub sanity: bool, pub yes: bool, + pub externalip: Option, pub perf_metrics: bool, pub perf_metrics_interval_sec: u64, } -impl Default for Defaults { +impl Default for Args { fn default() -> Self { - Defaults { - appdir: "datadir", + Self { + appdir: Some("datadir".into()), no_log_files: false, + rpclisten_borsh: Some(WrpcNetAddress::Default), + rpclisten_json: Some(WrpcNetAddress::Default), unsafe_rpc: false, async_threads: num_cpus::get(), utxoindex: false, @@ -49,50 +68,46 @@ impl Default for Defaults { simnet: false, archival: false, sanity: false, + logdir: Some("".into()), + rpclisten: None, + wrpc_verbose: false, + log_level: "INFO".into(), + connect_peers: vec![], + add_peers: vec![], + listen: None, + user_agent_comments: vec![], yes: false, perf_metrics: false, perf_metrics_interval_sec: 1, + externalip: None, } } } -#[derive(Debug)] -pub struct Args { - // NOTE: it is best if property names match config file fields - pub appdir: Option, - pub logdir: Option, - pub no_log_files: bool, - pub rpclisten: Option, - pub rpclisten_borsh: Option, - pub rpclisten_json: Option, - pub unsafe_rpc: bool, - pub wrpc_verbose: bool, - pub log_level: String, - pub async_threads: usize, - pub connect_peers: Vec, - pub add_peers: Vec, - pub listen: Option, - pub user_agent_comments: Vec, - pub utxoindex: bool, - pub reset_db: bool, - pub outbound_target: usize, - pub inbound_limit: usize, - pub rpc_max_clients: usize, - pub enable_unsynced_mining: bool, - pub enable_mainnet_mining: bool, - pub testnet: bool, - pub testnet_suffix: u32, - pub devnet: bool, - pub simnet: bool, - pub archival: bool, - pub sanity: bool, - pub yes: bool, - pub externalip: Option, - pub perf_metrics: bool, - pub perf_metrics_interval_sec: u64, +impl Args { + pub fn apply_to_config(&self, config: &mut Config) { + config.utxoindex = self.utxoindex; + config.unsafe_rpc = self.unsafe_rpc; + config.enable_unsynced_mining = self.enable_unsynced_mining; + config.is_archival = self.archival; + // TODO: change to `config.enable_sanity_checks = self.sanity` when we reach stable versions + config.enable_sanity_checks = true; + config.user_agent_comments = self.user_agent_comments.clone(); + } + + pub fn network(&self) -> NetworkId { + match (self.testnet, self.devnet, self.simnet) { + (false, false, false) => NetworkType::Mainnet.into(), + (true, false, false) => NetworkId::with_suffix(NetworkType::Testnet, self.testnet_suffix), + (false, true, false) => NetworkType::Devnet.into(), + (false, false, true) => NetworkType::Simnet.into(), + _ => panic!("only a single net should be activated"), + } + } } -pub fn cli(defaults: &Defaults) -> Command { +pub fn cli() -> Command { + let defaults: Args = Default::default(); Command::new("kaspad") .about(format!("{} (rusty-kaspa) v{}", env!("CARGO_PKG_DESCRIPTION"), version())) .version(env!("CARGO_PKG_VERSION")) @@ -132,9 +147,10 @@ pub fn cli(defaults: &Defaults) -> Command { .value_name("IP[:PORT]") .num_args(0..=1) .require_equals(true) - .default_missing_value("default") + .default_missing_value("default") // TODO: Find a way to use defaults.rpclisten_borsh .value_parser(clap::value_parser!(WrpcNetAddress)) .help("Interface:port to listen for wRPC Borsh connections (default port: 17110, testnet: 17210)."), + ) .arg( Arg::new("rpclisten-json") @@ -142,7 +158,7 @@ pub fn cli(defaults: &Defaults) -> Command { .value_name("IP[:PORT]") .num_args(0..=1) .require_equals(true) - .default_missing_value("default") + .default_missing_value("default") // TODO: Find a way to use defaults.rpclisten_json .value_parser(clap::value_parser!(WrpcNetAddress)) .help("Interface:port to listen for wRPC JSON connections (default port: 18110, testnet: 18210)."), ) @@ -247,58 +263,45 @@ pub fn cli(defaults: &Defaults) -> Command { ) } -impl Args { - pub fn parse(defaults: &Defaults) -> Args { - let m = cli(defaults).get_matches(); - Args { - appdir: m.get_one::("appdir").cloned(), - logdir: m.get_one::("logdir").cloned(), - no_log_files: m.get_one::("nologfiles").cloned().unwrap_or(defaults.no_log_files), - rpclisten: m.get_one::("rpclisten").cloned(), - rpclisten_borsh: m.get_one::("rpclisten-borsh").cloned(), - rpclisten_json: m.get_one::("rpclisten-json").cloned(), - unsafe_rpc: m.get_one::("unsaferpc").cloned().unwrap_or(defaults.unsafe_rpc), - wrpc_verbose: false, - log_level: m.get_one::("log_level").cloned().unwrap(), - async_threads: m.get_one::("async_threads").cloned().unwrap_or(defaults.async_threads), - connect_peers: m.get_many::("connect-peers").unwrap_or_default().copied().collect(), - add_peers: m.get_many::("add-peers").unwrap_or_default().copied().collect(), - listen: m.get_one::("listen").cloned(), - outbound_target: m.get_one::("outpeers").cloned().unwrap_or(defaults.outbound_target), - inbound_limit: m.get_one::("maxinpeers").cloned().unwrap_or(defaults.inbound_limit), - rpc_max_clients: m.get_one::("rpcmaxclients").cloned().unwrap_or(defaults.rpc_max_clients), - reset_db: m.get_one::("reset-db").cloned().unwrap_or(defaults.reset_db), - enable_unsynced_mining: m.get_one::("enable-unsynced-mining").cloned().unwrap_or(defaults.enable_unsynced_mining), - enable_mainnet_mining: m.get_one::("enable-mainnet-mining").cloned().unwrap_or(defaults.enable_mainnet_mining), - utxoindex: m.get_one::("utxoindex").cloned().unwrap_or(defaults.utxoindex), - testnet: m.get_one::("testnet").cloned().unwrap_or(defaults.testnet), - testnet_suffix: m.get_one::("netsuffix").cloned().unwrap_or(defaults.testnet_suffix), - devnet: m.get_one::("devnet").cloned().unwrap_or(defaults.devnet), - simnet: m.get_one::("simnet").cloned().unwrap_or(defaults.simnet), - archival: m.get_one::("archival").cloned().unwrap_or(defaults.archival), - sanity: m.get_one::("sanity").cloned().unwrap_or(defaults.sanity), - yes: m.get_one::("yes").cloned().unwrap_or(defaults.yes), - user_agent_comments: m.get_many::("user_agent_comments").unwrap_or_default().cloned().collect(), - externalip: m.get_one::("externalip").cloned(), - perf_metrics: m.get_one::("perf-metrics").cloned().unwrap_or(defaults.perf_metrics), - perf_metrics_interval_sec: m - .get_one::("perf-metrics-interval-sec") - .cloned() - .unwrap_or(defaults.perf_metrics_interval_sec), - } - } +pub fn parse_args() -> Args { + let m: clap::ArgMatches = cli().get_matches(); + let defaults: Args = Default::default(); - pub fn apply_to_config(&self, config: &mut Config) { - config.utxoindex = self.utxoindex; - config.unsafe_rpc = self.unsafe_rpc; - config.enable_unsynced_mining = self.enable_unsynced_mining; - config.enable_mainnet_mining = self.enable_mainnet_mining; - config.is_archival = self.archival; - // TODO: change to `config.enable_sanity_checks = self.sanity` when we reach stable versions - config.enable_sanity_checks = true; - config.user_agent_comments = self.user_agent_comments.clone(); - config.p2p_listen_address = self.listen.unwrap_or(ContextualNetAddress::unspecified()); - config.externalip = self.externalip; + Args { + appdir: m.get_one::("appdir").cloned(), + logdir: m.get_one::("logdir").cloned(), + no_log_files: m.get_one::("nologfiles").cloned().unwrap_or(defaults.no_log_files), + rpclisten: m.get_one::("rpclisten").cloned(), + rpclisten_borsh: m.get_one::("rpclisten-borsh").cloned(), + rpclisten_json: m.get_one::("rpclisten-json").cloned(), + unsafe_rpc: m.get_one::("unsaferpc").cloned().unwrap_or(defaults.unsafe_rpc), + wrpc_verbose: false, + log_level: m.get_one::("log_level").cloned().unwrap(), + async_threads: m.get_one::("async_threads").cloned().unwrap_or(defaults.async_threads), + connect_peers: m.get_many::("connect-peers").unwrap_or_default().copied().collect(), + add_peers: m.get_many::("add-peers").unwrap_or_default().copied().collect(), + listen: m.get_one::("listen").cloned(), + outbound_target: m.get_one::("outpeers").cloned().unwrap_or(defaults.outbound_target), + inbound_limit: m.get_one::("maxinpeers").cloned().unwrap_or(defaults.inbound_limit), + rpc_max_clients: m.get_one::("rpcmaxclients").cloned().unwrap_or(defaults.rpc_max_clients), + reset_db: m.get_one::("reset-db").cloned().unwrap_or(defaults.reset_db), + enable_unsynced_mining: m.get_one::("enable-unsynced-mining").cloned().unwrap_or(defaults.enable_unsynced_mining), + enable_mainnet_mining: m.get_one::("enable-mainnet-mining").cloned().unwrap_or(defaults.enable_mainnet_mining), + utxoindex: m.get_one::("utxoindex").cloned().unwrap_or(defaults.utxoindex), + testnet: m.get_one::("testnet").cloned().unwrap_or(defaults.testnet), + testnet_suffix: m.get_one::("netsuffix").cloned().unwrap_or(defaults.testnet_suffix), + devnet: m.get_one::("devnet").cloned().unwrap_or(defaults.devnet), + simnet: m.get_one::("simnet").cloned().unwrap_or(defaults.simnet), + archival: m.get_one::("archival").cloned().unwrap_or(defaults.archival), + sanity: m.get_one::("sanity").cloned().unwrap_or(defaults.sanity), + yes: m.get_one::("yes").cloned().unwrap_or(defaults.yes), + user_agent_comments: m.get_many::("user_agent_comments").unwrap_or_default().cloned().collect(), + externalip: m.get_one::("externalip").cloned(), + perf_metrics: m.get_one::("perf-metrics").cloned().unwrap_or(defaults.perf_metrics), + perf_metrics_interval_sec: m + .get_one::("perf-metrics-interval-sec") + .cloned() + .unwrap_or(defaults.perf_metrics_interval_sec), } } @@ -384,5 +387,4 @@ impl Args { --override-dag-params-file= Overrides DAG params (allowed only on devnet) -s, --service= Service command {install, remove, start, stop} - */ diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs new file mode 100644 index 000000000..6e51af673 --- /dev/null +++ b/kaspad/src/daemon.rs @@ -0,0 +1,343 @@ +use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration}; + +use async_channel::unbounded; +use kaspa_consensus_core::{ + config::{Config, ConfigBuilder}, + errors::config::{ConfigError, ConfigResult}, +}; +use kaspa_consensus_notify::{root::ConsensusNotificationRoot, service::NotifyService}; +use kaspa_core::{core::Core, info, trace}; +use kaspa_core::{kaspad_env::version, task::tick::TickService}; +use kaspa_grpc_server::service::GrpcService; +use kaspa_rpc_service::service::RpcCoreService; +use kaspa_utils::networking::ContextualNetAddress; + +use kaspa_addressmanager::AddressManager; +use kaspa_consensus::pipeline::monitor::ConsensusMonitor; +use kaspa_consensus::{consensus::factory::Factory as ConsensusFactory, pipeline::ProcessingCounters}; +use kaspa_consensusmanager::ConsensusManager; +use kaspa_core::task::runtime::AsyncRuntime; +use kaspa_index_processor::service::IndexService; +use kaspa_mining::manager::{MiningManager, MiningManagerProxy}; +use kaspa_p2p_flows::{flow_context::FlowContext, service::P2pService}; + +use kaspa_perf_monitor::builder::Builder as PerfMonitorBuilder; +use kaspa_utxoindex::{api::UtxoIndexProxy, UtxoIndex}; +use kaspa_wrpc_server::service::{Options as WrpcServerOptions, ServerCounters as WrpcServerCounters, WrpcEncoding, WrpcService}; + +use crate::args::Args; + +const DEFAULT_DATA_DIR: &str = "datadir"; +const CONSENSUS_DB: &str = "consensus"; +const UTXOINDEX_DB: &str = "utxoindex"; +const META_DB: &str = "meta"; +const DEFAULT_LOG_DIR: &str = "logs"; + +fn get_home_dir() -> PathBuf { + #[cfg(target_os = "windows")] + return dirs::data_local_dir().unwrap(); + #[cfg(not(target_os = "windows"))] + return dirs::home_dir().unwrap(); +} + +fn get_app_dir() -> PathBuf { + #[cfg(target_os = "windows")] + return get_home_dir().join("rusty-kaspa"); + #[cfg(not(target_os = "windows"))] + return get_home_dir().join(".rusty-kaspa"); +} + +fn validate_config_and_args(_config: &Arc, args: &Args) -> ConfigResult<()> { + if !args.connect_peers.is_empty() && !args.add_peers.is_empty() { + return Err(ConfigError::MixedConnectAndAddPeers); + } + if args.logdir.is_some() && args.no_log_files { + return Err(ConfigError::MixedLogDirAndNoLogFiles); + } + Ok(()) +} + +fn get_user_approval_or_exit(message: &str, approve: bool) { + if approve { + return; + } + println!("{}", message); + let mut input = String::new(); + match std::io::stdin().read_line(&mut input) { + Ok(_) => { + let lower = input.to_lowercase(); + let answer = lower.as_str().strip_suffix("\r\n").or(lower.as_str().strip_suffix('\n')).unwrap_or(lower.as_str()); + if answer == "y" || answer == "yes" { + // return + } else { + println!("Operation was rejected ({}), exiting..", answer); + exit(1); + } + } + Err(error) => { + println!("Error reading from console: {error}, exiting.."); + exit(1); + } + } +} + +#[derive(Default)] +pub struct Runtime { + log_dir: Option, +} + +fn get_app_dir_from_args(args: &Args) -> PathBuf { + let app_dir = args + .appdir + .clone() + .unwrap_or_else(|| get_app_dir().as_path().to_str().unwrap().to_string()) + .replace('~', get_home_dir().as_path().to_str().unwrap()); + if app_dir.is_empty() { + get_app_dir() + } else { + PathBuf::from(app_dir) + } +} + +impl Runtime { + pub fn from_args(args: &Args) -> Self { + // Configure the panic behavior + kaspa_core::panic::configure_panic(); + + let network = args.network(); + let app_dir = get_app_dir_from_args(args); + + // Logs directory is usually under the application directory, unless otherwise specified + let log_dir = args.logdir.clone().unwrap_or_default().replace('~', get_home_dir().as_path().to_str().unwrap()); + let log_dir = if log_dir.is_empty() { app_dir.join(network.name()).join(DEFAULT_LOG_DIR) } else { PathBuf::from(log_dir) }; + let log_dir = if args.no_log_files { None } else { log_dir.to_str() }; + + // Initialize the logger + kaspa_core::log::init_logger(log_dir, &args.log_level); + + Self { log_dir: log_dir.map(|log_dir| log_dir.to_owned()) } + } +} + +pub fn create_core(args: Args) -> Arc { + let rt = Runtime::from_args(&args); + create_core_with_runtime(&rt, &args) +} + +pub fn create_core_with_runtime(runtime: &Runtime, args: &Args) -> Arc { + let network = args.network(); + + let config = Arc::new( + ConfigBuilder::new(network.into()) + .adjust_perf_params_to_consensus_params() + .apply_args(|config| args.apply_to_config(config)) + .build(), + ); + + // Make sure config and args form a valid set of properties + if let Err(err) = validate_config_and_args(&config, args) { + println!("{}", err); + exit(1); + } + + let app_dir = get_app_dir_from_args(args); + let db_dir = app_dir.join(network.name()).join(DEFAULT_DATA_DIR); + + // Print package name and version + info!("{} v{}", env!("CARGO_PKG_NAME"), version()); + + assert!(!db_dir.to_str().unwrap().is_empty()); + info!("Application directory: {}", app_dir.display()); + info!("Data directory: {}", db_dir.display()); + match runtime.log_dir.as_ref() { + Some(s) => { + info!("Logs directory: {}", s); + } + None => { + info!("Logs to console only"); + } + } + + let consensus_db_dir = db_dir.join(CONSENSUS_DB); + let utxoindex_db_dir = db_dir.join(UTXOINDEX_DB); + let meta_db_dir = db_dir.join(META_DB); + + if args.reset_db && db_dir.exists() { + let msg = "Reset DB was requested -- this means the current databases will be fully deleted, +do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm all interactive questions)"; + get_user_approval_or_exit(msg, args.yes); + info!("Deleting databases"); + fs::remove_dir_all(&db_dir).unwrap(); + } + + fs::create_dir_all(consensus_db_dir.as_path()).unwrap(); + fs::create_dir_all(meta_db_dir.as_path()).unwrap(); + if args.utxoindex { + info!("Utxoindex Data directory {}", utxoindex_db_dir.display()); + fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap(); + } + + // DB used for addresses store and for multi-consensus management + let mut meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir.clone()).build(); + + // TEMP: upgrade from Alpha version or any version before this one + if meta_db.get_pinned(b"multi-consensus-metadata-key").is_ok_and(|r| r.is_some()) { + let msg = "Node database is from an older Kaspad version and needs to be fully deleted, do you confirm the delete? (y/n)"; + get_user_approval_or_exit(msg, args.yes); + + info!("Deleting databases from previous Kaspad version"); + + // Drop so that deletion works + drop(meta_db); + + // Delete + fs::remove_dir_all(db_dir).unwrap(); + + // Recreate the empty folders + fs::create_dir_all(consensus_db_dir.as_path()).unwrap(); + fs::create_dir_all(meta_db_dir.as_path()).unwrap(); + fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap(); + + // Reopen the DB + meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir).build(); + } + + let connect_peers = args.connect_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect::>(); + let add_peers = args.add_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect(); + let p2p_server_addr = args.listen.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_p2p_port()); + // connect_peers means no DNS seeding and no outbound peers + let outbound_target = if connect_peers.is_empty() { args.outbound_target } else { 0 }; + let dns_seeders = if connect_peers.is_empty() { config.dns_seeders } else { &[] }; + + let grpc_server_addr = args.rpclisten.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_rpc_port()); + + let core = Arc::new(Core::new()); + + // --- + + let tick_service = Arc::new(TickService::new()); + let (notification_send, notification_recv) = unbounded(); + let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_send)); + let processing_counters = Arc::new(ProcessingCounters::default()); + let wrpc_borsh_counters = Arc::new(WrpcServerCounters::default()); + let wrpc_json_counters = Arc::new(WrpcServerCounters::default()); + + // Use `num_cpus` background threads for the consensus database as recommended by rocksdb + let consensus_db_parallelism = num_cpus::get(); + let consensus_factory = Arc::new(ConsensusFactory::new( + meta_db.clone(), + &config, + consensus_db_dir, + consensus_db_parallelism, + notification_root.clone(), + processing_counters.clone(), + )); + let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory)); + let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone())); + + let perf_monitor_builder = PerfMonitorBuilder::new() + .with_fetch_interval(Duration::from_secs(args.perf_metrics_interval_sec)) + .with_tick_service(tick_service.clone()); + let perf_monitor = if args.perf_metrics { + let cb = move |counters| { + trace!("[{}] metrics: {:?}", kaspa_perf_monitor::SERVICE_NAME, counters); + #[cfg(feature = "heap")] + trace!("heap stats: {:?}", dhat::HeapStats::get()); + }; + Arc::new(perf_monitor_builder.with_fetch_cb(cb).build()) + } else { + Arc::new(perf_monitor_builder.build()) + }; + + let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv)); + let index_service: Option> = if args.utxoindex { + // Use only a single thread for none-consensus databases + let utxoindex_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(utxoindex_db_dir).build(); + let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap()); + let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), Some(utxoindex))); + Some(index_service) + } else { + None + }; + + let address_manager = AddressManager::new(config.clone(), meta_db); + let mining_manager = + MiningManagerProxy::new(Arc::new(MiningManager::new(config.target_time_per_block, false, config.max_block_mass, None))); + + let flow_context = Arc::new(FlowContext::new( + consensus_manager.clone(), + address_manager, + config.clone(), + mining_manager.clone(), + tick_service.clone(), + notification_root, + )); + let p2p_service = Arc::new(P2pService::new( + flow_context.clone(), + connect_peers, + add_peers, + p2p_server_addr, + outbound_target, + args.inbound_limit, + dns_seeders, + config.default_p2p_port(), + )); + + let rpc_core_service = Arc::new(RpcCoreService::new( + consensus_manager.clone(), + notify_service.notifier(), + index_service.as_ref().map(|x| x.notifier()), + mining_manager, + flow_context, + index_service.as_ref().map(|x| x.utxoindex().unwrap()), + config, + core.clone(), + processing_counters, + wrpc_borsh_counters.clone(), + wrpc_json_counters.clone(), + perf_monitor.clone(), + )); + let grpc_service = Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients)); + + // Create an async runtime and register the top-level async services + let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads)); + async_runtime.register(tick_service); + async_runtime.register(notify_service); + if let Some(index_service) = index_service { + async_runtime.register(index_service) + }; + async_runtime.register(rpc_core_service.clone()); + async_runtime.register(grpc_service); + async_runtime.register(p2p_service); + async_runtime.register(consensus_monitor); + async_runtime.register(perf_monitor); + let wrpc_service_tasks: usize = 2; // num_cpus::get() / 2; + // Register wRPC servers based on command line arguments + [ + (args.rpclisten_borsh.clone(), WrpcEncoding::Borsh, wrpc_borsh_counters), + (args.rpclisten_json.clone(), WrpcEncoding::SerdeJson, wrpc_json_counters), + ] + .into_iter() + .filter_map(|(listen_address, encoding, wrpc_server_counters)| { + listen_address.map(|listen_address| { + Arc::new(WrpcService::new( + wrpc_service_tasks, + Some(rpc_core_service.clone()), + &encoding, + wrpc_server_counters, + WrpcServerOptions { + listen_address: listen_address.to_address(&network.network_type, &encoding).to_string(), // TODO: use a normalized ContextualNetAddress instead of a String + verbose: args.wrpc_verbose, + ..WrpcServerOptions::default() + }, + )) + }) + }) + .for_each(|server| async_runtime.register(server)); + + // Consensus must start first in order to init genesis in stores + core.bind(consensus_manager); + core.bind(async_runtime); + + core +} diff --git a/kaspad/src/lib.rs b/kaspad/src/lib.rs new file mode 100644 index 000000000..aa5106724 --- /dev/null +++ b/kaspad/src/lib.rs @@ -0,0 +1,2 @@ +pub mod args; +pub mod daemon; diff --git a/kaspad/src/main.rs b/kaspad/src/main.rs index 3af10b89f..844990dc2 100644 --- a/kaspad/src/main.rs +++ b/kaspad/src/main.rs @@ -2,350 +2,21 @@ extern crate kaspa_consensus; extern crate kaspa_core; extern crate kaspa_hashes; -use kaspa_addressmanager::AddressManager; -use kaspa_consensus::consensus::factory::Factory as ConsensusFactory; -use kaspa_consensus::pipeline::monitor::ConsensusMonitor; -use kaspa_consensus::pipeline::ProcessingCounters; -use kaspa_consensus_core::config::Config; -use kaspa_consensus_core::errors::config::{ConfigError, ConfigResult}; -use kaspa_consensus_core::networktype::{NetworkId, NetworkType}; -use kaspa_consensus_notify::root::ConsensusNotificationRoot; -use kaspa_consensus_notify::service::NotifyService; -use kaspa_consensusmanager::ConsensusManager; -use kaspa_core::kaspad_env::version; -use kaspa_core::task::tick::TickService; -use kaspa_core::{core::Core, signals::Signals, task::runtime::AsyncRuntime}; -use kaspa_index_processor::service::IndexService; -use kaspa_mining::manager::{MiningManager, MiningManagerProxy}; -use kaspa_p2p_flows::flow_context::FlowContext; -use kaspa_rpc_service::service::RpcCoreService; -use kaspa_utils::networking::ContextualNetAddress; -use kaspa_utxoindex::api::UtxoIndexProxy; - -use std::fs; -use std::path::PathBuf; -use std::process::exit; use std::sync::Arc; -use std::time::Duration; - -use args::{Args, Defaults}; - -use kaspa_consensus::config::ConfigBuilder; -use kaspa_utxoindex::UtxoIndex; - -use async_channel::unbounded; -use kaspa_core::{info, trace}; -use kaspa_grpc_server::service::GrpcService; -use kaspa_p2p_flows::service::P2pService; -use kaspa_perf_monitor::builder::Builder as PerfMonitorBuilder; -use kaspa_wrpc_server::service::{Options as WrpcServerOptions, ServerCounters as WrpcServerCounters, WrpcEncoding, WrpcService}; - -mod args; - -const DEFAULT_DATA_DIR: &str = "datadir"; -const CONSENSUS_DB: &str = "consensus"; -const UTXOINDEX_DB: &str = "utxoindex"; -const META_DB: &str = "meta"; -const DEFAULT_LOG_DIR: &str = "logs"; - -fn get_home_dir() -> PathBuf { - #[cfg(target_os = "windows")] - return dirs::data_local_dir().unwrap(); - #[cfg(not(target_os = "windows"))] - return dirs::home_dir().unwrap(); -} - -fn get_app_dir() -> PathBuf { - #[cfg(target_os = "windows")] - return get_home_dir().join("rusty-kaspa"); - #[cfg(not(target_os = "windows"))] - return get_home_dir().join(".rusty-kaspa"); -} - -fn validate_config_and_args(_config: &Arc, args: &Args) -> ConfigResult<()> { - if !args.connect_peers.is_empty() && !args.add_peers.is_empty() { - return Err(ConfigError::MixedConnectAndAddPeers); - } - if args.logdir.is_some() && args.no_log_files { - return Err(ConfigError::MixedLogDirAndNoLogFiles); - } - Ok(()) -} -fn get_user_approval_or_exit(message: &str, approve: bool) { - if approve { - return; - } - println!("{}", message); - let mut input = String::new(); - match std::io::stdin().read_line(&mut input) { - Ok(_) => { - let lower = input.to_lowercase(); - let answer = lower.as_str().strip_suffix("\r\n").or(lower.as_str().strip_suffix('\n')).unwrap_or(lower.as_str()); - if answer == "y" || answer == "yes" { - // return - } else { - println!("Operation was rejected ({}), exiting..", answer); - exit(1); - } - } - Err(error) => { - println!("Error reading from console: {error}, exiting.."); - exit(1); - } - } -} - -#[cfg(feature = "heap")] -#[global_allocator] -static ALLOC: dhat::Alloc = dhat::Alloc; +use kaspa_core::{info, signals::Signals}; +use kaspad::{args::parse_args, daemon::create_core}; pub fn main() { #[cfg(feature = "heap")] let _profiler = dhat::Profiler::builder().file_name("kaspad-heap.json").build(); - let args = Args::parse(&Defaults::default()); - - // Configure the panic behavior - kaspa_core::panic::configure_panic(); - - let network = match (args.testnet, args.devnet, args.simnet) { - (false, false, false) => NetworkType::Mainnet.into(), - (true, false, false) => NetworkId::with_suffix(NetworkType::Testnet, args.testnet_suffix), - (false, true, false) => NetworkType::Devnet.into(), - (false, false, true) => NetworkType::Simnet.into(), - _ => panic!("only a single net should be activated"), - }; - - let config = Arc::new( - ConfigBuilder::new(network.into()) - .adjust_perf_params_to_consensus_params() - .apply_args(|config| args.apply_to_config(config)) - .build(), - ); - - // Make sure config and args form a valid set of properties - if let Err(err) = validate_config_and_args(&config, &args) { - println!("{}", err); - exit(1); - } - - // TODO: Refactor all this quick-and-dirty code - let app_dir = args - .appdir - .unwrap_or_else(|| get_app_dir().as_path().to_str().unwrap().to_string()) - .replace('~', get_home_dir().as_path().to_str().unwrap()); - let app_dir = if app_dir.is_empty() { get_app_dir() } else { PathBuf::from(app_dir) }; - let db_dir = app_dir.join(config.network_name()).join(DEFAULT_DATA_DIR); - - // Logs directory is usually under the application directory, unless otherwise specified - let log_dir = args.logdir.unwrap_or_default().replace('~', get_home_dir().as_path().to_str().unwrap()); - let log_dir = if log_dir.is_empty() { app_dir.join(config.network_name()).join(DEFAULT_LOG_DIR) } else { PathBuf::from(log_dir) }; - let log_dir = if args.no_log_files { None } else { log_dir.to_str() }; - - // Initialize the logger - kaspa_core::log::init_logger(log_dir, &args.log_level); - - // Print package name and version - info!("{} v{}", env!("CARGO_PKG_NAME"), version()); - - assert!(!db_dir.to_str().unwrap().is_empty()); - info!("Application directory: {}", app_dir.display()); - info!("Data directory: {}", db_dir.display()); - match log_dir { - Some(s) => { - info!("Logs directory: {}", s); - } - None => { - info!("Logs to console only"); - } - } - - let consensus_db_dir = db_dir.join(CONSENSUS_DB); - let utxoindex_db_dir = db_dir.join(UTXOINDEX_DB); - let meta_db_dir = db_dir.join(META_DB); - if args.reset_db && db_dir.exists() { - let msg = "Reset DB was requested -- this means the current databases will be fully deleted, -do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm all interactive questions)"; - get_user_approval_or_exit(msg, args.yes); - info!("Deleting databases"); - fs::remove_dir_all(db_dir.clone()).unwrap(); - } - - fs::create_dir_all(consensus_db_dir.as_path()).unwrap(); - fs::create_dir_all(meta_db_dir.as_path()).unwrap(); - if args.utxoindex { - info!("Utxoindex Data directory {}", utxoindex_db_dir.display()); - fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap(); - } - - // DB used for addresses store and for multi-consensus management - let mut meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir.clone()).build(); - - // TEMP: upgrade from Alpha version or any version before this one - if meta_db.get_pinned(b"multi-consensus-metadata-key").is_ok_and(|r| r.is_some()) { - let msg = "Node database is from an older Kaspad version and needs to be fully deleted, do you confirm the delete? (y/n)"; - get_user_approval_or_exit(msg, args.yes); - - info!("Deleting databases from previous Kaspad version"); - - // Drop so that deletion works - drop(meta_db); - - // Delete - fs::remove_dir_all(db_dir).unwrap(); - - // Recreate the empty folders - fs::create_dir_all(consensus_db_dir.as_path()).unwrap(); - fs::create_dir_all(meta_db_dir.as_path()).unwrap(); - fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap(); - - // Reopen the DB - meta_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(meta_db_dir).build(); - } - - let connect_peers = args.connect_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect::>(); - let add_peers = args.add_peers.iter().map(|x| x.normalize(config.default_p2p_port())).collect(); - let p2p_server_addr = args.listen.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_p2p_port()); - // connect_peers means no DNS seeding and no outbound peers - let outbound_target = if connect_peers.is_empty() { args.outbound_target } else { 0 }; - let dns_seeders = if connect_peers.is_empty() { config.dns_seeders } else { &[] }; - - let grpc_server_addr = args.rpclisten.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_rpc_port()); - - let core = Arc::new(Core::new()); - - // --- - - let tick_service = Arc::new(TickService::new()); - let (notification_send, notification_recv) = unbounded(); - let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_send)); - let processing_counters = Arc::new(ProcessingCounters::default()); - let wrpc_borsh_counters = Arc::new(WrpcServerCounters::default()); - let wrpc_json_counters = Arc::new(WrpcServerCounters::default()); - - // Use `num_cpus` background threads for the consensus database as recommended by rocksdb - let consensus_db_parallelism = num_cpus::get(); - let consensus_factory = Arc::new(ConsensusFactory::new( - meta_db.clone(), - &config, - consensus_db_dir, - consensus_db_parallelism, - notification_root.clone(), - processing_counters.clone(), - )); - - let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory)); - let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone())); - - let perf_monitor_builder = PerfMonitorBuilder::new() - .with_fetch_interval(Duration::from_secs(args.perf_metrics_interval_sec)) - .with_tick_service(tick_service.clone()); - let perf_monitor = if args.perf_metrics { - let cb = move |counters| { - trace!("[{}] metrics: {:?}", kaspa_perf_monitor::SERVICE_NAME, counters); - #[cfg(feature = "heap")] - trace!("heap stats: {:?}", dhat::HeapStats::get()); - }; - Arc::new(perf_monitor_builder.with_fetch_cb(cb).build()) - } else { - Arc::new(perf_monitor_builder.build()) - }; - - let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv)); - let index_service: Option> = if args.utxoindex { - // Use only a single thread for none-consensus databases - let utxoindex_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(utxoindex_db_dir).build(); - let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap()); - let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), Some(utxoindex))); - Some(index_service) - } else { - None - }; - - let address_manager = AddressManager::new(config.clone(), meta_db); - let mining_manager = - MiningManagerProxy::new(Arc::new(MiningManager::new(config.target_time_per_block, false, config.max_block_mass, None))); - - let flow_context = Arc::new(FlowContext::new( - consensus_manager.clone(), - address_manager, - config.clone(), - mining_manager.clone(), - tick_service.clone(), - notification_root, - )); - let p2p_service = Arc::new(P2pService::new( - flow_context.clone(), - connect_peers, - add_peers, - p2p_server_addr, - outbound_target, - args.inbound_limit, - dns_seeders, - config.default_p2p_port(), - )); - - let rpc_core_service = Arc::new(RpcCoreService::new( - consensus_manager.clone(), - notify_service.notifier(), - index_service.as_ref().map(|x| x.notifier()), - mining_manager, - flow_context, - index_service.as_ref().map(|x| x.utxoindex().unwrap()), - config, - core.clone(), - processing_counters, - wrpc_borsh_counters.clone(), - wrpc_json_counters.clone(), - perf_monitor.clone(), - )); - let grpc_service = Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients)); - - // Create an async runtime and register the top-level async services - let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads)); - async_runtime.register(tick_service); - async_runtime.register(notify_service); - if let Some(index_service) = index_service { - async_runtime.register(index_service) - }; - async_runtime.register(rpc_core_service.clone()); - async_runtime.register(grpc_service); - async_runtime.register(p2p_service); - async_runtime.register(consensus_monitor); - async_runtime.register(perf_monitor); - let wrpc_service_tasks: usize = 2; // num_cpus::get() / 2; - // Register wRPC servers based on command line arguments - [ - (args.rpclisten_borsh, WrpcEncoding::Borsh, wrpc_borsh_counters), - (args.rpclisten_json, WrpcEncoding::SerdeJson, wrpc_json_counters), - ] - .into_iter() - .filter_map(|(listen_address, encoding, wrpc_server_counters)| { - listen_address.map(|listen_address| { - Arc::new(WrpcService::new( - wrpc_service_tasks, - Some(rpc_core_service.clone()), - &encoding, - wrpc_server_counters, - WrpcServerOptions { - listen_address: listen_address.to_address(&network.network_type, &encoding).to_string(), // TODO: use a normalized ContextualNetAddress instead of a String - verbose: args.wrpc_verbose, - ..WrpcServerOptions::default() - }, - )) - }) - }) - .for_each(|server| async_runtime.register(server)); + let args = parse_args(); + let core = create_core(args); // Bind the keyboard signal to the core Arc::new(Signals::new(&core)).init(); - // Consensus must start first in order to init genesis in stores - core.bind(consensus_manager); - core.bind(async_runtime); - core.run(); - - info!("Kaspad has stopped"); + info!("Kaspad has stopped..."); } diff --git a/kos/src/metrics/toolbar.rs b/kos/src/metrics/toolbar.rs index c1fcf1566..fda2f681f 100644 --- a/kos/src/metrics/toolbar.rs +++ b/kos/src/metrics/toolbar.rs @@ -50,7 +50,7 @@ pub struct ToolbarInner { pub callbacks: CallbackMap, pub container: Arc>>>, pub graphs: Arc>>>, - pub controls: Arc>>>, + pub controls: Arc>>>, pub layout: Arc>, } @@ -109,11 +109,11 @@ impl Toolbar { self.inner.layout.lock().unwrap() } - pub fn controls(&self) -> MutexGuard>> { + pub fn controls(&self) -> MutexGuard>> { self.inner.controls.lock().unwrap() } - pub fn push(&self, control: impl Control + 'static) { + pub fn push(&self, control: impl Control + Send + Sync + 'static) { let control = Arc::new(control); self.controls().push(control); } @@ -246,7 +246,7 @@ impl Toolbar { } } -type ButtonCallback = dyn Fn(&Button) + 'static; +type ButtonCallback = dyn Fn(&Button) + Send + Sync + 'static; #[derive(Clone)] pub struct Button { @@ -276,7 +276,7 @@ impl Button { } } -type RadioButtonCallback = dyn Fn(&RadioButton) + 'static; +type RadioButtonCallback = dyn Fn(&RadioButton) + Send + Sync + 'static; #[derive(Clone)] pub struct RadioButton { @@ -284,6 +284,9 @@ pub struct RadioButton { pub element: Element, } +unsafe impl Send for RadioButton {} +unsafe impl Sync for RadioButton {} + impl RadioButton { pub fn try_new( toolbar: &Toolbar, diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs index 143c2920f..236a34bf6 100644 --- a/protocol/flows/src/v5/blockrelay/flow.rs +++ b/protocol/flows/src/v5/blockrelay/flow.rs @@ -184,7 +184,9 @@ impl HandleRelayInvsFlow { async fn request_block(&mut self, requested_hash: Hash) -> Result)>, ProtocolError> { // Note: the request scope is returned and should be captured until block processing is completed - let Some(request_scope) = self.ctx.try_adding_block_request(requested_hash) else { return Ok(None); }; + let Some(request_scope) = self.ctx.try_adding_block_request(requested_hash) else { + return Ok(None); + }; self.router .enqueue(make_message!(Payload::RequestRelayBlocks, RequestRelayBlocksMessage { hashes: vec![requested_hash.into()] })) .await?; diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index cd8cbb0a3..ffddbe225 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -214,7 +214,9 @@ impl IbdFlow { debug!("received trusted data with {} daa entries and {} ghostdag entries", pkg.daa_window.len(), pkg.ghostdag_window.len()); let mut entry_stream = TrustedEntryStream::new(&self.router, &mut self.incoming_route); - let Some(pruning_point_entry) = entry_stream.next().await? else { return Err(ProtocolError::Other("got `done` message before receiving the pruning point")); }; + let Some(pruning_point_entry) = entry_stream.next().await? else { + return Err(ProtocolError::Other("got `done` message before receiving the pruning point")); + }; if pruning_point_entry.block.hash() != proof_pruning_point { return Err(ProtocolError::Other("the proof pruning point is not equal to the expected trusted entry")); diff --git a/protocol/flows/src/v5/ping.rs b/protocol/flows/src/v5/ping.rs index 6a14bb22a..fe2e1e2e8 100644 --- a/protocol/flows/src/v5/ping.rs +++ b/protocol/flows/src/v5/ping.rs @@ -86,7 +86,9 @@ impl SendPingsFlow { let nonce = rand::thread_rng().gen::(); let ping = make_message!(Payload::Ping, PingMessage { nonce }); let ping_time = Instant::now(); - let Some(router) = self.router.upgrade() else { return Err(ProtocolError::ConnectionClosed); }; + let Some(router) = self.router.upgrade() else { + return Err(ProtocolError::ConnectionClosed); + }; router.enqueue(ping).await?; let pong = dequeue_with_timeout!(self.incoming_route, Payload::Pong)?; if pong.nonce != nonce { diff --git a/protocol/flows/src/v5/txrelay/flow.rs b/protocol/flows/src/v5/txrelay/flow.rs index 1c75e5710..ce9ba3c21 100644 --- a/protocol/flows/src/v5/txrelay/flow.rs +++ b/protocol/flows/src/v5/txrelay/flow.rs @@ -175,7 +175,9 @@ impl RelayTransactionsFlow { request.req, transaction_id ))); } - let Response::Transaction(transaction) = response else { continue; }; + let Response::Transaction(transaction) = response else { + continue; + }; match self .ctx .mining_manager() diff --git a/protocol/p2p/src/core/connection_handler.rs b/protocol/p2p/src/core/connection_handler.rs index 1eaf9ceda..a3e14cde7 100644 --- a/protocol/p2p/src/core/connection_handler.rs +++ b/protocol/p2p/src/core/connection_handler.rs @@ -80,7 +80,9 @@ impl ConnectionHandler { /// Connect to a new peer pub(crate) async fn connect(&self, peer_address: String) -> Result, ConnectionError> { - let Some(socket_address) = peer_address.to_socket_addrs()?.next() else { return Err(ConnectionError::NoAddress); }; + let Some(socket_address) = peer_address.to_socket_addrs()?.next() else { + return Err(ConnectionError::NoAddress); + }; let peer_address = format!("http://{}", peer_address); // Add scheme prefix as required by Tonic let channel = tonic::transport::Endpoint::new(peer_address)? diff --git a/rpc/grpc/server/src/tests/mod.rs b/rpc/grpc/server/src/tests/mod.rs index faca71e91..de473666d 100644 --- a/rpc/grpc/server/src/tests/mod.rs +++ b/rpc/grpc/server/src/tests/mod.rs @@ -1,3 +1,3 @@ -pub(self) mod rpc_core_mock; +mod rpc_core_mock; mod client_server; diff --git a/rpc/wrpc/server/src/address.rs b/rpc/wrpc/server/src/address.rs index 00ac9566b..2bfd981d1 100644 --- a/rpc/wrpc/server/src/address.rs +++ b/rpc/wrpc/server/src/address.rs @@ -45,3 +45,19 @@ impl FromStr for WrpcNetAddress { } } } + +impl TryFrom<&str> for WrpcNetAddress { + type Error = AddrParseError; + + fn try_from(s: &str) -> Result { + WrpcNetAddress::from_str(s) + } +} + +impl TryFrom for WrpcNetAddress { + type Error = AddrParseError; + + fn try_from(s: String) -> Result { + WrpcNetAddress::from_str(&s) + } +} diff --git a/simpa/src/simulator/miner.rs b/simpa/src/simulator/miner.rs index 90a851513..9bc3aae64 100644 --- a/simpa/src/simulator/miner.rs +++ b/simpa/src/simulator/miner.rs @@ -64,7 +64,7 @@ impl Miner { target_blocks: Option, ) -> Self { let (schnorr_public_key, _) = pk.x_only_public_key(); - let script_pub_key_script = once(0x20).chain(schnorr_public_key.serialize().into_iter()).chain(once(0xac)).collect_vec(); // TODO: Use script builder when available to create p2pk properly + let script_pub_key_script = once(0x20).chain(schnorr_public_key.serialize()).chain(once(0xac)).collect_vec(); // TODO: Use script builder when available to create p2pk properly let script_pub_key_script_vec = ScriptVec::from_slice(&script_pub_key_script); Self { id, @@ -108,7 +108,9 @@ impl Miner { .possible_unspent_outpoints .iter() .filter_map(|&outpoint| { - let Some(entry) = self.get_spendable_entry(virtual_utxo_view, outpoint, virtual_state.daa_score) else { return None; }; + let Some(entry) = self.get_spendable_entry(virtual_utxo_view, outpoint, virtual_state.daa_score) else { + return None; + }; let unsigned_tx = self.create_unsigned_tx(outpoint, entry.amount, multiple_outputs); Some(MutableTransaction::with_entries(unsigned_tx, vec![entry])) }) @@ -130,7 +132,9 @@ impl Miner { outpoint: TransactionOutpoint, virtual_daa_score: u64, ) -> Option { - let Some(entry) = utxo_view.get(&outpoint) else { return None; }; + let Some(entry) = utxo_view.get(&outpoint) else { + return None; + }; if entry.amount < 2 || (entry.is_coinbase && (virtual_daa_score as i64 - entry.block_daa_score as i64) <= self.params.coinbase_maturity as i64) { diff --git a/testing/integration/Cargo.toml b/testing/integration/Cargo.toml index fedbf0ddb..753c363fb 100644 --- a/testing/integration/Cargo.toml +++ b/testing/integration/Cargo.toml @@ -18,6 +18,9 @@ kaspa-consensus-core.workspace = true kaspa-consensus-notify.workspace = true kaspa-consensus.workspace = true kaspa-consensusmanager.workspace = true +kaspad.workspace = true +kaspa-grpc-client.workspace = true +kaspa-rpc-core.workspace = true faster-hex.workspace = true thiserror.workspace = true @@ -41,6 +44,7 @@ kaspa-database.workspace = true kaspa-utxoindex.workspace = true kaspa-index-processor.workspace = true kaspa-bip32.workspace = true +kaspa-wrpc-server.workspace = true crossbeam-channel = "0.5" async-channel = "1.8.0" @@ -54,6 +58,7 @@ criterion.workspace = true rand = { workspace = true, features = ["small_rng"] } tokio = { workspace = true, features = ["rt", "macros"] } kaspa-txscript-errors.workspace = true +port-selector = "0.1.6" [features] html_reports = [] diff --git a/testing/integration/src/integration_tests.rs b/testing/integration/src/integration_tests.rs index 4c5a8815b..1b4e35c9c 100644 --- a/testing/integration/src/integration_tests.rs +++ b/testing/integration/src/integration_tests.rs @@ -38,6 +38,7 @@ use kaspa_consensusmanager::ConsensusManager; use kaspa_core::task::tick::TickService; use kaspa_core::time::unix_now; use kaspa_database::utils::get_kaspa_tempdir; +use kaspa_grpc_client::GrpcClient; use kaspa_hashes::Hash; use flate2::read::GzDecoder; @@ -52,13 +53,17 @@ use kaspa_database::prelude::ConnBuilder; use kaspa_index_processor::service::IndexService; use kaspa_math::Uint256; use kaspa_muhash::MuHash; +use kaspa_rpc_core::notify::mode::NotificationMode; use kaspa_utxoindex::api::{UtxoIndexApi, UtxoIndexProxy}; use kaspa_utxoindex::UtxoIndex; +use kaspad::args::Args; +use kaspad::daemon::create_core_with_runtime; use serde::{Deserialize, Serialize}; use std::cmp::{max, Ordering}; use std::collections::HashSet; use std::path::Path; use std::sync::Arc; +use std::time::Duration; use std::{ collections::HashMap, fs::File, @@ -66,6 +71,7 @@ use std::{ io::{BufRead, BufReader}, str::{from_utf8, FromStr}, }; +use tempfile::{tempdir, TempDir}; use crate::common; @@ -132,7 +138,7 @@ fn reachability_stretch_test(use_attack_json: bool) { let validation_freq = usize::max(1, num_chains / 100); use rand::prelude::*; - let mut rng = StdRng::seed_from_u64(22322); + let mut rng: StdRng = StdRng::seed_from_u64(22322); for i in 0..num_chains { let rand_idx = rng.gen_range(0..blocks.len()); @@ -1703,3 +1709,79 @@ async fn staging_consensus_test() { core.shutdown(); core.join(joins); } + +#[tokio::test] +async fn sanity_integration_test() { + let core1 = DaemonWithRpc::new_random(); + let (workers1, rpc_client1) = core1.start().await; + + let core2 = DaemonWithRpc::new_random(); + let (workers2, rpc_client2) = core2.start().await; + + tokio::time::sleep(Duration::from_secs(1)).await; + rpc_client1.disconnect().await.unwrap(); + drop(rpc_client1); + core1.core.shutdown(); + core1.core.join(workers1); + + rpc_client2.disconnect().await.unwrap(); + drop(rpc_client2); + core2.core.shutdown(); + core2.core.join(workers2); +} + +struct DaemonWithRpc { + core: Arc, + rpc_port: u16, + _appdir_tempdir: TempDir, +} + +impl DaemonWithRpc { + fn new_random() -> DaemonWithRpc { + let mut args = Args { devnet: true, ..Default::default() }; + + // This should ask the OS to allocate free port for socket 1 to 4. + let socket1 = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let rpc_port = socket1.local_addr().unwrap().port(); + + let socket2 = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let p2p_port = socket2.local_addr().unwrap().port(); + + let socket3 = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let rpc_json_port = socket3.local_addr().unwrap().port(); + + let socket4 = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let rpc_borsh_port = socket4.local_addr().unwrap().port(); + + drop(socket1); + drop(socket2); + drop(socket3); + drop(socket4); + + args.rpclisten = Some(format!("0.0.0.0:{rpc_port}").try_into().unwrap()); + args.listen = Some(format!("0.0.0.0:{p2p_port}").try_into().unwrap()); + args.rpclisten_json = Some(format!("0.0.0.0:{rpc_json_port}").parse().unwrap()); + args.rpclisten_borsh = Some(format!("0.0.0.0:{rpc_borsh_port}").parse().unwrap()); + let appdir_tempdir = tempdir().unwrap(); + args.appdir = Some(appdir_tempdir.path().to_str().unwrap().to_owned()); + + let core = create_core_with_runtime(&Default::default(), &args); + DaemonWithRpc { core, rpc_port, _appdir_tempdir: appdir_tempdir } + } + + async fn start(&self) -> (Vec>, GrpcClient) { + let workers = self.core.start(); + tokio::time::sleep(Duration::from_secs(1)).await; + let rpc_client = GrpcClient::connect( + NotificationMode::Direct, + format!("grpc://localhost:{}", self.rpc_port), + true, + None, + false, + Some(500_000), + ) + .await + .unwrap(); + (workers, rpc_client) + } +} diff --git a/utils/src/networking.rs b/utils/src/networking.rs index 02539a2d3..cb1d26208 100644 --- a/utils/src/networking.rs +++ b/utils/src/networking.rs @@ -300,6 +300,22 @@ impl FromStr for ContextualNetAddress { } } +impl TryFrom<&str> for ContextualNetAddress { + type Error = AddrParseError; + + fn try_from(s: &str) -> Result { + ContextualNetAddress::from_str(s) + } +} + +impl TryFrom for ContextualNetAddress { + type Error = AddrParseError; + + fn try_from(s: String) -> Result { + ContextualNetAddress::from_str(&s) + } +} + impl Display for ContextualNetAddress { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.port { diff --git a/utils/src/serde_bytes_fixed/de.rs b/utils/src/serde_bytes_fixed/de.rs index af74e58f1..6cc0e251f 100644 --- a/utils/src/serde_bytes_fixed/de.rs +++ b/utils/src/serde_bytes_fixed/de.rs @@ -15,7 +15,7 @@ pub trait Deserialize<'de, const N: usize>: Sized + crate::hex::FromHex + From<[ macro_rules! deser_fixed_bytes { ($size: expr) => { impl<'de, T: $crate::hex::FromHex + From<[u8; $size]>> $crate::serde_bytes_fixed::Deserialize<'de, $size> for T { - /// Deserialization function for types `T` + /// Deserialization function for types `T` fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -76,8 +76,8 @@ macro_rules! deser_fixed_bytes { A: serde::de::SeqAccess<'de>, { let Some(value): Option<[u8; $size]> = seq.next_element()? else { - return Err(serde::de::Error::invalid_length(0usize, &"tuple struct fixed array with 1 element")); - } ; + return Err(serde::de::Error::invalid_length(0usize, &"tuple struct fixed array with 1 element")); + }; Ok(Self::Value::from(value)) } } diff --git a/utils/src/serde_bytes_fixed_ref/de.rs b/utils/src/serde_bytes_fixed_ref/de.rs index d5b5067e3..bdc0f904b 100644 --- a/utils/src/serde_bytes_fixed_ref/de.rs +++ b/utils/src/serde_bytes_fixed_ref/de.rs @@ -67,8 +67,8 @@ macro_rules! serde_impl_deser_fixed_bytes_ref { A: serde::de::SeqAccess<'de>, { let Some(value): Option<[u8; $size]> = seq.next_element()? else { - return Err(serde::de::Error::invalid_length(0usize, &"tuple struct fixed array with 1 element")); - } ; + return Err(serde::de::Error::invalid_length(0usize, &"tuple struct fixed array with 1 element")); + }; Ok(Self::Value::from(value)) } } diff --git a/wallet/bip32/src/xprivate_key.rs b/wallet/bip32/src/xprivate_key.rs index f82399bee..97a5bce4c 100644 --- a/wallet/bip32/src/xprivate_key.rs +++ b/wallet/bip32/src/xprivate_key.rs @@ -99,7 +99,7 @@ where } pub fn derive_path(self, path: DerivationPath) -> Result { - path.iter().fold(Ok(self), |maybe_key, child_num| maybe_key.and_then(|key| key.derive_child(child_num))) + path.iter().try_fold(self, |key, child_num| key.derive_child(child_num)) } /// Borrow the derived private key value. diff --git a/wallet/bip32/src/xpublic_key.rs b/wallet/bip32/src/xpublic_key.rs index adb5c4fd5..21ca98eff 100644 --- a/wallet/bip32/src/xpublic_key.rs +++ b/wallet/bip32/src/xpublic_key.rs @@ -76,7 +76,7 @@ where } pub fn derive_path(self, path: DerivationPath) -> Result { - path.iter().fold(Ok(self), |maybe_key, child_num| maybe_key.and_then(|key| key.derive_child(child_num))) + path.iter().try_fold(self, |key, child_num| key.derive_child(child_num)) } /// Serialize the raw public key as a byte array (e.g. SEC1-encoded).