From 81aab3ecf0c341792af4439b67dd7d8d2234341f Mon Sep 17 00:00:00 2001
From: Luiz Carvalho
Date: Wed, 11 Dec 2024 10:36:05 -0300
Subject: [PATCH] feat(node): add aura authoring

---
 node/src/chain_spec.rs |  33 +++--
 node/src/cli.rs        |  10 +-
 node/src/command.rs    |  10 +-
 node/src/service.rs    | 279 ++++++++++++++++++++++++++++++++++++-----
 rustfmt.toml           |   1 +
 5 files changed, 286 insertions(+), 47 deletions(-)
 create mode 100644 rustfmt.toml

diff --git a/node/src/chain_spec.rs b/node/src/chain_spec.rs
index 3cf8e3d..b4655de 100644
--- a/node/src/chain_spec.rs
+++ b/node/src/chain_spec.rs
@@ -17,11 +17,10 @@
 use polkadot_sdk::{
     sc_service::{ChainType, Properties},
-    sp_keyring::AccountKeyring,
     *,
 };
 use serde_json::{json, Value};
-use torus_runtime::{BalancesConfig, SudoConfig, WASM_BINARY};
+use torus_runtime::WASM_BINARY;
 
 /// This is a specialization of the general Substrate ChainSpec type.
 pub type ChainSpec = sc_service::GenericChainSpec;
@@ -38,8 +37,8 @@ pub fn development_config() -> Result<ChainSpec, String> {
         WASM_BINARY.expect("Development wasm not available"),
         Default::default(),
     )
-    .with_name("Development")
-    .with_id("dev")
+    .with_name("Torus")
+    .with_id("torus")
     .with_chain_type(ChainType::Development)
     .with_genesis_config_patch(testnet_genesis())
     .with_properties(props())
@@ -48,14 +47,32 @@ pub fn development_config() -> Result<ChainSpec, String> {
 
 /// Configure initial storage state for FRAME pallets.
 fn testnet_genesis() -> Value {
-    use polkadot_sdk::polkadot_sdk_frame::traits::Get;
-    use torus_runtime::interface::{Balance, MinimumBalance};
+    use polkadot_sdk::{
+        polkadot_sdk_frame::traits::Get,
+        sp_keyring::{Ed25519Keyring, Sr25519Keyring},
+    };
+
+    use torus_runtime::{
+        interface::{Balance, MinimumBalance},
+        BalancesConfig, SudoConfig,
+    };
+
     let endowment = <MinimumBalance as Get<Balance>>::get().max(1) * 1000;
-    let balances = AccountKeyring::iter()
+    let balances = Sr25519Keyring::iter()
         .map(|a| (a.to_account_id(), endowment))
         .collect::<Vec<_>>();
+
+    let aura = [Sr25519Keyring::Alice, Sr25519Keyring::Bob];
+    let grandpa = [Ed25519Keyring::Alice, Ed25519Keyring::Bob];
+
     json!({
         "balances": BalancesConfig { balances },
-        "sudo": SudoConfig { key: Some(AccountKeyring::Alice.to_account_id()) },
+        "sudo": SudoConfig { key: Some(Sr25519Keyring::Alice.to_account_id()) },
+        "aura": {
+            "authorities": aura.iter().map(|x| x.public().to_string()).collect::<Vec<_>>(),
+        },
+        "grandpa": {
+            "authorities": grandpa.iter().map(|x| (x.public().to_string(), 1)).collect::<Vec<_>>(),
+        },
     })
 }
diff --git a/node/src/cli.rs b/node/src/cli.rs
index c7a9eae..fbd3940 100644
--- a/node/src/cli.rs
+++ b/node/src/cli.rs
@@ -17,8 +17,10 @@
 use polkadot_sdk::{sc_cli::RunCmd, *};
 
-#[derive(Debug, Clone)]
+#[derive(Clone, Copy, Debug, Default)]
 pub enum Consensus {
+    #[default]
+    Aura,
     ManualSeal(u64),
     InstantSeal,
 }
@@ -27,7 +29,9 @@ impl std::str::FromStr for Consensus {
     type Err = String;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Ok(if s == "instant-seal" {
+        Ok(if s == "aura" {
+            Consensus::Aura
+        } else if s == "instant-seal" {
             Consensus::InstantSeal
         } else if let Some(block_time) = s.strip_prefix("manual-seal-") {
             Consensus::ManualSeal(block_time.parse().map_err(|_| "invalid block time")?)
@@ -42,7 +46,7 @@ pub struct Cli {
     #[command(subcommand)]
     pub subcommand: Option<Subcommand>,
 
-    #[clap(long, default_value = "manual-seal-3000")]
+    #[clap(long, default_value = "aura")]
     pub consensus: Consensus,
 
     #[clap(flatten)]
diff --git a/node/src/command.rs b/node/src/command.rs
index cb2e6af..c1df5c1 100644
--- a/node/src/command.rs
+++ b/node/src/command.rs
@@ -75,7 +75,7 @@ pub fn run() -> sc_cli::Result<()> {
                     task_manager,
                     import_queue,
                     ..
-                } = service::new_partial(&config)?;
+                } = service::new_partial(&config, Default::default())?;
                 Ok((cmd.run(client, import_queue), task_manager))
             })
         }
@@ -86,7 +86,7 @@ pub fn run() -> sc_cli::Result<()> {
                     client,
                     task_manager,
                     ..
-                } = service::new_partial(&config)?;
+                } = service::new_partial(&config, Default::default())?;
                 Ok((cmd.run(client, config.database), task_manager))
             })
         }
@@ -97,7 +97,7 @@ pub fn run() -> sc_cli::Result<()> {
                     client,
                     task_manager,
                     ..
-                } = service::new_partial(&config)?;
+                } = service::new_partial(&config, Default::default())?;
                 Ok((cmd.run(client, config.chain_spec), task_manager))
             })
         }
@@ -109,7 +109,7 @@ pub fn run() -> sc_cli::Result<()> {
                     task_manager,
                     import_queue,
                     ..
-                } = service::new_partial(&config)?;
+                } = service::new_partial(&config, Default::default())?;
                 Ok((cmd.run(client, import_queue), task_manager))
             })
         }
@@ -125,7 +125,7 @@ pub fn run() -> sc_cli::Result<()> {
                     task_manager,
                     backend,
                     ..
-                } = service::new_partial(&config)?;
+                } = service::new_partial(&config, Default::default())?;
                 Ok((cmd.run(client, backend, None), task_manager))
             })
         }
diff --git a/node/src/service.rs b/node/src/service.rs
index dd68602..121c313 100644
--- a/node/src/service.rs
+++ b/node/src/service.rs
@@ -17,27 +17,37 @@
 use futures::FutureExt;
 use polkadot_sdk::{
-    sc_client_api::backend::Backend,
+    sc_client_api::{Backend, BlockBackend},
     sc_executor::WasmExecutor,
+    sc_network_sync::strategy::warp::WarpSyncProvider,
+    sc_service::WarpSyncConfig,
     sc_service::{error::Error as ServiceError, Configuration, TaskManager},
+    sc_telemetry::TelemetryHandle,
     sc_telemetry::{Telemetry, TelemetryWorker},
     sc_transaction_pool_api::OffchainTransactionPoolFactory,
+    sp_consensus_aura::sr25519::AuthorityPair,
     sp_runtime::traits::Block as BlockT,
     *,
 };
-use std::sync::Arc;
+use std::{pin::Pin, sync::Arc, time::Duration};
 use torus_runtime::{apis::RuntimeApi, interface::OpaqueBlock as Block};
 
 use crate::cli::Consensus;
 
 type HostFunctions = sp_io::SubstrateHostFunctions;
 
-pub(crate) type FullClient =
-    sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
+pub type FullClient = sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
 type FullBackend = sc_service::TFullBackend<Block>;
 type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
+type BasicImportQueue = sc_consensus::DefaultImportQueue<Block>;
+type BoxBlockImport = sc_consensus::BoxBlockImport<Block>;
+
+type GrandpaBlockImport =
+    sc_consensus_grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>;
+type GrandpaLinkHalf = sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>;
+
 /// Assembly of PartialComponents (enough to run chain ops subcommands)
 pub type Service = sc_service::PartialComponents<
     FullClient,
@@ -45,29 +55,19 @@ pub type Service = sc_service::PartialComponents<
     FullSelectChain,
     sc_consensus::DefaultImportQueue<Block>,
     sc_transaction_pool::FullPool<Block, FullClient>,
-    Option<Telemetry>,
+    (Option<Telemetry>, BoxBlockImport, GrandpaLinkHalf),
 >;
 
-pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
-    let telemetry = config
-        .telemetry_endpoints
-        .clone()
-        .filter(|x| !x.is_empty())
-        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
-            let worker = TelemetryWorker::new(16)?;
-            let telemetry = worker.handle().new_telemetry(endpoints);
-            Ok((worker, telemetry))
-        })
-        .transpose()?;
-
-    let executor = sc_service::new_wasm_executor(&config.executor);
+pub fn new_partial(config: &Configuration, consensus: Consensus) -> Result<Service, ServiceError> {
+    let telemetry = set_telemetry(config)?;
 
     let (client, backend, keystore_container, task_manager) =
         sc_service::new_full_parts::<Block, RuntimeApi, _>(
             config,
             telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
-            executor,
+            sc_service::new_wasm_executor(&config.executor),
         )?;
+
     let client = Arc::new(client);
 
     let telemetry = telemetry.map(|(worker, telemetry)| {
@@ -87,11 +87,26 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
         client.clone(),
     );
 
-    let import_queue = sc_consensus_manual_seal::import_queue(
-        Box::new(client.clone()),
-        &task_manager.spawn_essential_handle(),
-        config.prometheus_registry(),
-    );
+    let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
+        client.clone(),
+        512,
+        &client,
+        select_chain.clone(),
+        telemetry.as_ref().map(|x| x.handle()),
+    )?;
+
+    let import_queue_builder = match consensus {
+        Consensus::Aura => build_aura_grandpa_import_queue,
+        Consensus::ManualSeal(_) | Consensus::InstantSeal => build_manual_seal_import_queue,
+    };
+
+    let (import_queue, block_import) = import_queue_builder(
+        client.clone(),
+        config,
+        &task_manager,
+        telemetry.as_ref().map(|x| x.handle()),
+        grandpa_block_import,
+    )?;
 
     Ok(sc_service::PartialComponents {
         client,
@@ -101,10 +116,103 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
         keystore_container,
         select_chain,
         transaction_pool,
-        other: (telemetry),
+        other: (telemetry, block_import, grandpa_link),
     })
 }
 
+fn set_telemetry(
+    config: &Configuration,
+) -> Result<Option<(TelemetryWorker, Telemetry)>, ServiceError> {
+    config
+        .telemetry_endpoints
+        .clone()
+        .filter(|x| !x.is_empty())
+        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
+            let worker = TelemetryWorker::new(16)?;
+            let telemetry = worker.handle().new_telemetry(endpoints);
+            Ok((worker, telemetry))
+        })
+        .transpose()
+        .map_err(Into::into)
+}
+
+fn aura_data_provider(
+    slot_duration: sp_consensus_aura::SlotDuration,
+) -> impl Fn(
+    sp_core::H256,
+    (),
+) -> Pin<
+    Box<
+        dyn std::future::Future<
+                Output = Result<
+                    (
+                        sc_consensus_aura::InherentDataProvider,
+                        sp_timestamp::InherentDataProvider,
+                    ),
+                    Box<dyn std::error::Error + Send + Sync>,
+                >,
+            > + Send
+            + Sync,
+    >,
+> {
+    move |_, ()| {
+        Box::pin(async move {
+            let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
+            let slot =
+                sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
+                    *timestamp,
+                    slot_duration,
+                );
+            Ok((slot, timestamp))
+        })
+    }
+}
+
+/// Build the import queue for the template runtime (aura + grandpa).
+fn build_aura_grandpa_import_queue(
+    client: Arc<FullClient>,
+    config: &Configuration,
+    task_manager: &TaskManager,
+    telemetry: Option<TelemetryHandle>,
+    grandpa_block_import: GrandpaBlockImport,
+) -> Result<(BasicImportQueue, BoxBlockImport), ServiceError> {
+    let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
+
+    let import_queue = sc_consensus_aura::import_queue::<AuthorityPair, _, _, _, _, _>(
+        sc_consensus_aura::ImportQueueParams {
+            block_import: grandpa_block_import.clone(),
+            justification_import: Some(Box::new(grandpa_block_import.clone())),
+            client,
+            create_inherent_data_providers: aura_data_provider(slot_duration),
+            spawner: &task_manager.spawn_essential_handle(),
+            registry: config.prometheus_registry(),
+            check_for_equivocation: Default::default(),
+            telemetry,
+            compatibility_mode: sc_consensus_aura::CompatibilityMode::None,
+        },
+    )
+    .map_err::<ServiceError, _>(Into::into)?;
+
+    Ok((import_queue, Box::new(grandpa_block_import)))
+}
+
+/// Build the import queue for the template runtime (manual seal).
+fn build_manual_seal_import_queue(
+    client: Arc<FullClient>,
+    config: &Configuration,
+    task_manager: &TaskManager,
+    _telemetry: Option<TelemetryHandle>,
+    _grandpa_block_import: GrandpaBlockImport,
+) -> Result<(BasicImportQueue, BoxBlockImport), ServiceError> {
+    let import_queue = sc_consensus_manual_seal::import_queue(
+        Box::new(client.clone()),
+        &task_manager.spawn_essential_handle(),
+        config.prometheus_registry(),
+    );
+
+    Ok((import_queue, Box::new(client)))
+}
+
 /// Builds a new service for a full client.
 pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
     config: Configuration,
@@ -118,10 +226,10 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
         keystore_container,
         select_chain,
         transaction_pool,
-        other: mut telemetry,
-    } = new_partial(&config)?;
+        other: (mut telemetry, block_import, grandpa_link),
+    } = new_partial(&config, consensus)?;
 
-    let net_config = sc_network::config::FullNetworkConfiguration::<
+    let mut net_config = sc_network::config::FullNetworkConfiguration::<
         Block,
         <Block as BlockT>::Hash,
         Network,
@@ -132,10 +240,36 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
     >::new(
         config
            .prometheus_config
            .as_ref()
            .map(|cfg| cfg.registry.clone()),
     );
+    let peer_store_handle = net_config.peer_store_handle();
 
     let metrics = Network::register_notification_metrics(
         config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
     );
+    let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
+        &client.block_hash(0)?.expect("Genesis block exists; qed"),
+        &config.chain_spec,
+    );
+
+    let (grandpa_protocol_config, grandpa_notification_service) =
+        sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
+            grandpa_protocol_name.clone(),
+            metrics.clone(),
+            peer_store_handle,
+        );
+
+    let warp_sync_config = if matches!(consensus, Consensus::Aura) {
+        net_config.add_notification_protocol(grandpa_protocol_config);
+        let warp_sync: Arc<dyn WarpSyncProvider<Block>> =
+            Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
+                backend.clone(),
+                grandpa_link.shared_authority_set().clone(),
+                Vec::default(),
+            ));
+        Some(WarpSyncConfig::WithProvider(warp_sync))
+    } else {
+        None
+    };
+
     let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
         sc_service::build_network(sc_service::BuildNetworkParams {
             config: &config,
@@ -145,7 +279,7 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
             import_queue,
             net_config,
             block_announce_validator_builder: None,
-            warp_sync_config: None,
+            warp_sync_config,
             block_relay: None,
             metrics,
         })?;
@@ -184,10 +318,14 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
         })
     };
 
+    let role = config.role;
+    let force_authoring = config.force_authoring;
+    let name = config.network.node_name.clone();
+    let prometheus_registry = config.prometheus_registry().cloned();
+
     let _rpc_handlers =
        sc_service::spawn_tasks(sc_service::SpawnTasksParams {
-            network,
+            network: network.clone(),
             client: client.clone(),
             keystore: keystore_container.keystore(),
             task_manager: &mut task_manager,
@@ -196,7 +334,7 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
             backend,
             system_rpc_tx,
             tx_handler_controller,
-            sync_service,
+            sync_service: sync_service.clone(),
             config,
             telemetry: telemetry.as_mut(),
         })?;
@@ -210,6 +348,85 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
     );
 
     match consensus {
+        Consensus::Aura => {
+            let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
+
+            let aura = sc_consensus_aura::start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _>(
+                sc_consensus_aura::StartAuraParams {
+                    slot_duration,
+                    client,
+                    select_chain,
+                    block_import,
+                    proposer_factory: proposer,
+                    sync_oracle: sync_service.clone(),
+                    justification_sync_link: sync_service.clone(),
+                    create_inherent_data_providers: aura_data_provider(slot_duration),
+                    force_authoring,
+                    backoff_authoring_blocks: Option::<()>::None,
+                    keystore: keystore_container.keystore(),
+                    block_proposal_slot_portion: sc_consensus_aura::SlotProportion::new(
+                        2f32 / 3f32,
+                    ),
+                    max_block_proposal_slot_portion: None,
+                    telemetry: telemetry.as_ref().map(|x| x.handle()),
+                    compatibility_mode: sc_consensus_aura::CompatibilityMode::None,
+                },
+            )?;
+
+            task_manager.spawn_essential_handle().spawn_blocking(
+                "aura",
+                Some("block-authoring"),
+                aura,
+            );
+
+            // if the node isn't actively participating in consensus then it doesn't
+            // need a keystore, regardless of which protocol we use below.
+            let keystore = if role.is_authority() {
+                Some(keystore_container.keystore())
+            } else {
+                None
+            };
+
+            let grandpa_config = sc_consensus_grandpa::Config {
+                // FIXME #1578 make this available through chainspec
+                gossip_duration: Duration::from_millis(333),
+                justification_generation_period: 512,
+                name: Some(name),
+                observer_enabled: false,
+                keystore,
+                local_role: role,
+                telemetry: telemetry.as_ref().map(|x| x.handle()),
+                protocol_name: grandpa_protocol_name,
+            };
+
+            // start the full GRANDPA voter
+            // NOTE: non-authorities could run the GRANDPA observer protocol, but at
+            // this point the full voter should provide better guarantees of block
+            // and vote data availability than the observer. The observer has not
+            // been tested extensively yet and having most nodes in a network run it
+            // could lead to finality stalls.
+            let grandpa_voter =
+                sc_consensus_grandpa::run_grandpa_voter(sc_consensus_grandpa::GrandpaParams {
+                    config: grandpa_config,
+                    link: grandpa_link,
+                    network,
+                    sync: sync_service,
+                    notification_service: grandpa_notification_service,
+                    voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
+                    prometheus_registry,
+                    shared_voter_state: sc_consensus_grandpa::SharedVoterState::empty(),
+                    telemetry: telemetry.as_ref().map(|x| x.handle()),
+                    offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
+                })?;
+
+            // the GRANDPA voter task is considered infallible, i.e.
+            // if it fails we take down the service with it.
+            task_manager.spawn_essential_handle().spawn_blocking(
+                "grandpa-voter",
+                None,
+                grandpa_voter,
+            );
+        }
         Consensus::InstantSeal => {
             let params = sc_consensus_manual_seal::InstantSealParams {
                 block_import: client.clone(),
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..758d417
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1 @@
+max_width = 100
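Usage note (sketch, not part of the patch): the `--consensus` CLI flag keeps the existing seal modes and only changes the default to `aura`. The self-contained Rust sketch below mirrors the `Consensus` enum and `FromStr` parser added in node/src/cli.rs so the accepted values can be checked in isolation; the final `else` arm is an assumption, since it lies outside the diff context shown above.

// Standalone sketch mirroring the parser from node/src/cli.rs (assumed fallback arm included).
#[derive(Clone, Copy, Debug, Default, PartialEq)]
enum Consensus {
    #[default]
    Aura,
    ManualSeal(u64),
    InstantSeal,
}

impl std::str::FromStr for Consensus {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(if s == "aura" {
            Consensus::Aura
        } else if s == "instant-seal" {
            Consensus::InstantSeal
        } else if let Some(block_time) = s.strip_prefix("manual-seal-") {
            Consensus::ManualSeal(block_time.parse().map_err(|_| "invalid block time")?)
        } else {
            // Assumed fallback; the real arm is outside the hunk shown above.
            return Err(format!("unknown consensus engine: {s}"));
        })
    }
}

fn main() {
    // `--consensus aura` is now the default; the seal modes remain for local development.
    assert_eq!("aura".parse::<Consensus>(), Ok(Consensus::Aura));
    assert_eq!("instant-seal".parse::<Consensus>(), Ok(Consensus::InstantSeal));
    assert_eq!("manual-seal-3000".parse::<Consensus>(), Ok(Consensus::ManualSeal(3000)));
}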