From 19c6c33f91a19a7cbba5a977b1c458a887850c89 Mon Sep 17 00:00:00 2001
From: Honza <115138297+Supremesource@users.noreply.github.com>
Date: Sat, 23 Nov 2024 15:40:54 -0300
Subject: [PATCH] feat: adding offworker ban queue

---
 pallets/offworker/src/lib.rs              | 36 +++++++++++-
 pallets/offworker/src/util.rs             |  2 -
 pallets/subnet_emission/src/decryption.rs | 72 ++++++++++++++++++++++-
 pallets/subnet_emission/src/lib.rs        | 17 ++++++
 pallets/subspace/src/params/subnet.rs     |  2 +-
 5 files changed, 122 insertions(+), 7 deletions(-)

diff --git a/pallets/offworker/src/lib.rs b/pallets/offworker/src/lib.rs
index 897fd8598..2565b5245 100644
--- a/pallets/offworker/src/lib.rs
+++ b/pallets/offworker/src/lib.rs
@@ -211,6 +211,7 @@ pub mod pallet {
             let (valid_subnets, hanging_subnets) =
                 pallet_subnet_emission::Pallet::<T>::get_valid_subnets(Some(&acc_id));
 
+            log::info!("Valid subnets: {:?}", valid_subnets);
             // The runtime mimics this logic, by deleting all storages related to consenus
             // parameters and weights
             hanging_subnets.iter().for_each(|subnet_id| {
@@ -218,8 +219,14 @@ pub mod pallet {
                 Self::delete_subnet_state(subnet_id);
             });
 
-            log::info!("Valid subnets: {:?}", valid_subnets);
-            let deregistered_subnets = Self::process_subnets(valid_subnets, acc_id, block_number);
+            let (queued_for_ban, remaining_valid_subnets) =
+                Self::handle_banned_subnets(&valid_subnets, &acc_id);
+
+            log::info!("Subnets queued for ban: {:?}", queued_for_ban);
+            log::info!("Valid subnets: {:?}", remaining_valid_subnets);
+
+            let deregistered_subnets =
+                Self::process_subnets(remaining_valid_subnets, acc_id, block_number);
             deregistered_subnets.iter().for_each(|subnet_id| {
                 log::info!("Deregistered subnet: {}", subnet_id);
                 Self::delete_subnet_state(subnet_id);
@@ -312,6 +319,31 @@ impl<T: Config> Pallet<T> {
             .build()
     }
 
+    /// Splits `valid_subnets` into (subnets where `acc_id` is in the ban queue, the rest).
+    /// For every queued subnet it immediately tries to send weights; a failed send is only
+    /// logged and the subnet still counts as queued.
+    fn handle_banned_subnets(valid_subnets: &[u16], acc_id: &T::AccountId) -> (Vec<u16>, Vec<u16>) {
+        valid_subnets.iter().cloned().partition(|&subnet_id| {
+            if DecryptionNodeBanQueue::<T>::contains_key(subnet_id, acc_id) {
+                log::info!(
+                    "Node is in ban queue for subnet {} - sending weights early",
+                    subnet_id
+                );
+
+                if let Err(e) = Self::do_send_weights(subnet_id, I64F64::from_num(0), false) {
+                    log::error!(
+                        "Error sending early weights for subnet {}: {:?}",
+                        subnet_id,
+                        e
+                    );
+                }
+                true
+            } else {
+                false
+            }
+        })
+    }
+
     /// Decrypts all subnet weights for a given netuid. Returns a vector of block weights,
     /// maintaining the length of consensus parameters. Failed decryptions result in empty vectors.
     /// V2 TODO: make more efficient in v2, we can be just caching this
diff --git a/pallets/offworker/src/util.rs b/pallets/offworker/src/util.rs
index 853e69b00..6c3b59d03 100644
--- a/pallets/offworker/src/util.rs
+++ b/pallets/offworker/src/util.rs
@@ -102,8 +102,6 @@ where
         decrypted_weights.len()
     );
 
-    // log::info!("decrypted weights are: {:?}", decrypted_weights);
-
     let weights_for_should_decrypt: Vec<_> = decrypted_weights
         .iter()
         .cloned()
diff --git a/pallets/subnet_emission/src/decryption.rs b/pallets/subnet_emission/src/decryption.rs
index 7f9afa1e2..52ad06a7e 100644
--- a/pallets/subnet_emission/src/decryption.rs
+++ b/pallets/subnet_emission/src/decryption.rs
@@ -447,6 +447,71 @@ impl<T: Config> Pallet<T> {
             .unwrap_or_else(|| MaxEncryptionPeriodDefaultValue::get().unwrap_or(10_800))
     }
 
+    /// Queues the decryption node of `subnet_id` for a ban at `current block + buffer`.
+    ///
+    /// `cancel_expired_offchain_workers` calls this for every expired node each time it
+    /// runs, so a node that is already queued keeps its original deadline. Re-inserting
+    /// it would move the deadline forward on every call and the ban would never be reached.
+    fn put_offworker_to_ban_queue(subnet_id: u16, info: &SubnetDecryptionInfo<T>, buffer: u64) {
+        if DecryptionNodeBanQueue::<T>::contains_key(subnet_id, &info.node_id) {
+            return;
+        }
+
+        let current_block = pallet_subspace::Pallet::<T>::get_current_block_number();
+        let ban_block = current_block.saturating_add(buffer);
+
+        // Add the node to the ban queue with the calculated ban block
+        DecryptionNodeBanQueue::<T>::insert(subnet_id, &info.node_id, ban_block);
+
+        // Emit an event
+        Self::deposit_event(Event::<T>::DecryptionNodeBanQueued {
+            subnet_id,
+            node_id: info.node_id.clone(),
+            ban_block,
+        });
+
+        log::info!(
+            "Offchain worker queued for ban: subnet {}, node {:?}, ban at block {}",
+            subnet_id,
+            info.node_id,
+            ban_block
+        );
+    }
+
+    /// Bans every queued decryption node whose ban block is at or before `block_number`.
+    ///
+    /// Due entries are always removed from the queue. If the subnet still has decryption
+    /// data, its offchain worker is cancelled and the queued node is banned.
+    pub(crate) fn process_ban_queue(block_number: u64) {
+        // Collect first: changing a storage map while iterating it gives undefined results
+        let due: Vec<_> = DecryptionNodeBanQueue::<T>::iter()
+            .filter(|(_, _, ban_block)| *ban_block <= block_number)
+            .collect();
+
+        for (subnet_id, node_id, _ban_block) in due {
+            // Remove from ban queue first, so an entry without decryption data can't linger
+            DecryptionNodeBanQueue::<T>::remove(subnet_id, &node_id);
+
+            if let Some(info) = SubnetDecryptionData::<T>::get(subnet_id) {
+                // Cancel and then ban
+                Self::cancel_offchain_worker(subnet_id, &info);
+                Self::ban_offchain_worker(&node_id);
+
+                Self::deposit_event(Event::<T>::DecryptionNodeBanned {
+                    subnet_id,
+                    node_id: node_id.clone(),
+                });
+
+                log::info!(
+                    "Offchain worker banned at block {}: subnet {}, node {:?}",
+                    block_number,
+                    subnet_id,
+                    node_id
+                );
+            }
+        }
+    }
+
     pub fn cancel_expired_offchain_workers(block_number: u64) {
         let max_inactivity_blocks =
             T::PingInterval::get().saturating_mul(T::MissedPingsForInactivity::get() as u64);
@@ -454,6 +519,8 @@ impl<T: Config> Pallet<T> {
         // Get only subnets that use encryption and have encrypted weights
         let (with_encryption, _) = Self::get_valid_subnets(None);
 
+        let buffer = T::EncryptionPeriodBuffer::get();
+
         with_encryption
             .into_iter()
             .filter_map(|subnet_id| {
@@ -463,9 +530,10 @@ impl<T: Config> Pallet<T> {
                 block_number.saturating_sub(info.last_keep_alive) > max_inactivity_blocks
                     || block_number.saturating_sub(info.activation_block.unwrap_or(u64::MAX))
                         > Self::get_max_encryption_interval(subnet_id)
-                            .saturating_add(T::EncryptionPeriodBuffer::get())
             })
-            .for_each(|(subnet_id, info)| Self::cancel_offchain_worker(subnet_id, &info));
+            .for_each(|(subnet_id, info)| {
+                Self::put_offworker_to_ban_queue(subnet_id, &info, buffer)
+            });
     }
 
     pub fn cleanup_weight_encryption_data(subnet_id: u16) {
diff --git a/pallets/subnet_emission/src/lib.rs b/pallets/subnet_emission/src/lib.rs
index 61744ebb6..9d12495ca 100644
--- a/pallets/subnet_emission/src/lib.rs
+++ b/pallets/subnet_emission/src/lib.rs
@@ -170,6 +170,13 @@ pub mod pallet {
     pub type BannedDecryptionNodes<T: Config> =
         StorageMap<_, Identity, T::AccountId, u64, ValueQuery>;
 
+    /// Stores offchain workers that are going to be banned, if their weights aren't received within
+    /// the buffer period
+    /// Subnet: u16 , Decryption Node: AccountId, Buffer: BlockNumber (current block + buffer)
+    #[pallet::storage]
+    pub type DecryptionNodeBanQueue<T: Config> =
+        StorageDoubleMap<_, Identity, u16, Identity, T::AccountId, u64, ValueQuery>;
+
     #[pallet::storage]
     pub type PendingEmission<T> = StorageMap<_, Identity, u16, u64, ValueQuery>;
 
@@ -217,6 +224,7 @@ pub mod pallet {
             log::info!("Distributed subnets to nodes");
             Self::assign_activation_blocks(block_number);
             Self::cancel_expired_offchain_workers(block_number);
+            Self::process_ban_queue(block_number);
             log::info!("Cancelled expired offchain workers");
             let emission_per_block = Self::get_total_emission_per_block();
             log::info!("Emission per block: {:?}", emission_per_block);
@@ -259,6 +267,15 @@ pub mod pallet {
             previous_node_id: T::AccountId,
             new_node_id: T::AccountId,
         },
+        DecryptionNodeBanQueued {
+            subnet_id: u16,
+            node_id: T::AccountId,
+            ban_block: u64,
+        },
+        DecryptionNodeBanned {
+            subnet_id: u16,
+            node_id: T::AccountId,
+        },
     }
 
     #[derive(Debug)]
diff --git a/pallets/subspace/src/params/subnet.rs b/pallets/subspace/src/params/subnet.rs
index 310fa37d3..a784ac299 100644
--- a/pallets/subspace/src/params/subnet.rs
+++ b/pallets/subspace/src/params/subnet.rs
@@ -233,7 +233,7 @@ impl<T: Config> ValidatedSubnetParams<T> {
 
         if let Some(encryption_period) = max_encryption_period {
             ensure!(
-                *encryption_period >= u64::from(*tempo)
+                *encryption_period > u64::from(*tempo)
                     && *encryption_period <= MAX_ENCRYPTION_DURATION,
                 Error::<T>::InvalidMaxEncryptionPeriod
             );