
Sync state subscription #244

Open

wants to merge 24 commits into base: master

Changes from all commits (24 commits)
cabbf26
Add sync state change notifications in consensus process
biryukovmaxim Aug 23, 2023
76910de
"Add IBD progress reporting to notification system"
biryukovmaxim Aug 23, 2023
717ad0f
Add sync state `Blocks` for block synchronization
biryukovmaxim Aug 23, 2023
869f2ec
Add UTXO resync notification feature
biryukovmaxim Aug 24, 2023
5cf40d6
Add UtxoSync to SyncStateChangedNotification
biryukovmaxim Aug 24, 2023
5663b00
Add TrustSync state to SyncStateChangedNotification
biryukovmaxim Aug 24, 2023
01019f0
fix integration_tests providing notification root as parameter
biryukovmaxim Aug 24, 2023
4bd6210
Refactor notification_root usage in ibd/flow.rs
biryukovmaxim Aug 24, 2023
8f5523d
"Add Synced state to SyncStateChangedNotification"
biryukovmaxim Aug 26, 2023
6779125
Refactor DAA parameters into dedicated struct
biryukovmaxim Aug 26, 2023
7eb1abf
Refactor sync state initialisation and value capture
biryukovmaxim Aug 26, 2023
0431b0e
Simplify condition check in sync_state.rs
biryukovmaxim Aug 26, 2023
f01fc2b
Remove unused SYNC_STATE import from integration tests
biryukovmaxim Aug 26, 2023
82bce93
Add SyncStateChanged to GRPC core
biryukovmaxim Aug 26, 2023
1fe5bd8
Add SyncStateChangedNotification to RpcApiOps
biryukovmaxim Aug 26, 2023
05adc29
Fix incorrect function call for SerdeJson encoding
biryukovmaxim Aug 26, 2023
420a046
Fix regex pattern for block headers in IBD
biryukovmaxim Aug 26, 2023
0af3950
"Remove unused sync_state.rs file and refactor consensus"
biryukovmaxim Aug 27, 2023
9d7d602
"Refactor DAA sync condition for readability"
biryukovmaxim Aug 27, 2023
8787288
Refactor sink handling in processor.rs
biryukovmaxim Aug 27, 2023
4053ddd
Refactor checking state sync in pruning_proof
biryukovmaxim Aug 27, 2023
c8a045e
fixes after rebase
biryukovmaxim Sep 1, 2023
80eb07e
fix clippy::redundant_locals
biryukovmaxim Oct 7, 2023
1961717
fix comma after rebase
biryukovmaxim Oct 7, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

153 changes: 87 additions & 66 deletions consensus/core/src/config/params.rs

Large diffs are not rendered by default.
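Since the params.rs diff is not rendered, the shape of the new struct can only be inferred from how the rest of this PR uses it. The sketch below is a hedged reconstruction: the field names come from accesses such as `params.daa_window_params.target_time_per_block` in the diffs further down, while the types, derives, and the body of `is_nearly_synced` are assumptions, not the PR's actual code.

// Hedged sketch of the refactored DAA window parameters; illustrative only.
#[derive(Clone, Copy, Debug)]
pub struct DAAWindowParams {
    pub target_time_per_block: u64,         // expected milliseconds per block
    pub sampling_activation_daa_score: u64, // DAA score at which sampled windows activate
    pub legacy_difficulty_window_size: usize,
    pub sampled_difficulty_window_size: usize,
    pub difficulty_sample_rate: u64,
}

impl DAAWindowParams {
    /// Assumed heuristic: the node is "nearly synced" when the sink block's timestamp
    /// is within roughly one difficulty window of wall-clock time. The exact threshold
    /// used by the PR may differ.
    pub fn is_nearly_synced(&self, sink_timestamp: u64, _sink_daa_score: u64) -> bool {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is after the unix epoch")
            .as_millis() as u64;
        let window_ms = self.target_time_per_block * self.sampled_difficulty_window_size as u64;
        now_ms.saturating_sub(sink_timestamp) < window_ms
    }
}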

43 changes: 43 additions & 0 deletions consensus/notify/src/notification.rs
@@ -41,6 +41,8 @@ pub enum Notification {

#[display(fmt = "NewBlockTemplate notification")]
NewBlockTemplate(NewBlockTemplateNotification),
#[display(fmt = "SyncStateChanged notification")]
SyncStateChanged(SyncStateChangedNotification),
}
}

@@ -172,3 +174,44 @@ pub struct PruningPointUtxoSetOverrideNotification {}

#[derive(Debug, Clone)]
pub struct NewBlockTemplateNotification {}

#[derive(Debug, Clone)]
pub enum SyncStateChangedNotification {
Proof { current: u8, max: u8 },
Headers { headers: u64, progress: i64 },
Blocks { blocks: u64, progress: i64 },
UtxoResync,
UtxoSync { chunks: u64, total: u64 },
TrustSync { processed: u64, total: u64 },
Synced,
}

impl SyncStateChangedNotification {
pub fn new_proof(current: u8, max: u8) -> Self {
Self::Proof { current, max }
}

pub fn new_headers(headers: u64, progress: i64) -> Self {
Self::Headers { headers, progress }
}

pub fn new_blocks(blocks: u64, progress: i64) -> Self {
Self::Blocks { blocks, progress }
}

pub fn new_utxo_resync() -> Self {
Self::UtxoResync
}

pub fn new_utxo_sync(chunks: u64, total: u64) -> Self {
Self::UtxoSync { chunks, total }
}

pub fn new_trust_sync(processed: u64, total: u64) -> Self {
Self::TrustSync { processed, total }
}

pub fn new_synced() -> Self {
Self::Synced
}
}
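For context, here is a minimal sketch of how a subscriber might turn these variants into progress messages. Only the enum and constructors above come from the PR; the `describe_sync_state` helper and the reading of `progress` as a percentage are illustrative assumptions.

// Illustrative consumer sketch (not part of this PR).
use kaspa_consensus_notify::notification::SyncStateChangedNotification;

fn describe_sync_state(state: &SyncStateChangedNotification) -> String {
    match state {
        SyncStateChangedNotification::Proof { current, max } => {
            format!("Validating pruning proof level {current} of {max}")
        }
        SyncStateChangedNotification::Headers { headers, progress } => {
            format!("Downloaded {headers} headers ({progress}%)")
        }
        SyncStateChangedNotification::Blocks { blocks, progress } => {
            format!("Processed {blocks} blocks ({progress}%)")
        }
        SyncStateChangedNotification::UtxoResync => "Resyncing the UTXO set".to_string(),
        SyncStateChangedNotification::UtxoSync { chunks, total } => {
            format!("Received {chunks} of {total} UTXO chunks")
        }
        SyncStateChangedNotification::TrustSync { processed, total } => {
            format!("Processed {processed} of {total} trusted blocks")
        }
        SyncStateChangedNotification::Synced => "Node is synced".to_string(),
    }
}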
3 changes: 2 additions & 1 deletion consensus/src/consensus/mod.rs
@@ -149,7 +149,8 @@ impl Consensus {
// Services and managers
//

let services = ConsensusServices::new(db.clone(), storage.clone(), config.clone(), tx_script_cache_counters);
let services =
ConsensusServices::new(db.clone(), storage.clone(), config.clone(), tx_script_cache_counters, notification_root.clone());

//
// Processor channels
16 changes: 10 additions & 6 deletions consensus/src/consensus/services.rs
@@ -18,6 +18,7 @@ use crate::{
};

use itertools::Itertools;
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_txscript::caches::TxScriptCacheCounters;
use std::sync::Arc;

@@ -71,6 +72,7 @@ impl ConsensusServices {
storage: Arc<ConsensusStorage>,
config: Arc<Config>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
notification_root: Arc<ConsensusNotificationRoot>,
) -> Arc<Self> {
let params = &config.params;

@@ -93,12 +95,12 @@
storage.block_window_cache_for_difficulty.clone(),
storage.block_window_cache_for_past_median_time.clone(),
params.max_difficulty_target,
params.target_time_per_block,
params.sampling_activation_daa_score,
params.legacy_difficulty_window_size,
params.sampled_difficulty_window_size,
params.daa_window_params.target_time_per_block,
params.daa_window_params.sampling_activation_daa_score,
params.daa_window_params.legacy_difficulty_window_size,
params.daa_window_params.sampled_difficulty_window_size,
params.min_difficulty_window_len,
params.difficulty_sample_rate,
params.daa_window_params.difficulty_sample_rate,
params.legacy_past_median_time_window_size(),
params.sampled_past_median_time_window_size(),
params.past_median_time_sample_rate,
@@ -136,7 +138,7 @@ impl ConsensusServices {
params.max_coinbase_payload_len,
params.deflationary_phase_daa_score,
params.pre_deflationary_phase_base_subsidy,
params.target_time_per_block,
params.daa_window_params.target_time_per_block,
);

let mass_calculator =
@@ -185,6 +187,8 @@ impl ConsensusServices {
params.pruning_proof_m,
params.anticone_finalization_depth(),
params.ghostdag_k,
notification_root,
params.daa_window_params,
));

let sync_manager = SyncManager::new(
2 changes: 1 addition & 1 deletion consensus/src/pipeline/header_processor/processor.rs
@@ -201,7 +201,7 @@ impl HeaderProcessor {
counters,
// TODO (HF): make sure to also pass `new_timestamp_deviation_tolerance` and use according to HF activation score
timestamp_deviation_tolerance: params.timestamp_deviation_tolerance(0),
target_time_per_block: params.target_time_per_block,
target_time_per_block: params.daa_window_params.target_time_per_block,
max_block_parents: params.max_block_parents,
mergeset_size_limit: params.mergeset_size_limit,
skip_proof_of_work: params.skip_proof_of_work,
25 changes: 24 additions & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
@@ -76,8 +76,10 @@ use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_notify::notifier::Notify;

use crate::model::stores::headers::CompactHeaderData;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use itertools::Itertools;
use kaspa_consensus_core::config::params::DAAWindowParams;
use kaspa_utils::binary_heap::BinaryHeapExtensions;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rand::seq::SliceRandom;
@@ -86,6 +88,7 @@ use rayon::{
ThreadPool,
};
use rocksdb::WriteBatch;
use std::sync::atomic::AtomicBool;
use std::{
cmp::min,
collections::{BinaryHeap, HashMap, VecDeque},
@@ -112,6 +115,7 @@ pub struct VirtualStateProcessor {
pub(super) max_block_parents: u8,
pub(super) mergeset_size_limit: u64,
pub(super) pruning_depth: u64,
pub(super) daa_window_params: DAAWindowParams,

// Stores
pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
@@ -152,6 +156,8 @@ pub struct VirtualStateProcessor {

// Counters
counters: Arc<ProcessingCounters>,

previous_synced: AtomicBool,
}

impl VirtualStateProcessor {
@@ -179,6 +185,7 @@ impl VirtualStateProcessor {
max_block_parents: params.max_block_parents,
mergeset_size_limit: params.mergeset_size_limit,
pruning_depth: params.pruning_depth,
daa_window_params: params.daa_window_params,

db,
statuses_store: storage.statuses_store.clone(),
@@ -211,6 +218,7 @@ impl VirtualStateProcessor {
pruning_lock,
notification_root,
counters,
previous_synced: AtomicBool::new(false),
}
}

@@ -288,7 +296,6 @@ impl VirtualStateProcessor {
&chain_path,
)
.expect("all possible rule errors are unexpected here");

// Update the pruning processor about the virtual state change
let sink_ghostdag_data = self.ghostdag_primary_store.get_compact_data(new_sink).unwrap();
// Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
@@ -318,6 +325,22 @@ impl VirtualStateProcessor {
Arc::new(added_chain_blocks_acceptance_data),
)))
.expect("expecting an open unbounded channel");

{
let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(new_sink).unwrap();
match (self.daa_window_params.is_nearly_synced(timestamp, daa_score), self.previous_synced.load(Ordering::Relaxed)) {
(true, false) => {
self.notification_root
.notify(Notification::SyncStateChanged(
kaspa_consensus_notify::notification::SyncStateChangedNotification::new_synced(),
))
.expect("expecting an open unbounded channel");
self.previous_synced.store(true, Ordering::Relaxed);
}
(false, true) => self.previous_synced.store(false, Ordering::Relaxed),
_ => {}
}
}
}

pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
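The hunk above emits `Synced` only on the transition from not-synced to synced, using `previous_synced` as an edge detector so repeated sink updates while already synced do not re-notify. Below is a standalone sketch of that pattern with illustrative names, stripped of the notifier wiring; it mirrors the PR's logic but is not the PR's code.

// Standalone sketch of the edge-detection pattern used above (illustrative only).
use std::sync::atomic::{AtomicBool, Ordering};

struct SyncEdgeDetector {
    previous_synced: AtomicBool,
}

impl SyncEdgeDetector {
    fn new() -> Self {
        Self { previous_synced: AtomicBool::new(false) }
    }

    /// Returns true exactly once per transition into the synced state;
    /// the caller would emit the `Synced` notification at that point.
    fn on_sink_update(&self, is_nearly_synced: bool) -> bool {
        match (is_nearly_synced, self.previous_synced.load(Ordering::Relaxed)) {
            (true, false) => {
                self.previous_synced.store(true, Ordering::Relaxed);
                true
            }
            (false, true) => {
                self.previous_synced.store(false, Ordering::Relaxed);
                false
            }
            _ => false,
        }
    }
}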
6 changes: 3 additions & 3 deletions consensus/src/pipeline/virtual_processor/tests.rs
@@ -68,7 +68,7 @@ impl TestContext {

pub fn build_block_template_row(&mut self, nonces: impl Iterator<Item = usize>) -> &mut Self {
for nonce in nonces {
self.simulated_time += self.consensus.params().target_time_per_block;
self.simulated_time += self.consensus.params().daa_window_params.target_time_per_block;
self.current_templates.push_back(self.build_block_template(nonce as u64, self.simulated_time));
}
self
@@ -93,7 +93,7 @@ impl TestContext {
pub async fn build_and_insert_disqualified_chain(&mut self, mut parents: Vec<Hash>, len: usize) -> Hash {
// The chain will be disqualified since build_block_with_parents builds utxo-invalid blocks
for _ in 0..len {
self.simulated_time += self.consensus.params().target_time_per_block;
self.simulated_time += self.consensus.params().daa_window_params.target_time_per_block;
let b = self.build_block_with_parents(parents, 0, self.simulated_time);
parents = vec![b.header.hash];
self.validate_and_insert_block(b.to_immutable()).await;
@@ -236,7 +236,7 @@ async fn double_search_disqualified_test() {
.edit_consensus_params(|p| {
p.max_block_parents = 4;
p.mergeset_size_limit = 10;
p.min_difficulty_window_len = p.legacy_difficulty_window_size;
p.min_difficulty_window_len = p.daa_window_params.legacy_difficulty_window_size;
})
.build();
let mut ctx = TestContext::new(TestConsensus::new(&config));
2 changes: 1 addition & 1 deletion consensus/src/processes/coinbase.rs
@@ -481,7 +481,7 @@ mod tests {
params.max_coinbase_payload_len,
params.deflationary_phase_daa_score,
params.pre_deflationary_phase_base_subsidy,
params.target_time_per_block,
params.daa_window_params.target_time_per_block,
)
}

27 changes: 27 additions & 0 deletions consensus/src/processes/pruning_proof/mod.rs
@@ -11,6 +11,7 @@ use kaspa_math::int::SignedInteger;
use parking_lot::RwLock;
use rocksdb::WriteBatch;

use kaspa_consensus_core::config::params::DAAWindowParams;
use kaspa_consensus_core::{
blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
errors::{
@@ -22,12 +23,18 @@ use kaspa_consensus_core::{
trusted::{TrustedBlock, TrustedGhostdagData, TrustedHeader},
BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
};
use kaspa_consensus_notify::{
notification::{Notification, SyncStateChangedNotification},
root::ConsensusNotificationRoot,
};
use kaspa_core::{debug, info, trace};
use kaspa_database::prelude::{ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_notify::notifier::Notify;
use kaspa_pow::calc_block_level;
use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};

use crate::model::stores::headers::CompactHeaderData;
use crate::{
consensus::{
services::{DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbWindowManager},
@@ -102,6 +109,8 @@ pub struct PruningProofManager {
pruning_proof_m: u64,
anticone_finalization_depth: u64,
ghostdag_k: KType,
notification_root: Arc<ConsensusNotificationRoot>,
daa_window_params: DAAWindowParams,
}

impl PruningProofManager {
@@ -119,6 +128,8 @@
pruning_proof_m: u64,
anticone_finalization_depth: u64,
ghostdag_k: KType,
notification_root: Arc<ConsensusNotificationRoot>,
daa_window_params: DAAWindowParams,
) -> Self {
Self {
db,
@@ -149,6 +160,8 @@ impl PruningProofManager {
pruning_proof_m,
anticone_finalization_depth,
ghostdag_k,
notification_root,
daa_window_params,
}
}

@@ -406,6 +419,20 @@

let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
for level in (0..=self.max_block_level).rev() {
{
let is_synced = self.virtual_stores.read().state.get().is_ok_and(|state| {
let sink = state.ghostdag_data.selected_parent;
let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(sink).unwrap();
self.daa_window_params.is_nearly_synced(timestamp, daa_score)
});

if !is_synced {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level)))
.expect("expecting an open unbounded channel");
}
}

info!("Validating level {level} from the pruning point proof");
let level_idx = level as usize;
let mut selected_tip = None;
3 changes: 2 additions & 1 deletion indexes/processor/src/processor.rs
@@ -152,7 +152,8 @@ mod tests {
let tc = TestConsensus::new(&config);
tc.init();
let consensus_manager = Arc::new(ConsensusManager::from_consensus(tc.consensus_clone()));
let utxoindex = Some(UtxoIndexProxy::new(UtxoIndex::new(consensus_manager, utxoindex_db).unwrap()));
let utxoindex =
Some(UtxoIndexProxy::new(UtxoIndex::new(consensus_manager, utxoindex_db, tc.notification_root()).unwrap()));
let processor = Arc::new(Processor::new(utxoindex, consensus_receiver));
let (processor_sender, processor_receiver) = unbounded();
let notifier = Arc::new(NotifyMock::new(processor_sender));
7 changes: 5 additions & 2 deletions indexes/utxoindex/Cargo.toml
@@ -9,13 +9,16 @@ license.workspace = true

[dependencies]
kaspa-hashes.workspace = true
thiserror.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensusmanager.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-notify.workspace = true
kaspa-core.workspace = true
kaspa-utils.workspace = true
kaspa-index-core.workspace = true
kaspa-database.workspace = true
kaspa-consensusmanager.workspace = true

thiserror.workspace = true
log.workspace = true
rocksdb.workspace = true
serde.workspace = true