diff --git a/src/generic_stake_pool.rs b/src/generic_stake_pool.rs
index cacbda73..dcdbcfc9 100644
--- a/src/generic_stake_pool.rs
+++ b/src/generic_stake_pool.rs
@@ -17,7 +17,6 @@ pub struct ValidatorStake {
     pub identity: Pubkey,
     pub vote_address: Pubkey,
     pub stake_state: ValidatorStakeState,
-    pub memo: String,
 }
 
 pub trait GenericStakePool {
@@ -26,5 +25,5 @@ pub trait GenericStakePool {
         rpc_client: &RpcClient,
         dry_run: bool,
         desired_validator_stake: &[ValidatorStake],
-    ) -> Result<Vec<String>, Box<dyn error::Error>>;
+    ) -> Result<(Vec<String>, bool), Box<dyn error::Error>>;
 }
diff --git a/src/legacy_stake_pool.rs b/src/legacy_stake_pool.rs
index 000017bd..13cc83b7 100644
--- a/src/legacy_stake_pool.rs
+++ b/src/legacy_stake_pool.rs
@@ -41,7 +41,7 @@ impl GenericStakePool for LegacyStakePool {
         rpc_client: &RpcClient,
         dry_run: bool,
         validator_stake: &[ValidatorStake],
-    ) -> Result<Vec<String>, Box<dyn error::Error>> {
+    ) -> Result<(Vec<String>, bool), Box<dyn error::Error>> {
         let (init_transactions, update_transactions) = self.build_transactions(
             rpc_client,
             self.authorized_staker.pubkey(),
@@ -53,12 +53,14 @@ impl GenericStakePool for LegacyStakePool {
             dry_run,
             init_transactions,
             &self.authorized_staker,
-            &mut vec![],
-        )? {
+        )?
+        .failed
+        .is_empty()
+        {
             return Err("Failed to initialize stake pool. Unable to continue".into());
         }
 
-        let mut notifications = vec![
+        let notes = vec![
             format!("Baseline stake amount: {}", Sol(self.baseline_stake_amount)),
             format!("Bonus stake amount: {}", Sol(self.bonus_stake_amount)),
         ];
@@ -67,25 +69,24 @@ impl GenericStakePool for LegacyStakePool {
             dry_run,
             update_transactions,
             &self.authorized_staker,
-            &mut notifications,
-        )?;
+        )?
+        .failed
+        .is_empty();
 
         if !ok {
             error!("One or more transactions failed to execute")
         }
-        Ok(notifications)
+        Ok((notes, ok))
     }
 }
 
-type TransactionWithMemo = (Transaction, String);
-
 impl LegacyStakePool {
     fn build_transactions(
         &mut self,
         rpc_client: &RpcClient,
         authorized_staker: Pubkey,
         validator_stake: &[ValidatorStake],
-    ) -> Result<(Vec<TransactionWithMemo>, Vec<TransactionWithMemo>), Box<dyn error::Error>> {
+    ) -> Result<(Vec<Transaction>, Vec<Transaction>), Box<dyn error::Error>> {
         let mut init_transactions = vec![];
         let mut update_transactions = vec![];
         let mut source_stake_lamports_required = 0;
@@ -114,7 +115,6 @@ impl LegacyStakePool {
         for ValidatorStake {
             identity,
             vote_address,
-            memo,
             stake_state,
         } in validator_stake
         {
@@ -153,27 +153,23 @@ impl LegacyStakePool {
                     )
                 })?.state
             } else {
-                let memo = format!(
+                info!(
                     "Creating baseline stake account for validator {} ({})",
                     identity, baseline_stake_address
                 );
-                debug!("Adding transaction: {}", memo);
                 source_stake_lamports_required += self.baseline_stake_amount;
-                init_transactions.push((
-                    Transaction::new_unsigned(Message::new(
-                        &stake_instruction::split_with_seed(
-                            &self.source_stake_address,
-                            &authorized_staker,
-                            self.baseline_stake_amount,
-                            &baseline_stake_address,
-                            &authorized_staker,
-                            baseline_seed,
-                        ),
-                        Some(&authorized_staker),
-                    )),
-                    memo,
-                ));
+                init_transactions.push(Transaction::new_unsigned(Message::new(
+                    &stake_instruction::split_with_seed(
+                        &self.source_stake_address,
+                        &authorized_staker,
+                        self.baseline_stake_amount,
+                        &baseline_stake_address,
+                        &authorized_staker,
+                        baseline_seed,
+                    ),
+                    Some(&authorized_staker),
+                )));
                 StakeActivationState::Inactive
             };
 
@@ -192,26 +188,22 @@ impl LegacyStakePool {
                 })?
                 .state
             } else {
-                let memo = format!(
+                info!(
                     "Creating bonus stake account for validator {} ({})",
                     identity, bonus_stake_address
                 );
-                debug!("Adding transaction: {}", memo);
                 source_stake_lamports_required += self.bonus_stake_amount;
-                init_transactions.push((
-                    Transaction::new_unsigned(Message::new(
-                        &stake_instruction::split_with_seed(
-                            &self.source_stake_address,
-                            &authorized_staker,
-                            self.bonus_stake_amount,
-                            &bonus_stake_address,
-                            &authorized_staker,
-                            bonus_seed,
-                        ),
-                        Some(&authorized_staker),
-                    )),
-                    memo,
-                ));
+                init_transactions.push(Transaction::new_unsigned(Message::new(
+                    &stake_instruction::split_with_seed(
+                        &self.source_stake_address,
+                        &authorized_staker,
+                        self.bonus_stake_amount,
+                        &bonus_stake_address,
+                        &authorized_staker,
+                        bonus_seed,
+                    ),
+                    Some(&authorized_staker),
+                )));
                 StakeActivationState::Inactive
             };
 
@@ -224,7 +216,7 @@ impl LegacyStakePool {
                 bonus_stake_activation_state,
                 stake_state,
             ) {
-                update_transactions.push((transaction, memo.clone()))
+                update_transactions.push(transaction)
             }
         }
diff --git a/src/main.rs b/src/main.rs
index 406b2062..400ee6d8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -37,8 +37,9 @@ use {
     std::{
         collections::{HashMap, HashSet},
         error,
-        fs::File,
-        path::PathBuf,
+        fs::{self, File},
+        io::{self, Write},
+        path::{Path, PathBuf},
         process,
         str::FromStr,
     },
@@ -152,6 +153,7 @@ fn release_version_of(matches: &ArgMatches<'_>, name: &str) -> Option ValidatorList {
     match cluster {
         "mainnet-beta" => validator_list::mainnet_beta_validators(),
         "testnet" => validator_list::testnet_validators(),
-        "unknown" => {
+        "custom" => {
            let validator_list_file = File::open(value_t_or_exit!(matches, "--validator-list", PathBuf)).unwrap_or_else(
                |err| {
@@ -308,8 +311,10 @@ fn get_config() -> BoxResult<(Config, RpcClient, ValidatorList, Box
        "testnet" => value_t!(matches, "json_rpc_url", String)
            .unwrap_or_else(|_| "http://testnet.solana.com".into()),
-        "unknown" => value_t!(matches, "json_rpc_url", String)
+        "custom" => value_t!(matches, "json_rpc_url", String)
            .unwrap_or_else(|_| config.json_rpc_url.clone()),
        _ => unreachable!(),
    };
+    let db_path = value_t_or_exit!(matches, "db_path", PathBuf);
+    let cluster_data_dir = db_path.join(format!("data-{}", cluster));
 
    let confirmed_block_cache_path = matches
        .value_of("confirmed_block_cache_path")
@@ -607,6 +622,7 @@ fn get_config() -> BoxResult<(Config, RpcClient, ValidatorList, Box
 }
 
-#[derive(Deserialize, Serialize)]
+#[derive(Default, Deserialize, Serialize, Clone)]
 struct EpochClassificationV1 {
+    // Data Center observations for this epoch
     data_center_info: Vec<DataCenterInfo>,
-    paused: bool, // paused due to unusual information, to prevent accidental removal of too much stake
-    validator_classifications: HashMap<Pubkey, ValidatorClassification>,
+    // `None` indicates a pause due to unusual observations during classification
+    validator_classifications: Option<HashMap<Pubkey, ValidatorClassification>>,
+
+    // Informational notes regarding this epoch
     notes: Vec<String>,
 }
 
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Serialize, Clone)]
 enum EpochClassification {
     V1(EpochClassificationV1),
 }
 
 impl EpochClassification {
-    fn new(v1: EpochClassificationV1) -> Self {
+    pub fn new(v1: EpochClassificationV1) -> Self {
         EpochClassification::V1(v1)
     }
 
-    fn into_current(self) -> EpochClassificationV1 {
+    pub fn into_current(self) -> EpochClassificationV1 {
         match self {
             EpochClassification::V1(v1) => v1,
         }
     }
+
+    fn file_name<P>(epoch: Epoch, path: P) -> PathBuf
+    where
+        P: AsRef<Path>,
+    {
+        path.as_ref().join(format!("epoch-{}.yml", epoch))
+    }
+
+    pub fn exists<P>(epoch: Epoch, path: P) -> bool
+    where
+        P: AsRef<Path>,
+    {
+        Self::file_name(epoch, path).exists()
+    }
+
+    pub fn load<P>(epoch: Epoch, path: P) -> Result<Self, io::Error>
+    where
+        P: AsRef<Path>,
+    {
+        let file = File::open(Self::file_name(epoch, path))?;
+        serde_yaml::from_reader(file)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))
+    }
+
+    pub fn save<P>(&self, epoch: Epoch, path: P) -> Result<(), io::Error>
+    where
+        P: AsRef<Path>,
+    {
+        let serialized = serde_yaml::to_string(self)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
+
+        fs::create_dir_all(&path)?;
+        let mut file = File::create(Self::file_name(epoch, path))?;
+        file.write_all(&serialized.into_bytes())?;
+
+        Ok(())
+    }
 }
 
 fn classify(
@@ -1043,118 +1101,120 @@ fn classify(
         ));
     }
 
-    let paused = too_many_poor_voters || too_many_old_validators || too_many_poor_block_producers;
-
-    let mut validator_classifications = HashMap::new();
-
-    if paused {
-        notes.push("Paused".to_string());
-    } else {
-        for VoteAccountInfo {
-            identity,
-            vote_address,
-            commission,
-            epoch_credits,
-        } in vote_account_info
-        {
-            if !validator_list.contains(&identity) {
-                continue;
-            }
+    let validator_classifications =
+        if too_many_poor_voters || too_many_old_validators || too_many_poor_block_producers {
+            notes.push("Stake adjustments skipped this epoch".to_string());
+            None
+        } else {
+            let mut validator_classifications = HashMap::new();
 
-            let block_producer_classification_reason_msg = block_producer_classification_reason
-                .get(&identity)
-                .cloned()
-                .unwrap_or_default();
-            let vote_credits_msg =
-                format!("{} credits earned in epoch {}", epoch_credits, last_epoch);
-
-            let mut validator_notes = vec![];
-
-            let infrastructure_concentration_destake_reason = infrastructure_concentration_too_high
-                .get(&identity)
-                .map(|concentration| {
-                    config
-                        .infrastructure_concentration_affects
-                        .memo(&identity, *concentration)
-                })
-                .and_then(|affect| match affect {
-                    InfrastructureConcentrationAffectKind::Destake(reason) => Some(reason),
-                    InfrastructureConcentrationAffectKind::Warn(reason) => {
-                        validator_notes.push(reason);
-                        None
-                    }
-                });
-
-            let (stake_state, reason) =
-                if let Some(reason) = infrastructure_concentration_destake_reason {
-                    (ValidatorStakeState::No, reason)
-                } else if commission > config.max_commission {
-                    (
-                        ValidatorStakeState::No,
-                        format!("commission is too high: {}% commission", commission),
-                    )
-                } else if poor_voters.contains(&identity) {
-                    (
-                        ValidatorStakeState::No,
-                        format!("insufficient vote credits: {}", vote_credits_msg),
-                    )
-                } else if cluster_nodes_with_old_version.contains_key(&identity.to_string()) {
-                    (
-                        ValidatorStakeState::No,
-                        format!(
-                            "Outdated solana release: {}",
-                            cluster_nodes_with_old_version
-                                .get(&identity.to_string())
-                                .unwrap()
-                        ),
-                    )
-                } else if quality_block_producers.contains(&identity) {
-                    (
-                        ValidatorStakeState::Bonus,
-                        format!(
-                            "good block production during epoch {}: {}",
-                            last_epoch, block_producer_classification_reason_msg
-                        ),
-                    )
-                } else if poor_block_producers.contains(&identity) {
-                    (
-                        ValidatorStakeState::Baseline,
-                        format!(
-                            "poor block production during epoch {}: {} ",
-                            last_epoch, block_producer_classification_reason_msg
-                        ),
-                    )
-                } else {
-                    assert!(!poor_voters.contains(&identity));
-                    (ValidatorStakeState::Baseline, vote_credits_msg)
-                };
+            for VoteAccountInfo {
+                identity,
+                vote_address,
+                commission,
+                epoch_credits,
+            } in vote_account_info
+            {
+                if !validator_list.contains(&identity) {
+                    continue;
+                }
 
-            debug!(
-                "\nidentity: {}\n - vote address: {}\n - stake state: {:?} - {}",
-                identity, vote_address, stake_state, reason
-            );
+                let block_producer_classification_reason_msg = block_producer_classification_reason
+                    .get(&identity)
+                    .cloned()
+                    .unwrap_or_default();
+                let vote_credits_msg =
+                    format!("{} credits earned in epoch {}", epoch_credits, last_epoch);
+
+                let mut validator_notes = vec![];
+
+                let infrastructure_concentration_destake_reason =
+                    infrastructure_concentration_too_high
+                        .get(&identity)
+                        .map(|concentration| {
+                            config
+                                .infrastructure_concentration_affects
+                                .memo(&identity, *concentration)
+                        })
+                        .and_then(|affect| match affect {
+                            InfrastructureConcentrationAffectKind::Destake(reason) => Some(reason),
+                            InfrastructureConcentrationAffectKind::Warn(reason) => {
+                                validator_notes.push(reason);
+                                None
+                            }
+                        });
+
+                let (stake_state, reason) =
+                    if let Some(reason) = infrastructure_concentration_destake_reason {
+                        (ValidatorStakeState::No, reason)
+                    } else if commission > config.max_commission {
+                        (
+                            ValidatorStakeState::No,
+                            format!("commission is too high: {}% commission", commission),
+                        )
+                    } else if poor_voters.contains(&identity) {
+                        (
+                            ValidatorStakeState::No,
+                            format!("insufficient vote credits: {}", vote_credits_msg),
+                        )
+                    } else if cluster_nodes_with_old_version.contains_key(&identity.to_string()) {
+                        (
+                            ValidatorStakeState::No,
+                            format!(
+                                "Outdated solana release: {}",
+                                cluster_nodes_with_old_version
+                                    .get(&identity.to_string())
+                                    .unwrap()
+                            ),
+                        )
+                    } else if quality_block_producers.contains(&identity) {
+                        (
+                            ValidatorStakeState::Bonus,
+                            format!(
+                                "good block production during epoch {}: {}",
+                                last_epoch, block_producer_classification_reason_msg
+                            ),
+                        )
+                    } else if poor_block_producers.contains(&identity) {
+                        (
+                            ValidatorStakeState::Baseline,
+                            format!(
+                                "poor block production during epoch {}: {} ",
+                                last_epoch, block_producer_classification_reason_msg
+                            ),
+                        )
+                    } else {
+                        assert!(!poor_voters.contains(&identity));
+                        (ValidatorStakeState::Baseline, vote_credits_msg)
+                    };
+
+                debug!(
+                    "\nidentity: {}\n - vote address: {}\n - stake state: {:?} - {}",
+                    identity, vote_address, stake_state, reason
+                );
 
-            validator_classifications.insert(
-                identity,
-                ValidatorClassification {
+                validator_classifications.insert(
                     identity,
-                    vote_address,
-                    stake_state,
-                    stake_state_reason: reason,
-                    notes: validator_notes,
-                },
-            );
-        }
-        notes.push(format!(
-            "{} validators processed",
-            validator_classifications.len()
-        ));
-    }
+                    ValidatorClassification {
+                        identity,
+                        vote_address,
+                        stake_state,
+                        stake_state_reason: reason,
+                        notes: validator_notes,
+                    },
+                );
+            }
+            notes.push(format!(
+                "{} validators processed",
+                validator_classifications.len()
+            ));
+
+            Some(validator_classifications)
+        };
 
     Ok(EpochClassification::new(EpochClassificationV1 {
         data_center_info,
         validator_classifications,
-        paused,
         notes,
     }))
 }
@@ -1173,48 +1233,132 @@ fn main() -> BoxResult<()> {
         return Err("A notifier must be active with --confirm".into());
     }
 
-    let epoch_info = rpc_client.get_epoch_info()?;
-    info!("Epoch info: {:?}", epoch_info);
-    if epoch_info.epoch == 0 {
+    let epoch = rpc_client.get_epoch_info()?.epoch;
+    info!("Epoch: {:?}", epoch);
+    if epoch == 0 {
         return Ok(());
     }
 
+    info!("Data directory: {}", config.cluster_data_dir.display());
+
+    let mut previous_epoch = epoch;
+    let previous_epoch_classification = loop {
+        if previous_epoch == 0 {
+            info!("No previous EpochClassification found");
+            break EpochClassificationV1::default();
+        }
+        previous_epoch -= 1;
+
+        if EpochClassification::exists(previous_epoch, &config.cluster_data_dir) {
+            let previous_epoch_classification =
+                EpochClassification::load(previous_epoch, &config.cluster_data_dir)?.into_current();
+
+            if previous_epoch_classification
+                .validator_classifications
+                .is_some()
+            {
+                info!(
+                    "Previous EpochClassification found for epoch {}",
+                    previous_epoch
+                );
+                break previous_epoch_classification;
+            } else {
+                info!(
+                    "Skipping previous EpochClassification for epoch {}",
+                    previous_epoch
+                );
+            }
+        }
+    };
+
+    let (epoch_classification, first_time) =
+        if EpochClassification::exists(epoch, &config.cluster_data_dir) {
+            info!("Classification for {} already exists", epoch);
+            (
+                EpochClassification::load(epoch, &config.cluster_data_dir)?,
+                false,
+            )
+        } else {
+            let epoch_classification = classify(&rpc_client, &config, epoch, &validator_list)?;
+            epoch_classification.save(epoch, &config.cluster_data_dir)?;
+            (epoch_classification, true)
+        };
+
     let EpochClassificationV1 {
-        notes: mut classify_notifications,
+        mut notes,
         validator_classifications,
         ..
-    } = classify(&rpc_client, &config, epoch_info.epoch, &validator_list)?.into_current();
-
-    let desired_validator_stake: Vec<_> = validator_classifications
-        .values()
-        .map(|vc| {
-            classify_notifications.extend(
-                vc.notes
-                    .iter()
-                    .map(|note| format!("Note: {}: {}", vc.identity, note)),
-            );
-
-            ValidatorStake {
-                identity: vc.identity,
-                vote_address: vc.vote_address,
-                stake_state: vc.stake_state,
-                memo: format!(
-                    "* {:?} stake: {}: {}",
-                    vc.stake_state, vc.identity, vc.stake_state_reason
-                ),
-            }
-        })
-        .collect();
+    } = epoch_classification.into_current();
+
+    let mut validator_stake_change_notes = vec![];
+    let mut validator_notes = vec![];
+    let success = if let Some(validator_classifications) = validator_classifications {
+        let previous_validator_classifications = previous_epoch_classification
+            .validator_classifications
+            .unwrap_or_default();
+
+        let desired_validator_stake: Vec<_> = validator_classifications
+            .values()
+            .map(|vc| {
+                validator_notes.extend(
+                    vc.notes
+                        .iter()
+                        .map(|note| format!("Note: {}: {}", vc.identity, note)),
+                );
+
+                let stake_state_changed = match previous_validator_classifications
+                    .get(&vc.identity)
+                    .map(|prev_vc| prev_vc.stake_state)
+                {
+                    Some(previous_stake_state) => previous_stake_state != vc.stake_state,
+                    None => true,
+                };
+
+                if stake_state_changed {
+                    validator_stake_change_notes.push(format!(
+                        "* {:?} stake: {}: {}",
+                        vc.stake_state, vc.identity, vc.stake_state_reason
+                    ));
+                }
+
+                ValidatorStake {
+                    identity: vc.identity,
+                    vote_address: vc.vote_address,
+                    stake_state: vc.stake_state,
+                }
+            })
+            .collect();
 
-    let apply_notifications =
-        stake_pool.apply(&rpc_client, config.dry_run, &desired_validator_stake)?;
+        let (stake_pool_notes, success) =
+            stake_pool.apply(&rpc_client, config.dry_run, &desired_validator_stake)?;
+        notes.extend(stake_pool_notes);
 
-    for notification in classify_notifications.iter().chain(&apply_notifications) {
-        info!("notification: {}", notification);
-        notifier.send(&notification);
+        validator_notes.sort();
+        notes.extend(validator_notes);
+
+        validator_stake_change_notes.sort();
+        notes.extend(validator_stake_change_notes);
+
+        success
+    } else {
+        true
+    };
+
+    // Only notify the user if this is the first run for this epoch
+    if first_time {
+        for note in notes {
+            info!("notification: {}", note);
+            notifier.send(&note);
+        }
+    } else {
+        info!("notifications skipped on re-run");
     }
 
-    Ok(())
+    if success {
+        Ok(())
+    } else {
+        Err("something failed".into())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/rpc_client_utils.rs b/src/rpc_client_utils.rs
index bc1e744d..37617dc5 100644
--- a/src/rpc_client_utils.rs
+++ b/src/rpc_client_utils.rs
@@ -15,7 +15,13 @@ use {
         signature::{Keypair, Signature, Signer},
         transaction::Transaction,
     },
-    std::{collections::HashMap, error, str::FromStr, thread::sleep, time::Duration},
+    std::{
+        collections::{HashMap, HashSet},
+        error,
+        str::FromStr,
+        thread::sleep,
+        time::Duration,
+    },
 };
 
 pub fn retry_rpc_operation<T, F>(mut retries: usize, op: F) -> client_error::Result<T>
@@ -81,13 +87,17 @@ pub fn simulate_transactions(
     Ok(simulated_transactions)
 }
 
+pub struct SendAndConfirmTransactionResult {
+    pub succeeded: HashSet<Signature>,
+    pub failed: HashSet<Signature>,
+}
+
 pub fn send_and_confirm_transactions(
     rpc_client: &RpcClient,
     dry_run: bool,
-    transactions: Vec<(Transaction, String)>,
+    transactions: Vec<Transaction>,
     authorized_staker: &Keypair,
-    notifications: &mut Vec<String>,
-) -> Result<bool, Box<dyn error::Error>> {
+) -> Result<SendAndConfirmTransactionResult, Box<dyn error::Error>> {
     let authorized_staker_balance = rpc_client.get_balance(&authorized_staker.pubkey())?;
     info!(
         "Authorized staker balance: {} SOL",
@@ -97,7 +107,7 @@ pub fn send_and_confirm_transactions(
     let (blockhash, fee_calculator) = rpc_client.get_recent_blockhash()?;
     info!("{} transactions to send", transactions.len());
 
-    let required_fee = transactions.iter().fold(0, |fee, (transaction, _)| {
+    let required_fee = transactions.iter().fold(0, |fee, transaction| {
         fee + fee_calculator.calculate_fee(&transaction.message)
     });
     info!("Required fee: {} SOL", lamports_to_sol(required_fee));
@@ -105,25 +115,20 @@ pub fn send_and_confirm_transactions(
         return Err("Authorized staker has insufficient funds".into());
     }
 
-    let mut pending_transactions = HashMap::new();
-    for (mut transaction, memo) in transactions.into_iter() {
+    let mut pending_signatures = HashSet::new();
+    for mut transaction in transactions {
         transaction.sign(&[authorized_staker], blockhash);
-        pending_transactions.insert(transaction.signatures[0], memo);
+        pending_signatures.insert(transaction.signatures[0]);
         if !dry_run {
             rpc_client.send_transaction(&transaction)?;
         }
     }
 
-    struct ConfirmedTransaction {
-        success: bool,
-        signature: Signature,
-        memo: String,
-    }
-
-    let mut confirmed_transactions = vec![];
+    let mut succeeded_transactions = HashSet::new();
+    let mut failed_transactions = HashSet::new();
     loop {
-        if pending_transactions.is_empty() {
+        if pending_signatures.is_empty() {
             break;
         }
 
@@ -134,23 +139,21 @@ pub fn send_and_confirm_transactions(
             error!(
                 "Blockhash {} expired with {} pending transactions",
                 blockhash,
-                pending_transactions.len()
+                pending_signatures.len()
             );
 
-            for (signature, memo) in pending_transactions.into_iter() {
-                confirmed_transactions.push(ConfirmedTransaction {
-                    success: false,
-                    signature,
-                    memo,
-                });
+            for signature in pending_signatures.into_iter() {
+                failed_transactions.insert(signature);
             }
             break;
         }
 
-        let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
         let mut statuses = vec![];
-        for pending_signatures_chunk in
-            pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS - 1)
+        for pending_signatures_chunk in pending_signatures
+            .iter()
+            .cloned()
+            .collect::<Vec<_>>()
+            .chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS - 1)
         {
             trace!(
                 "checking {} pending_signatures",
@@ -158,13 +161,14 @@ pub fn send_and_confirm_transactions(
             );
             statuses.extend(
                 rpc_client
-                    .get_signature_statuses(&pending_signatures_chunk)?
+                    .get_signature_statuses(pending_signatures_chunk)?
                    .value
                    .into_iter(),
            )
        }
        assert_eq!(statuses.len(), pending_signatures.len());
 
+        let mut still_pending_signatures = HashSet::new();
        for (signature, status) in pending_signatures.into_iter().zip(statuses.into_iter()) {
            info!("{}: status={:?}", signature, status);
            let completed = if dry_run {
@@ -180,37 +184,24 @@ pub fn send_and_confirm_transactions(
            };
 
            if let Some(success) = completed {
-                warn!("{}: completed. success={}", signature, success);
-                let memo = pending_transactions.remove(&signature).unwrap();
-                confirmed_transactions.push(ConfirmedTransaction {
-                    success,
-                    signature,
-                    memo,
-                });
+                warn!("{}: completed. success={}", signature, success);
+                if success {
+                    succeeded_transactions.insert(signature);
+                } else {
+                    failed_transactions.insert(signature);
+                }
+            } else {
+                still_pending_signatures.insert(signature);
            }
        }
+        pending_signatures = still_pending_signatures;
        sleep(Duration::from_millis(250));
    }
 
-    confirmed_transactions.sort_by(|a, b| a.memo.cmp(&b.memo));
-
-    let mut ok = true;
-
-    for ConfirmedTransaction {
-        success,
-        signature,
-        memo,
-    } in confirmed_transactions
-    {
-        if success {
-            info!("OK: {}: {}", signature, memo);
-            notifications.push(memo)
-        } else {
-            error!("FAIL: {}: {}", signature, memo);
-            ok = false
-        }
-    }
-    Ok(ok)
+    Ok(SendAndConfirmTransactionResult {
+        succeeded: succeeded_transactions,
+        failed: failed_transactions,
+    })
 }
 
 pub struct VoteAccountInfo {
diff --git a/src/stake_pool.rs b/src/stake_pool.rs
index 19766426..78bdb505 100644
--- a/src/stake_pool.rs
+++ b/src/stake_pool.rs
@@ -40,7 +40,7 @@ impl GenericStakePool for SplStakePool {
         _rpc_client: &RpcClient,
         _dry_run: bool,
         _desired_validator_stake: &[ValidatorStake],
-    ) -> Result<Vec<String>, Box<dyn error::Error>> {
-        todo!();
+    ) -> Result<(Vec<String>, bool), Box<dyn error::Error>> {
+        Ok((vec!["TODO".to_string()], false))
     }
 }
diff --git a/src/stake_pool_v0.rs b/src/stake_pool_v0.rs
index 2e001e7a..337f67e6 100644
--- a/src/stake_pool_v0.rs
+++ b/src/stake_pool_v0.rs
@@ -96,7 +96,7 @@ impl GenericStakePool for StakePool {
         rpc_client: &RpcClient,
         dry_run: bool,
         desired_validator_stake: &[ValidatorStake],
-    ) -> Result<Vec<String>, Box<dyn error::Error>> {
+    ) -> Result<(Vec<String>, bool), Box<dyn error::Error>> {
         if dry_run {
             return Err("dryrun not supported".into());
         }
@@ -197,23 +197,25 @@ impl GenericStakePool for StakePool {
         info!("Bonus stake amount: {}", Sol(bonus_stake_amount));
 
-        let mut notifications = vec![
+        let notes = vec![
             format!("Baseline stake amount: {}", Sol(self.baseline_stake_amount)),
             format!("Bonus stake amount: {}", Sol(bonus_stake_amount)),
         ];
 
-        notifications.extend(distribute_validator_stake(
-            rpc_client,
-            &self.authorized_staker,
-            desired_validator_stake
-                .iter()
-                .filter(|vs| !busy_validators.contains(&vs.identity))
-                .cloned(),
-            self.reserve_stake_address,
-            self.min_reserve_stake_balance,
-            self.baseline_stake_amount,
-            bonus_stake_amount,
-        )?);
-        Ok(notifications)
+        Ok((
+            notes,
+            distribute_validator_stake(
+                rpc_client,
+                &self.authorized_staker,
+                desired_validator_stake
+                    .iter()
+                    .filter(|vs| !busy_validators.contains(&vs.identity))
+                    .cloned(),
+                self.reserve_stake_address,
+                self.min_reserve_stake_balance,
+                self.baseline_stake_amount,
+                bonus_stake_amount,
+            )?,
+        ))
     }
 }
 
@@ -296,43 +298,37 @@ fn merge_orphaned_stake_accounts(
         match stake_activation.state {
             StakeActivationState::Activating | StakeActivationState::Deactivating => {}
             StakeActivationState::Active => {
-                transactions.push((
-                    Transaction::new_with_payer(
-                        &[stake_instruction::deactivate_stake(
-                            &stake_address,
-                            &authorized_staker.pubkey(),
-                        )],
-                        Some(&authorized_staker.pubkey()),
-                    ),
-                    format!("Deactivating stake {}", stake_address),
+                transactions.push(Transaction::new_with_payer(
+                    &[stake_instruction::deactivate_stake(
+                        &stake_address,
+                        &authorized_staker.pubkey(),
+                    )],
+                    Some(&authorized_staker.pubkey()),
                 ));
+                info!("Deactivating stake {}", stake_address);
             }
             StakeActivationState::Inactive => {
-                transactions.push((
-                    Transaction::new_with_payer(
-                        &stake_instruction::merge(
-                            &reserve_stake_address,
-                            &stake_address,
-                            &authorized_staker.pubkey(),
-                        ),
-                        Some(&authorized_staker.pubkey()),
-                    ),
-                    format!(
-                        "Merging orphaned stake, {}, into reserve {}",
-                        stake_address, reserve_stake_address
+                transactions.push(Transaction::new_with_payer(
+                    &stake_instruction::merge(
+                        &reserve_stake_address,
+                        &stake_address,
+                        &authorized_staker.pubkey(),
                     ),
+                    Some(&authorized_staker.pubkey()),
                 ));
+
+                info!(
+                    "Merging orphaned stake, {}, into reserve {}",
+                    stake_address, reserve_stake_address
+                );
             }
         }
     }
 
-    if !send_and_confirm_transactions(
-        rpc_client,
-        false,
-        transactions,
-        authorized_staker,
-        &mut vec![],
-    )? {
+    if !send_and_confirm_transactions(rpc_client, false, transactions, authorized_staker)?
+        .failed
+        .is_empty()
+    {
         Err("Failed to merge orphaned stake accounts".into())
     } else {
         Ok(())
@@ -391,17 +387,15 @@ fn merge_transient_stake_accounts(
                     &stake_account,
                     &transient_stake_account,
                 )? {
-                    transactions.push((
-                        Transaction::new_with_payer(
-                            &stake_instruction::merge(
-                                &stake_address,
-                                &transient_stake_address,
-                                &authorized_staker.pubkey(),
-                            ),
-                            Some(&authorized_staker.pubkey()),
+                    transactions.push(Transaction::new_with_payer(
+                        &stake_instruction::merge(
+                            &stake_address,
+                            &transient_stake_address,
+                            &authorized_staker.pubkey(),
                         ),
-                        format!("Merging active transient stake for {}", identity),
+                        Some(&authorized_staker.pubkey()),
                     ));
+                    info!("Merging active transient stake for {}", identity);
                 } else {
                     warn!(
                         "Unable to merge active transient stake for {} due to credits observed mismatch",
@@ -411,29 +405,24 @@ fn merge_transient_stake_accounts(
                }
            }
            StakeActivationState::Inactive => {
-                transactions.push((
-                    Transaction::new_with_payer(
-                        &stake_instruction::merge(
-                            &reserve_stake_address,
-                            &transient_stake_address,
-                            &authorized_staker.pubkey(),
-                        ),
-                        Some(&authorized_staker.pubkey()),
+                transactions.push(Transaction::new_with_payer(
+                    &stake_instruction::merge(
+                        &reserve_stake_address,
+                        &transient_stake_address,
+                        &authorized_staker.pubkey(),
                    ),
-                    format!("Merging inactive transient stake for {}", identity),
+                    Some(&authorized_staker.pubkey()),
                ));
+                info!("Merging inactive transient stake for {}", identity);
            }
        }
    }
 
-    if !send_and_confirm_transactions(
-        rpc_client,
-        false,
-        transactions,
-        authorized_staker,
-        &mut vec![],
-    )? {
+    if !send_and_confirm_transactions(rpc_client, false, transactions, authorized_staker)?
+        .failed
+        .is_empty()
+    {
        Err("Failed to merge transient stake".into())
    } else {
        Ok(())
@@ -549,26 +538,24 @@ fn create_validator_stake_accounts(
                    vote_address,
                ));
 
-                transactions.push((
-                    Transaction::new_with_payer(&instructions, Some(&authorized_staker.pubkey())),
-                    format!(
-                        "Creating stake account for validator {} ({})",
-                        identity, stake_address
-                    ),
+                transactions.push(Transaction::new_with_payer(
+                    &instructions,
+                    Some(&authorized_staker.pubkey()),
                ));
+                info!(
+                    "Creating stake account for validator {} ({})",
+                    identity, stake_address
+                );
            }
            warn!("Validator {} busy due to no stake account", identity);
            busy_validators.insert(*identity);
        }
    }
 
-    if !send_and_confirm_transactions(
-        rpc_client,
-        false,
-        transactions,
-        authorized_staker,
-        &mut vec![],
-    )? {
+    if !send_and_confirm_transactions(rpc_client, false, transactions, authorized_staker)?
+        .failed
+        .is_empty()
+    {
        Err("Failed to create validator stake accounts".into())
    } else {
        Ok(())
@@ -583,7 +570,7 @@ fn distribute_validator_stake(
    min_reserve_stake_balance: u64,
    baseline_stake_amount: u64,
    bonus_stake_amount: u64,
-) -> Result<Vec<String>, Box<dyn error::Error>>
+) -> Result<bool, Box<dyn error::Error>>
 where
    V: IntoIterator<Item = ValidatorStake>,
 {
@@ -605,7 +592,6 @@ where
    for ValidatorStake {
        identity,
        stake_state,
-        memo,
        vote_address,
    } in desired_validator_stake
    {
@@ -659,9 +645,9 @@ where
                &authorized_staker.pubkey(),
            ));
 
-            transactions.push((
-                Transaction::new_with_payer(&instructions, Some(&authorized_staker.pubkey())),
-                format!("{}. Removing {} stake", memo, Sol(amount_to_remove)),
+            transactions.push(Transaction::new_with_payer(
+                &instructions,
+                Some(&authorized_staker.pubkey()),
            ));
        }
    } else if balance < desired_balance {
@@ -697,9 +683,9 @@ where
                &vote_address,
            ));
 
-            transactions.push((
-                Transaction::new_with_payer(&instructions, Some(&authorized_staker.pubkey())),
-                format!("{}. Adding {} stake", memo, Sol(amount_to_add)),
+            transactions.push(Transaction::new_with_payer(
+                &instructions,
+                Some(&authorized_staker.pubkey()),
            ));
        }
    }
@@ -709,19 +695,14 @@ where
        Sol(reserve_stake_balance)
    );
 
-    let mut notifications = vec![];
-    let ok = send_and_confirm_transactions(
-        rpc_client,
-        false,
-        transactions,
-        authorized_staker,
-        &mut notifications,
-    )?;
+    let ok = send_and_confirm_transactions(rpc_client, false, transactions, authorized_staker)?
+        .failed
+        .is_empty();
 
    if !ok {
        error!("One or more transactions failed to execute")
    }
-    Ok(notifications)
+    Ok(ok)
 }
 
 #[cfg(test)]
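
The new `EpochClassification::save`/`load` methods added above boil down to one YAML file per epoch under the cluster data directory, which is what lets `main()` skip re-classification and re-notification when it is re-run within the same epoch. Below is a minimal standalone sketch of that persistence pattern; all names (such as `EpochRecord`) are illustrative rather than this repository's own types, and it assumes only the `serde` derive feature and the `serde_yaml` crate.

use serde::{Deserialize, Serialize};
use std::{
    fs, io,
    path::{Path, PathBuf},
};

type Epoch = u64;

#[derive(Default, Deserialize, Serialize)]
struct EpochRecord {
    notes: Vec<String>,
}

// One file per epoch, e.g. `data-testnet/epoch-100.yml`
fn file_name<P: AsRef<Path>>(epoch: Epoch, path: P) -> PathBuf {
    path.as_ref().join(format!("epoch-{}.yml", epoch))
}

fn save<P: AsRef<Path>>(record: &EpochRecord, epoch: Epoch, path: P) -> io::Result<()> {
    // Serialize to YAML, mapping the serde error into io::Error as the diff does
    let serialized = serde_yaml::to_string(record)
        .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
    fs::create_dir_all(&path)?;
    fs::write(file_name(epoch, path), serialized)
}

fn load<P: AsRef<Path>>(epoch: Epoch, path: P) -> io::Result<EpochRecord> {
    let file = fs::File::open(file_name(epoch, path))?;
    serde_yaml::from_reader(file)
        .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))
}

fn main() -> io::Result<()> {
    let dir = PathBuf::from("data-testnet");
    let record = EpochRecord {
        notes: vec!["example note".to_string()],
    };
    save(&record, 100, &dir)?;

    // A later run can detect the saved file and reload it instead of recomputing
    if file_name(100, &dir).exists() {
        let reloaded = load(100, &dir)?;
        println!("reloaded {} notes for epoch 100", reloaded.notes.len());
    }
    Ok(())
}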