From e4a4d61bb7b201eb24d4c7aee8da8e41f453bf61 Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 8 Mar 2024 15:44:13 -0600 Subject: [PATCH 1/7] Move default value for --rpc-pubsub-notification-threads to CLI The default value was previously being determined down where the thread pool is being created. Providing a default value at the CLI level is consistent with other args, and gives an operator better visibility into what the default will actually be --- Cargo.lock | 2 +- programs/sbf/Cargo.lock | 2 +- rpc/Cargo.toml | 1 - rpc/src/rpc_pubsub_service.rs | 6 +++--- rpc/src/rpc_subscriptions.rs | 3 +-- validator/Cargo.toml | 1 + validator/src/cli.rs | 4 ++++ validator/src/main.rs | 4 ++-- 8 files changed, 13 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 503b2280d86ec9..e009a1a50c3d17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,7 @@ dependencies = [ "solana-perf", "solana-poh", "solana-program-runtime", + "solana-rayon-threadlimit", "solana-rpc", "solana-rpc-client", "solana-rpc-client-api", @@ -6856,7 +6857,6 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", - "solana-rayon-threadlimit", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4d606fc4e9ed51..3c78b32b6b1dbe 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "solana-perf", "solana-poh", "solana-program-runtime", + "solana-rayon-threadlimit", "solana-rpc", "solana-rpc-client", "solana-rpc-client-api", @@ -5589,7 +5590,6 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-poh", - "solana-rayon-threadlimit", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index d4f2648b6b1078..2f4441cd01fa48 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -40,7 +40,6 @@ solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } -solana-rayon-threadlimit = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 3e32503691d78e..a243eb91509b8c 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -43,7 +43,7 @@ pub struct PubSubConfig { pub queue_capacity_items: usize, pub queue_capacity_bytes: usize, pub worker_threads: usize, - pub notification_threads: Option, + pub notification_threads: usize, } impl Default for PubSubConfig { @@ -55,7 +55,7 @@ impl Default for PubSubConfig { queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: None, + notification_threads: 0, } } } @@ -69,7 +69,7 @@ impl PubSubConfig { queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: Some(2), + notification_threads: 2, } } } diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7ecfd6a31a42cc..98b28a902c3ef5 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -19,7 +19,6 @@ use { solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding}, solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_measure::measure::Measure, - solana_rayon_threadlimit::get_thread_count, solana_rpc_client_api::response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate, RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext, @@ -631,7 +630,7 @@ impl RpcSubscriptions { config.queue_capacity_bytes, )), }; - let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count); + let notification_threads = config.notification_threads; let t_cleanup = if notification_threads == 0 { None } else { diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 74742c90faa29d..0a6324f454e2b2 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -50,6 +50,7 @@ solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } solana-program-runtime = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index d1ad63b760f031..26af8dffd7a24c 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -26,6 +26,7 @@ use { solana_faucet::faucet::{self, FAUCET_PORT}, solana_ledger::use_snapshot_archives_at_startup, solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}, + solana_rayon_threadlimit::get_thread_count, solana_rpc::{rpc::MAX_REQUEST_BODY_SIZE, rpc_pubsub_service::PubSubConfig}, solana_rpc_client_api::request::MAX_MULTIPLE_ACCOUNTS, solana_runtime::{ @@ -1079,6 +1080,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .takes_value(true) .value_name("NUM_THREADS") .validator(is_parsable::) + .default_value(&default_args.rpc_pubsub_notification_threads) .help( "The maximum number of threads that RPC PubSub will use for generating \ notifications. 0 will disable RPC PubSub notifications", @@ -2138,6 +2140,7 @@ pub struct DefaultArgs { pub rpc_bigtable_max_message_size: String, pub rpc_max_request_body_size: String, pub rpc_pubsub_worker_threads: String, + pub rpc_pubsub_notification_threads: String, pub maximum_local_snapshot_age: String, pub maximum_full_snapshot_archives_to_retain: String, @@ -2225,6 +2228,7 @@ impl DefaultArgs { rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE .to_string(), rpc_pubsub_worker_threads: "4".to_string(), + rpc_pubsub_notification_threads: get_thread_count().to_string(), maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN .to_string(), maximum_incremental_snapshot_archives_to_retain: diff --git a/validator/src/main.rs b/validator/src/main.rs index b00eabfef9a7b0..5e5cb70a8c7b84 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1383,9 +1383,9 @@ pub fn main() { ), worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), notification_threads: if full_api { - value_of(&matches, "rpc_pubsub_notification_threads") + value_t_or_exit!(matches, "rpc_pubsub_notification_threads", usize) } else { - Some(0) + 0 }, }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, From 5a6c4791bb354737dac3328f50d0e41c98e212c5 Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 8 Mar 2024 16:18:40 -0600 Subject: [PATCH 2/7] Switch from usize to Option --- rpc/src/rpc_pubsub_service.rs | 7 ++++--- rpc/src/rpc_subscriptions.rs | 20 ++++++-------------- validator/src/main.rs | 8 ++++++-- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index a243eb91509b8c..27ad76f88b1b0e 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -16,6 +16,7 @@ use { std::{ io, net::SocketAddr, + num::NonZeroUsize, str, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -43,7 +44,7 @@ pub struct PubSubConfig { pub queue_capacity_items: usize, pub queue_capacity_bytes: usize, pub worker_threads: usize, - pub notification_threads: usize, + pub notification_threads: Option, } impl Default for PubSubConfig { @@ -55,7 +56,7 @@ impl Default for PubSubConfig { queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: 0, + notification_threads: None, } } } @@ -69,7 +70,7 @@ impl PubSubConfig { queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: 2, + notification_threads: NonZeroUsize::new(2), } } } diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 98b28a902c3ef5..9ea7fb33e470d4 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -630,17 +630,14 @@ impl RpcSubscriptions { config.queue_capacity_bytes, )), }; - let notification_threads = config.notification_threads; - let t_cleanup = if notification_threads == 0 { - None - } else { + + let t_cleanup = config.notification_threads.map(|notification_threads| { let exit = exit.clone(); - Some( Builder::new() .name("solRpcNotifier".to_string()) .spawn(move || { let pool = rayon::ThreadPoolBuilder::new() - .num_threads(notification_threads) + .num_threads(notification_threads.get()) .thread_name(|i| format!("solRpcNotify{i:02}")) .build() .unwrap(); @@ -662,9 +659,8 @@ impl RpcSubscriptions { ) }); }) - .unwrap(), - ) - }; + .unwrap() + }); let control = SubscriptionControl::new( config.max_active_subscriptions, @@ -673,11 +669,7 @@ impl RpcSubscriptions { ); Self { - notification_sender: if notification_threads == 0 { - None - } else { - Some(notification_sender) - }, + notification_sender: config.notification_threads.map(|_| notification_sender), t_cleanup, exit, control, diff --git a/validator/src/main.rs b/validator/src/main.rs index 5e5cb70a8c7b84..c70fe938c75a71 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1383,9 +1383,13 @@ pub fn main() { ), worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), notification_threads: if full_api { - value_t_or_exit!(matches, "rpc_pubsub_notification_threads", usize) + NonZeroUsize::new(value_t_or_exit!( + matches, + "rpc_pubsub_notification_threads", + usize + )) } else { - 0 + None }, }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, From 195f85106e381f51dc1d9657566137d15ee275d8 Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 8 Mar 2024 16:19:14 -0600 Subject: [PATCH 3/7] cargo fmt --- rpc/src/rpc_subscriptions.rs | 56 ++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 9ea7fb33e470d4..39d746c48049de 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -633,34 +633,34 @@ impl RpcSubscriptions { let t_cleanup = config.notification_threads.map(|notification_threads| { let exit = exit.clone(); - Builder::new() - .name("solRpcNotifier".to_string()) - .spawn(move || { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(notification_threads.get()) - .thread_name(|i| format!("solRpcNotify{i:02}")) - .build() - .unwrap(); - pool.install(|| { - if let Some(rpc_notifier_ready) = rpc_notifier_ready { - rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); - } - Self::process_notifications( - exit, - max_complete_transaction_status_slot, - max_complete_rewards_slot, - blockstore, - notifier, - notification_receiver, - subscriptions, - bank_forks, - block_commitment_cache, - optimistically_confirmed_bank, - ) - }); - }) - .unwrap() - }); + Builder::new() + .name("solRpcNotifier".to_string()) + .spawn(move || { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(notification_threads.get()) + .thread_name(|i| format!("solRpcNotify{i:02}")) + .build() + .unwrap(); + pool.install(|| { + if let Some(rpc_notifier_ready) = rpc_notifier_ready { + rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); + } + Self::process_notifications( + exit, + max_complete_transaction_status_slot, + max_complete_rewards_slot, + blockstore, + notifier, + notification_receiver, + subscriptions, + bank_forks, + block_commitment_cache, + optimistically_confirmed_bank, + ) + }); + }) + .unwrap() + }); let control = SubscriptionControl::new( config.max_active_subscriptions, From 920845b17bfc2940725bd4cb326dc2ffe8c47f67 Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 8 Mar 2024 19:10:06 -0600 Subject: [PATCH 4/7] Forgot to adjust the default struct value for new behavior --- Cargo.lock | 1 + programs/sbf/Cargo.lock | 1 + rpc/Cargo.toml | 1 + rpc/src/rpc_pubsub_service.rs | 3 ++- 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index e009a1a50c3d17..b92f4add5d401e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6857,6 +6857,7 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", + "solana-rayon-threadlimit", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 3c78b32b6b1dbe..9c78461b1b0a81 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5590,6 +5590,7 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-poh", + "solana-rayon-threadlimit", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 2f4441cd01fa48..d4f2648b6b1078 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -40,6 +40,7 @@ solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 27ad76f88b1b0e..99155e678675f5 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -12,6 +12,7 @@ use { jsonrpc_core::IoHandler, soketto::handshake::{server, Server}, solana_metrics::TokenCounter, + solana_rayon_threadlimit::get_thread_count, solana_sdk::timing::AtomicInterval, std::{ io, @@ -56,7 +57,7 @@ impl Default for PubSubConfig { queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: None, + notification_threads: NonZeroUsize::new(get_thread_count()), } } } From 493f469b8866d28673fc094f110ab5efd915879d Mon Sep 17 00:00:00 2001 From: steviez Date: Sat, 9 Mar 2024 00:49:57 -0600 Subject: [PATCH 5/7] Remove .requires(full_rpc_api) from pubsub_notification_threads arg --- validator/src/cli.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 26af8dffd7a24c..9bebc11c45427b 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1076,7 +1076,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .arg( Arg::with_name("rpc_pubsub_notification_threads") .long("rpc-pubsub-notification-threads") - .requires("full_rpc_api") .takes_value(true) .value_name("NUM_THREADS") .validator(is_parsable::) From 78645b7b210a966406557e380629ef93b1e90640 Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 11 Mar 2024 17:47:57 -0500 Subject: [PATCH 6/7] Add manual check for num threads requiring full RPC API --- validator/src/main.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/validator/src/main.rs b/validator/src/main.rs index c70fe938c75a71..28431f7fdaacd3 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1389,6 +1389,13 @@ pub fn main() { usize )) } else { + // Due to a CLAP bug, we can't use .requires("full_rpc_api") directly on + // the rpc_pubsub_notification_threads argument. Do the check manually here, + // and remove this when we get past 2.xy of CLAP + if matches.occurrences_of("rpc_pubsub_notification_threads") > 0 { + eprintln!("Use of --rpc_pubsub_notification_threads requires --full-rpc-api"); + exit(1); + } None }, }, From b9e71735f5cb50d1e6e10c3786eb377d4ce83116 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 12 Mar 2024 12:35:09 -0500 Subject: [PATCH 7/7] Use default_value_if() + required instead of the manual hack --- validator/src/cli.rs | 7 ++++++- validator/src/main.rs | 19 +++---------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 9bebc11c45427b..e9298d9c02928e 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1076,10 +1076,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .arg( Arg::with_name("rpc_pubsub_notification_threads") .long("rpc-pubsub-notification-threads") + .requires("full_rpc_api") .takes_value(true) .value_name("NUM_THREADS") .validator(is_parsable::) - .default_value(&default_args.rpc_pubsub_notification_threads) + .default_value_if( + "full_rpc_api", + None, + &default_args.rpc_pubsub_notification_threads, + ) .help( "The maximum number of threads that RPC PubSub will use for generating \ notifications. 0 will disable RPC PubSub notifications", diff --git a/validator/src/main.rs b/validator/src/main.rs index 28431f7fdaacd3..7f3de66b457c74 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1382,22 +1382,9 @@ pub fn main() { usize ), worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), - notification_threads: if full_api { - NonZeroUsize::new(value_t_or_exit!( - matches, - "rpc_pubsub_notification_threads", - usize - )) - } else { - // Due to a CLAP bug, we can't use .requires("full_rpc_api") directly on - // the rpc_pubsub_notification_threads argument. Do the check manually here, - // and remove this when we get past 2.xy of CLAP - if matches.occurrences_of("rpc_pubsub_notification_threads") > 0 { - eprintln!("Use of --rpc_pubsub_notification_threads requires --full-rpc-api"); - exit(1); - } - None - }, + notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize) + .ok() + .and_then(NonZeroUsize::new), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),