Skip to content

Commit

Permalink
feat: adding offworker ban queu
Browse files Browse the repository at this point in the history
  • Loading branch information
functor-flow committed Nov 23, 2024
1 parent 1b4374b commit 19c6c33
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 7 deletions.
33 changes: 31 additions & 2 deletions pallets/offworker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,22 @@ pub mod pallet {
let (valid_subnets, hanging_subnets) =
pallet_subnet_emission::Pallet::<T>::get_valid_subnets(Some(&acc_id));

log::info!("Valid subnets: {:?}", valid_subnets);
// The runtime mimics this logic, by deleting all storages related to consenus
// parameters and weights
hanging_subnets.iter().for_each(|subnet_id| {
log::warn!("Deleting subnet: {}", subnet_id);
Self::delete_subnet_state(subnet_id);
});

log::info!("Valid subnets: {:?}", valid_subnets);
let deregistered_subnets = Self::process_subnets(valid_subnets, acc_id, block_number);
let (queued_for_ban, remaining_valid_subnets) =
Self::handle_banned_subnets(&valid_subnets, &acc_id);

log::info!("Subnets queued for ban: {:?}", queued_for_ban);
log::info!("Valid subnets: {:?}", remaining_valid_subnets);

let deregistered_subnets =
Self::process_subnets(remaining_valid_subnets, acc_id, block_number);
deregistered_subnets.iter().for_each(|subnet_id| {
log::info!("Deregistered subnet: {}", subnet_id);
Self::delete_subnet_state(subnet_id);
Expand Down Expand Up @@ -312,6 +319,28 @@ impl<T: Config> Pallet<T> {
.build()
}

fn handle_banned_subnets(valid_subnets: &[u16], acc_id: &T::AccountId) -> (Vec<u16>, Vec<u16>) {
valid_subnets.iter().cloned().partition(|&subnet_id| {
if DecryptionNodeBanQueue::<T>::contains_key(subnet_id, acc_id) {
log::info!(
"Node is in ban queue for subnet {} - sending weights early",
subnet_id
);

if let Err(e) = Self::do_send_weights(subnet_id, I64F64::from_num(0), false) {
log::error!(
"Error sending early weights for subnet {}: {:?}",
subnet_id,
e
);
}
true
} else {
false
}
})
}

/// Decrypts all subnet weights for a given netuid. Returns a vector of block weights,
/// maintaining the length of consensus parameters. Failed decryptions result in empty vectors.
/// V2 TODO: make more efficient in v2, we can be just caching this
Expand Down
2 changes: 0 additions & 2 deletions pallets/offworker/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ where
decrypted_weights.len()
);

// log::info!("decrypted weights are: {:?}", decrypted_weights);

let weights_for_should_decrypt: Vec<_> = decrypted_weights
.iter()
.cloned()
Expand Down
57 changes: 55 additions & 2 deletions pallets/subnet_emission/src/decryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,65 @@ impl<T: Config> Pallet<T> {
.unwrap_or_else(|| MaxEncryptionPeriodDefaultValue::get().unwrap_or(10_800))
}

fn put_offworker_to_ban_queue(subnet_id: u16, info: &SubnetDecryptionInfo<T>, buffer: u64) {
let current_block = pallet_subspace::Pallet::<T>::get_current_block_number();
let ban_block = current_block.saturating_add(buffer);

// Add the node to the ban queue with the calculated ban block
DecryptionNodeBanQueue::<T>::insert(subnet_id, &info.node_id, ban_block);

// Emit an event
Self::deposit_event(Event::<T>::DecryptionNodeBanQueued {
subnet_id,
node_id: info.node_id.clone(),
ban_block,
});

log::info!(
"Offchain worker queued for ban: subnet {}, node {:?}, ban at block {}",
subnet_id,
info.node_id,
ban_block
);
}

pub(crate) fn process_ban_queue(block_number: &u64) {
DecryptionNodeBanQueue::<T>::iter()
.filter(|(_, _, ban_block)| ban_block <= block_number)
.for_each(|(subnet_id, node_id, _ban_block)| {
// Get the node info before removing from queue
if let Some(info) = SubnetDecryptionData::<T>::get(subnet_id) {
// Remove from ban queue first
DecryptionNodeBanQueue::<T>::remove(subnet_id, &node_id);

// Cancel and then ban
Self::cancel_offchain_worker(subnet_id, &info);
Self::ban_offchain_worker(&node_id);

Self::deposit_event(Event::<T>::DecryptionNodeBanned {
subnet_id,
node_id: node_id.clone(),
});

log::info!(
"Offchain worker banned at block {}: subnet {}, node {:?}",
block_number,
subnet_id,
node_id
);
}
});
}

pub fn cancel_expired_offchain_workers(block_number: u64) {
let max_inactivity_blocks =
T::PingInterval::get().saturating_mul(T::MissedPingsForInactivity::get() as u64);

// Get only subnets that use encryption and have encrypted weights
let (with_encryption, _) = Self::get_valid_subnets(None);

let buffer = T::EncryptionPeriodBuffer::get();

with_encryption
.into_iter()
.filter_map(|subnet_id| {
Expand All @@ -463,9 +515,10 @@ impl<T: Config> Pallet<T> {
block_number.saturating_sub(info.last_keep_alive) > max_inactivity_blocks
|| block_number.saturating_sub(info.activation_block.unwrap_or(u64::MAX))
> Self::get_max_encryption_interval(subnet_id)
.saturating_add(T::EncryptionPeriodBuffer::get())
})
.for_each(|(subnet_id, info)| Self::cancel_offchain_worker(subnet_id, &info));
.for_each(|(subnet_id, info)| {
Self::put_offworker_to_ban_queue(subnet_id, &info, buffer)
});
}

pub fn cleanup_weight_encryption_data(subnet_id: u16) {
Expand Down
17 changes: 17 additions & 0 deletions pallets/subnet_emission/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ pub mod pallet {
pub type BannedDecryptionNodes<T: Config> =
StorageMap<_, Identity, T::AccountId, u64, ValueQuery>;

/// Stores offchain workers that are going to be banned, if their weights aren't received within
/// the buffer period
/// Subnet: u16 , Decryption Node: AccountId, Buffer: BlockNumber (current block + buffer)
#[pallet::storage]
pub type DecryptionNodeBanQueue<T: Config> =
StorageDoubleMap<_, Identity, u16, Identity, T::AccountId, u64, ValueQuery>;

#[pallet::storage]
pub type PendingEmission<T> = StorageMap<_, Identity, u16, u64, ValueQuery>;

Expand Down Expand Up @@ -217,6 +224,7 @@ pub mod pallet {
log::info!("Distributed subnets to nodes");
Self::assign_activation_blocks(block_number);
Self::cancel_expired_offchain_workers(block_number);
Self::process_ban_queue(block_number);
log::info!("Cancelled expired offchain workers");
let emission_per_block = Self::get_total_emission_per_block();
log::info!("Emission per block: {:?}", emission_per_block);
Expand Down Expand Up @@ -259,6 +267,15 @@ pub mod pallet {
previous_node_id: T::AccountId,
new_node_id: T::AccountId,
},
DecryptionNodeBanQueued {
subnet_id: u16,
node_id: T::AccountId,
ban_block: u64,
},
DecryptionNodeBanned {
subnet_id: u16,
node_id: T::AccountId,
},
}

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion pallets/subspace/src/params/subnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl<T: Config> ValidatedSubnetParams<T> {

if let Some(encryption_period) = max_encryption_period {
ensure!(
*encryption_period >= u64::from(*tempo)
*encryption_period > u64::from(*tempo)
&& *encryption_period <= MAX_ENCRYPTION_DURATION,
Error::<T>::InvalidMaxEncryptionPeriod
);
Expand Down

0 comments on commit 19c6c33

Please sign in to comment.