Skip to content

Commit

Permalink
Benchmark for measuring various performance aspects under high mempoo…
Browse files Browse the repository at this point in the history
…l pressure (kaspanet#275)

* simnet params: skip pow by default + use testnet11 bps

* initial bbt bmk code with simulated mining

* split mining to 2 async tasks (receiver and miner)

* extract is_connected_to_peers and reuse

* tweak template nonce

* ignore test

* fix clippy warning on devnet-prealloc feature code

* use devnet-prealloc feature for mempool bmk

* refactor generate prealloc utxos to a method

* initial tx dag generator

* rename files/mods

* use kaspa tempdir for test daemon

* some logs and exit logic

* stopwatch; use 3 clients to support concurrency; comments

* todos

* more timing

* rename to `has_sufficient_peer_connectivity`

* add `daemon_integration_tests`

* rename clients

* add daemon mining test from kaspanet#249

* rpc client: assert notify arg is none on multi-listeners mode
  • Loading branch information
michaelsutton authored Sep 6, 2023
1 parent ef234c8 commit 7cd9e15
Show file tree
Hide file tree
Showing 16 changed files with 505 additions and 146 deletions.
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 21 additions & 21 deletions consensus/core/src/config/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,24 +457,34 @@ pub const SIMNET_PARAMS: Params = Params {
dns_seeders: &[],
net: NetworkId::new(NetworkType::Simnet),
genesis: SIMNET_GENESIS,
ghostdag_k: LEGACY_DEFAULT_GHOSTDAG_K,
legacy_timestamp_deviation_tolerance: LEGACY_TIMESTAMP_DEVIATION_TOLERANCE,
new_timestamp_deviation_tolerance: NEW_TIMESTAMP_DEVIATION_TOLERANCE,
past_median_time_sample_rate: Bps::<1>::past_median_time_sample_rate(),
past_median_time_sampled_window_size: MEDIAN_TIME_SAMPLED_WINDOW_SIZE,
target_time_per_block: 1000,
sampling_activation_daa_score: u64::MAX,
sampling_activation_daa_score: 0, // Sampling is activated from network inception
max_difficulty_target: MAX_DIFFICULTY_TARGET,
max_difficulty_target_f64: MAX_DIFFICULTY_TARGET_AS_F64,
difficulty_sample_rate: Bps::<1>::difficulty_adjustment_sample_rate(),
sampled_difficulty_window_size: DIFFICULTY_SAMPLED_WINDOW_SIZE as usize,
legacy_difficulty_window_size: LEGACY_DIFFICULTY_WINDOW_SIZE,
min_difficulty_window_len: MIN_DIFFICULTY_WINDOW_LEN,
max_block_parents: 10,
mergeset_size_limit: (LEGACY_DEFAULT_GHOSTDAG_K as u64) * 10,
merge_depth: 3600,
finality_depth: 86400,
pruning_depth: 185798,

//
// ~~~~~~~~~~~~~~~~~~ BPS dependent constants ~~~~~~~~~~~~~~~~~~
//
// Note we use a 10 BPS configuration for simnet
ghostdag_k: Testnet11Bps::ghostdag_k(),
target_time_per_block: Testnet11Bps::target_time_per_block(),
past_median_time_sample_rate: Testnet11Bps::past_median_time_sample_rate(),
difficulty_sample_rate: Testnet11Bps::difficulty_adjustment_sample_rate(),
max_block_parents: Testnet11Bps::max_block_parents(),
mergeset_size_limit: Testnet11Bps::mergeset_size_limit(),
merge_depth: Testnet11Bps::merge_depth_bound(),
finality_depth: Testnet11Bps::finality_depth(),
pruning_depth: Testnet11Bps::pruning_depth(),
pruning_proof_m: Testnet11Bps::pruning_proof_m(),
deflationary_phase_daa_score: Testnet11Bps::deflationary_phase_daa_score(),
pre_deflationary_phase_base_subsidy: Testnet11Bps::pre_deflationary_phase_base_subsidy(),
coinbase_maturity: Testnet11Bps::coinbase_maturity(),

coinbase_payload_script_public_key_max_len: 150,
max_coinbase_payload_len: 204,

Expand All @@ -492,18 +502,8 @@ pub const SIMNET_PARAMS: Params = Params {
mass_per_sig_op: 1000,
max_block_mass: 500_000,

// deflationary_phase_daa_score is the DAA score after which the pre-deflationary period
// switches to the deflationary period. This number is calculated as follows:
// We define a year as 365.25 days
// Half a year in seconds = 365.25 / 2 * 24 * 60 * 60 = 15778800
// The network was down for three days shortly after launch
// Three days in seconds = 3 * 24 * 60 * 60 = 259200
deflationary_phase_daa_score: 15778800 - 259200,
pre_deflationary_phase_base_subsidy: 50000000000,
coinbase_maturity: 100,
skip_proof_of_work: false,
skip_proof_of_work: true, // For simnet only, PoW can be simulated by default
max_block_level: 250,
pruning_proof_m: 1000,
};

pub const DEVNET_PARAMS: Params = Params {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/consensus/utxo_set_override.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "devnet-prealloc")]
pub mod utxo_set_override {
mod utxo_set_override_inner {
use std::sync::Arc;

use itertools::Itertools;
Expand Down Expand Up @@ -31,4 +31,4 @@ pub mod utxo_set_override {
}

#[cfg(feature = "devnet-prealloc")]
pub use utxo_set_override::*;
pub use utxo_set_override_inner::*;
29 changes: 28 additions & 1 deletion core/src/time.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Returns the number of milliseconds since UNIX EPOCH
#[inline]
pub fn unix_now() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}

/// Stopwatch which reports on drop if the timed operation passed the threshold `TR` in milliseconds
pub struct Stopwatch<const TR: u64 = 1000> {
name: &'static str,
start: Instant,
}

impl Stopwatch {
pub fn new(name: &'static str) -> Self {
Self { name, start: Instant::now() }
}
}

impl<const TR: u64> Stopwatch<TR> {
pub fn with_threshold(name: &'static str) -> Self {
Self { name, start: Instant::now() }
}
}

impl<const TR: u64> Drop for Stopwatch<TR> {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
if elapsed > Duration::from_millis(TR) {
kaspa_core::warn!("\n[{}] Abnormal time: {:#?}", self.name, elapsed);
}
}
}
34 changes: 16 additions & 18 deletions kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use kaspa_core::kaspad_env::version;
use kaspa_utils::networking::{ContextualNetAddress, IpAddress};
use kaspa_wrpc_server::address::WrpcNetAddress;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Args {
// NOTE: it is best if property names match config file fields
pub appdir: Option<String>,
Expand Down Expand Up @@ -121,26 +121,24 @@ impl Args {

#[cfg(feature = "devnet-prealloc")]
if let Some(num_prealloc_utxos) = self.num_prealloc_utxos {
let addr = Address::try_from(&self.prealloc_address.as_ref().unwrap()[..]).unwrap();
let spk = pay_to_address_script(&addr);
config.initial_utxo_set = Arc::new(
(1..=num_prealloc_utxos)
.map(|i| {
(
TransactionOutpoint { transaction_id: i.into(), index: 0 },
UtxoEntry {
amount: self.prealloc_amount,
script_public_key: spk.clone(),
block_daa_score: 0,
is_coinbase: false,
},
)
})
.collect(),
);
config.initial_utxo_set = Arc::new(self.generate_prealloc_utxos(num_prealloc_utxos));
}
}

#[cfg(feature = "devnet-prealloc")]
pub fn generate_prealloc_utxos(&self, num_prealloc_utxos: u64) -> kaspa_consensus_core::utxo::utxo_collection::UtxoCollection {
let addr = Address::try_from(&self.prealloc_address.as_ref().unwrap()[..]).unwrap();
let spk = pay_to_address_script(&addr);
(1..=num_prealloc_utxos)
.map(|i| {
(
TransactionOutpoint { transaction_id: i.into(), index: 0 },
UtxoEntry { amount: self.prealloc_amount, script_public_key: spk.clone(), block_daa_score: 0, is_coinbase: false },
)
})
.collect()
}

pub fn network(&self) -> NetworkId {
match (self.testnet, self.devnet, self.simnet) {
(false, false, false) => NetworkId::new(NetworkType::Mainnet),
Expand Down
2 changes: 1 addition & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn get_app_dir() -> PathBuf {
fn validate_args(args: &Args) -> ConfigResult<()> {
#[cfg(feature = "devnet-prealloc")]
{
if args.num_prealloc_utxos.is_some() && !args.devnet {
if args.num_prealloc_utxos.is_some() && !(args.devnet || args.simnet) {
return Err(ConfigError::PreallocUtxosOnNonDevnet);
}

Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub type GrpcClientNotifier = Notifier<Notification, ChannelConnection>;

type DirectSubscriptions = Mutex<EventArray<SingleSubscription>>;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct GrpcClient {
inner: Arc<Inner>,
/// In multi listener mode, a full-featured Notifier
Expand Down Expand Up @@ -122,6 +122,7 @@ impl GrpcClient {
pub async fn start(&self, notify: Option<GrpcClientNotify>) {
match &self.notification_mode {
NotificationMode::MultiListeners => {
assert!(notify.is_none(), "client is on multi-listeners mode");
self.notifier.clone().unwrap().start();
}
NotificationMode::Direct => {
Expand Down
17 changes: 10 additions & 7 deletions rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ impl RpcCoreService {
.await
.unwrap_or_default()
}

fn has_sufficient_peer_connectivity(&self) -> bool {
// Other network types can be used in an isolated environment without peers
!matches!(self.flow_context.config.net.network_type, Mainnet | Testnet) || self.flow_context.hub().has_peers()
}
}

#[async_trait]
Expand All @@ -214,7 +219,7 @@ impl RpcApi for RpcCoreService {
let session = self.consensus_manager.consensus().session().await;

// TODO: consider adding an error field to SubmitBlockReport to document both the report and error fields
let is_synced: bool = self.flow_context.hub().has_peers() && session.async_is_nearly_synced().await;
let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;

if !self.config.enable_unsynced_mining && !is_synced {
// error = "Block not submitted - node is not synced"
Expand Down Expand Up @@ -282,7 +287,7 @@ impl RpcApi for RpcCoreService {
self.config.is_nearly_synced(block_template.selected_parent_timestamp, block_template.selected_parent_daa_score);
Ok(GetBlockTemplateResponse {
block: (&block_template.block).into(),
is_synced: self.flow_context.hub().has_peers() && is_nearly_synced,
is_synced: self.has_sufficient_peer_connectivity() && is_nearly_synced,
})
}

Expand Down Expand Up @@ -354,9 +359,7 @@ impl RpcApi for RpcCoreService {
mempool_size: self.mining_manager.clone().transaction_count(true, false).await as u64,
server_version: version().to_string(),
is_utxo_indexed: self.config.utxoindex,
is_synced: (!matches!(self.flow_context.config.net.network_type, Mainnet | Testnet) // Other network types can be used in an isolated environments without peers
|| self.flow_context.hub().has_peers())
&& is_nearly_synced,
is_synced: self.has_sufficient_peer_connectivity() && is_nearly_synced,
has_notify_command: true,
has_message_id: true,
})
Expand Down Expand Up @@ -702,7 +705,7 @@ impl RpcApi for RpcCoreService {

async fn get_server_info_call(&self, _request: GetServerInfoRequest) -> RpcResult<GetServerInfoResponse> {
let session = self.consensus_manager.consensus().session().await;
let is_synced: bool = self.flow_context.hub().has_peers() && session.async_is_nearly_synced().await;
let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;
let virtual_daa_score = session.async_get_virtual_daa_score().await;

Ok(GetServerInfoResponse {
Expand All @@ -717,7 +720,7 @@ impl RpcApi for RpcCoreService {

async fn get_sync_status_call(&self, _request: GetSyncStatusRequest) -> RpcResult<GetSyncStatusResponse> {
let session = self.consensus_manager.consensus().session().await;
let is_synced: bool = self.flow_context.hub().has_peers() && session.async_is_nearly_synced().await;
let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;
Ok(GetSyncStatusResponse { is_synced })
}

Expand Down
3 changes: 2 additions & 1 deletion testing/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ kaspa-consensusmanager.workspace = true
kaspad.workspace = true
kaspa-grpc-client.workspace = true
kaspa-rpc-core.workspace = true
kaspa-notify.workspace = true

faster-hex.workspace = true
thiserror.workspace = true
Expand Down Expand Up @@ -58,7 +59,7 @@ criterion.workspace = true
rand = { workspace = true, features = ["small_rng"] }
tokio = { workspace = true, features = ["rt", "macros"] }
kaspa-txscript-errors.workspace = true
port-selector = "0.1.6"

[features]
html_reports = []
devnet-prealloc = ["kaspad/devnet-prealloc"]
86 changes: 86 additions & 0 deletions testing/integration/src/common/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use kaspa_consensus_core::network::NetworkId;
use kaspa_core::{core::Core, signals::Shutdown};
use kaspa_database::utils::get_kaspa_tempdir;
use kaspa_grpc_client::GrpcClient;
use kaspa_rpc_core::notify::mode::NotificationMode;
use kaspad::{args::Args, daemon::create_core_with_runtime};
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;

pub struct Daemon {
// Type and suffix of the daemon network
pub network: NetworkId,

// Daemon ports
pub rpc_port: u16,
pub p2p_port: u16,

core: Arc<Core>,
workers: Option<Vec<std::thread::JoinHandle<()>>>,

_appdir_tempdir: TempDir,
}

impl Daemon {
pub fn new_random() -> Daemon {
let args = Args { devnet: true, ..Default::default() };
Self::new_random_with_args(args)
}

pub fn new_random_with_args(mut args: Args) -> Daemon {
// This should ask the OS to allocate free port for socket 1 to 4.
let socket1 = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let rpc_port = socket1.local_addr().unwrap().port();

let socket2 = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let p2p_port = socket2.local_addr().unwrap().port();

let socket3 = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let rpc_json_port = socket3.local_addr().unwrap().port();

let socket4 = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let rpc_borsh_port = socket4.local_addr().unwrap().port();

drop(socket1);
drop(socket2);
drop(socket3);
drop(socket4);

args.rpclisten = Some(format!("0.0.0.0:{rpc_port}").try_into().unwrap());
args.listen = Some(format!("0.0.0.0:{p2p_port}").try_into().unwrap());
args.rpclisten_json = Some(format!("0.0.0.0:{rpc_json_port}").parse().unwrap());
args.rpclisten_borsh = Some(format!("0.0.0.0:{rpc_borsh_port}").parse().unwrap());
let appdir_tempdir = get_kaspa_tempdir();
args.appdir = Some(appdir_tempdir.path().to_str().unwrap().to_owned());

let network = args.network();
let core = create_core_with_runtime(&Default::default(), &args);
Daemon { network, rpc_port, p2p_port, core, workers: None, _appdir_tempdir: appdir_tempdir }
}

pub async fn start(&mut self) -> GrpcClient {
self.workers = Some(self.core.start());
// Wait for the node to initialize before connecting to RPC
tokio::time::sleep(Duration::from_secs(1)).await;
self.new_client().await
}

pub fn shutdown(&mut self) {
if let Some(workers) = self.workers.take() {
self.core.shutdown();
self.core.join(workers);
}
}

pub async fn new_client(&self) -> GrpcClient {
GrpcClient::connect(NotificationMode::Direct, format!("grpc://localhost:{}", self.rpc_port), true, None, false, Some(500_000))
.await
.unwrap()
}
}

impl Drop for Daemon {
fn drop(&mut self) {
self.shutdown()
}
}
2 changes: 2 additions & 0 deletions testing/integration/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
path::Path,
};

pub mod daemon;

pub fn open_file(file_path: &Path) -> File {
let file_res = File::open(file_path);
match file_res {
Expand Down
Loading

0 comments on commit 7cd9e15

Please sign in to comment.