From 91d2b5b01d7643705e1991dd7465aae6d1e4aced Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 18:02:35 +0800 Subject: [PATCH 01/28] Make xudt cross-chain type one-to-one. --- aggregator/aggregator-main/src/lib.rs | 14 +++-- .../aggregator-main/src/transaction/leap.rs | 6 +- aggregator/util/rgbpp-tx/src/custodian.rs | 8 ++- aggregator/util/rgbpp-tx/src/lib.rs | 56 ++++++++++++++++++- resource/ckb-aggregator.toml | 22 ++++++-- util/app-config/src/configs/aggregator.rs | 12 ++-- 6 files changed, 97 insertions(+), 21 deletions(-) diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 6aa5f934ef..759d087103 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -30,8 +30,8 @@ pub struct Aggregator { rgbpp_rpc_client: RpcClient, branch_rpc_client: RpcClient, branch_scripts: HashMap, - rgbpp_assets: HashMap, - rgbpp_locks: HashMap, + asset_types: HashMap, + asset_locks: HashMap, rgbpp_tx_builder: RgbppTxBuilder, } @@ -61,6 +61,8 @@ impl Aggregator { config.rgbpp_custodian_lock_key_path.clone(), config.rgbpp_queue_lock_key_path.clone(), config.rgbpp_ckb_provider_key_path.clone(), + config.asset_types.clone(), + config.asset_locks.clone(), ); Aggregator { config: config.clone(), @@ -68,8 +70,8 @@ impl Aggregator { rgbpp_rpc_client, branch_rpc_client, branch_scripts: get_script_map(config.branch_scripts), - rgbpp_assets: get_asset_map(config.rgbpp_assets), - rgbpp_locks: get_rgbpp_locks(config.rgbpp_asset_locks), + asset_types: get_asset_types(config.asset_types), + asset_locks: get_asset_locks(config.asset_locks), rgbpp_tx_builder, } } @@ -175,7 +177,7 @@ fn get_script_map(scripts: Vec) -> HashMap { .collect() } -fn get_asset_map(asset_configs: Vec) -> HashMap { +fn get_asset_types(asset_configs: Vec) -> HashMap { let mut is_capacity_found = false; asset_configs @@ -202,7 +204,7 @@ fn get_asset_map(asset_configs: Vec) -> HashMap { .collect() } -fn get_rgbpp_locks(lock_configs: Vec) -> HashMap { +fn get_asset_locks(lock_configs: Vec) -> HashMap { lock_configs .iter() .map(|lock_config| { diff --git a/aggregator/aggregator-main/src/transaction/leap.rs b/aggregator/aggregator-main/src/transaction/leap.rs index ca2f5100f4..36c1b62323 100644 --- a/aggregator/aggregator-main/src/transaction/leap.rs +++ b/aggregator/aggregator-main/src/transaction/leap.rs @@ -94,14 +94,14 @@ impl Aggregator { } } }; - let is_capacity = if let Some(asset) = self.rgbpp_assets.get(&asset_id) { + let is_capacity = if let Some(asset) = self.asset_types.get(&asset_id) { asset.is_capacity } else { false }; if is_capacity { let lock = self - .rgbpp_locks + .asset_locks .get(&owner_lock_hash) .ok_or(Error::LockNotFound(owner_lock_hash.to_string()))?; let output = CellOutput::new_builder() @@ -112,7 +112,7 @@ impl Aggregator { outputs_data.push(PackedBytes::default()); } else { let lock = self - .rgbpp_locks + .asset_locks .get(&owner_lock_hash) .ok_or(Error::LockNotFound(owner_lock_hash.to_string()))?; let xudt = &self diff --git a/aggregator/util/rgbpp-tx/src/custodian.rs b/aggregator/util/rgbpp-tx/src/custodian.rs index 250c1bc93a..843d003e3b 100644 --- a/aggregator/util/rgbpp-tx/src/custodian.rs +++ b/aggregator/util/rgbpp-tx/src/custodian.rs @@ -385,7 +385,7 @@ impl RgbppTxBuilder { match message_union { MessageUnion::Transfer(transfer) => { let transfer_amount: u128 = transfer.amount().unpack(); - let check_message = cell + let check_amount = cell .clone() .output_data .and_then(|data| decode_udt_amount(data.as_bytes())) @@ -396,7 +396,11 @@ impl RgbppTxBuilder { ); transfer_amount <= amount }); - (check_message, transfer) + let lock_hash: H256 = transfer.owner_lock_hash().unpack(); + let type_hash: H256 = transfer.asset_type().unpack(); + let check_lock = self.asset_locks.contains_key(&lock_hash); + let check_type = self.asset_types.contains_key(&type_hash); + (check_amount && check_lock && check_type, transfer) } } }; diff --git a/aggregator/util/rgbpp-tx/src/lib.rs b/aggregator/util/rgbpp-tx/src/lib.rs index fa8a274967..366445cf2d 100644 --- a/aggregator/util/rgbpp-tx/src/lib.rs +++ b/aggregator/util/rgbpp-tx/src/lib.rs @@ -10,7 +10,7 @@ use aggregator_common::{ error::Error, utils::{privkey::get_sighash_lock_args_from_privkey, QUEUE_TYPE}, }; -use ckb_app_config::ScriptConfig; +use ckb_app_config::{AssetConfig, LockConfig, ScriptConfig}; use ckb_logger::{info, warn}; use ckb_sdk::{ rpc::ckb_indexer::{Cell, Order}, @@ -28,6 +28,7 @@ use ckb_types::{ use std::collections::{HashMap, HashSet}; use std::path::PathBuf; +use std::str::FromStr; #[derive(Clone, Debug, PartialEq, Eq)] pub struct ScriptInfo { @@ -43,6 +44,13 @@ const CONFIRMATION_THRESHOLD: u64 = 24; /// CKB fee rate limit const CKB_FEE_RATE_LIMIT: u64 = 5000; +#[derive(Clone, Debug, PartialEq, Eq)] +struct AssetInfo { + pub script: Script, + pub is_capacity: bool, + pub script_name: String, +} + #[derive(Clone)] pub struct RgbppTxBuilder { chain_id: String, @@ -52,6 +60,8 @@ pub struct RgbppTxBuilder { rgbpp_custodian_lock_key_path: PathBuf, rgbpp_queue_lock_key_path: PathBuf, rgbpp_ckb_provider_key_path: PathBuf, + asset_types: HashMap, + asset_locks: HashMap, } impl RgbppTxBuilder { @@ -62,6 +72,8 @@ impl RgbppTxBuilder { rgbpp_custodian_lock_key_path: PathBuf, rgbpp_queue_lock_key_path: PathBuf, rgbpp_ckb_provider_key_path: PathBuf, + asset_types: Vec, + asset_locks: Vec, ) -> Self { let rgbpp_rpc_client = RpcClient::new(&rgbpp_uri); Self { @@ -72,6 +84,8 @@ impl RgbppTxBuilder { rgbpp_custodian_lock_key_path, rgbpp_queue_lock_key_path, rgbpp_ckb_provider_key_path, + asset_types: get_asset_types(asset_types), + asset_locks: get_asset_locks(asset_locks), } } @@ -252,3 +266,43 @@ fn get_script_map(scripts: Vec) -> HashMap { }) .collect() } + +fn get_asset_types(asset_configs: Vec) -> HashMap { + let mut is_capacity_found = false; + + asset_configs + .into_iter() + .map(|asset_config| { + let script = serde_json::from_str::(&asset_config.script) + .expect("config string to script") + .into(); + let script_name = asset_config.asset_name.clone(); + let is_capacity = asset_config.is_capacity && !is_capacity_found; + if is_capacity { + is_capacity_found = true; + } + let asset_id = asset_config.asset_id.clone(); + ( + H256::from_str(&asset_id).expect("asset id to h256"), + AssetInfo { + script, + is_capacity, + script_name, + }, + ) + }) + .collect() +} + +fn get_asset_locks(lock_configs: Vec) -> HashMap { + lock_configs + .iter() + .map(|lock_config| { + let lock_hash = H256::from_str(&lock_config.lock_hash).expect("lock hash to h256"); + let script = serde_json::from_str::(&lock_config.script) + .expect("config string to script") + .into(); + (lock_hash, script) + }) + .collect() +} diff --git a/resource/ckb-aggregator.toml b/resource/ckb-aggregator.toml index 442c2fe435..39264f041a 100644 --- a/resource/ckb-aggregator.toml +++ b/resource/ckb-aggregator.toml @@ -185,8 +185,8 @@ cell_dep = ''' } ''' -[[aggregator.rgbpp_assets]] -asset_name = "f091" +[[aggregator.asset_types]] +asset_name = "RGTT" # Only the first is_capacity = true is valid; others should be treated as false is_capacity = true asset_id = "29b0b1a449b0e7fb08881e1d810a6abbedb119e9c4ffc76eebbc757fb214f091" @@ -198,7 +198,19 @@ script = ''' } ''' -[[aggregator.rgbpp_asset_locks]] +[[aggregator.asset_types]] +asset_name = "tUTXO" +is_capacity = false +asset_id = "37b6748d268d4aa62445d546bac1f90ccbc02cbbcecc7831aca3b77d70304e0f" +script = ''' +{ + "args": "0x92b419a8d8e03c683a47b960f707f2b866f6114b70327b6628762719b243c5ca", + "code_hash": "0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb", + "hash_type": "type" +} +''' + +[[aggregator.asset_locks]] lock_hash = "562e4e8a2f64a3e9c24beb4b7dd002d0ad3b842d0cc77924328e36ad114e3ebe" script = ''' { @@ -208,7 +220,7 @@ script = ''' } ''' -[[aggregator.rgbpp_asset_locks]] +[[aggregator.asset_locks]] lock_hash = "3494ad80b80de5ce7bcbbbdac53d888666c576ea8a49b83ca108fc18641eb278" script = ''' { @@ -218,7 +230,7 @@ script = ''' } ''' -[[aggregator.rgbpp_asset_locks]] +[[aggregator.asset_locks]] lock_hash = "0a97a585361f3deabb69f28b2bc08d05f3a224cd1ce805996526c8f041a733fb" script = ''' { diff --git a/util/app-config/src/configs/aggregator.rs b/util/app-config/src/configs/aggregator.rs index 0e1eb64c6e..4da8c5bc89 100644 --- a/util/app-config/src/configs/aggregator.rs +++ b/util/app-config/src/configs/aggregator.rs @@ -30,6 +30,9 @@ pub struct AggregatorConfig { /// Branch Chain token manager lock key path #[serde(default)] pub branch_chain_token_manager_lock_key_path: PathBuf, + /// Branch Chain token manager with outbox key path + #[serde(default)] + pub branch_chain_token_manager_outbox_lock_key_path: PathBuf, /// RGB++ scripts #[serde(default)] pub rgbpp_scripts: Vec, @@ -38,10 +41,10 @@ pub struct AggregatorConfig { pub branch_scripts: Vec, /// Asset configs #[serde(default)] - pub rgbpp_assets: Vec, + pub asset_types: Vec, /// Lock configs #[serde(default)] - pub rgbpp_asset_locks: Vec, + pub asset_locks: Vec, } /// Script config options. @@ -88,10 +91,11 @@ impl Default for AggregatorConfig { branch_uri: "http://localhost:8114".to_string(), branch_chain_capacity_provider_key_path: PathBuf::new(), branch_chain_token_manager_lock_key_path: PathBuf::new(), + branch_chain_token_manager_outbox_lock_key_path: PathBuf::new(), rgbpp_scripts: Vec::new(), branch_scripts: Vec::new(), - rgbpp_assets: Vec::new(), - rgbpp_asset_locks: Vec::new(), + asset_types: Vec::new(), + asset_locks: Vec::new(), } } } From 492a63d3df5a6fdc4047b6112da07338d3609537 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 21:03:47 +0800 Subject: [PATCH 02/28] rename tx to rgbpp_to_branch --- aggregator/aggregator-main/src/lib.rs | 4 ++-- .../src/{transaction => rgbpp_to_branch}/leap.rs | 2 +- .../src/{transaction => rgbpp_to_branch}/mod.rs | 0 ckb-bin/src/subcommand/aggregator.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename aggregator/aggregator-main/src/{transaction => rgbpp_to_branch}/leap.rs (99%) rename aggregator/aggregator-main/src/{transaction => rgbpp_to_branch}/mod.rs (100%) diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 759d087103..bd7300c752 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -1,7 +1,7 @@ //! Branch Chain Aggregator +pub(crate) mod rgbpp_to_branch; pub(crate) mod schemas; -pub(crate) mod transaction; use crate::schemas::leap::Request; @@ -77,7 +77,7 @@ impl Aggregator { } /// Run the Aggregator - pub fn run(&self, stop_rx: Receiver<()>) { + pub fn collect_rgbpp_requests(&self, stop_rx: Receiver<()>) { let poll_interval = self.poll_interval; let poll_service: Aggregator = self.clone(); diff --git a/aggregator/aggregator-main/src/transaction/leap.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs similarity index 99% rename from aggregator/aggregator-main/src/transaction/leap.rs rename to aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs index 36c1b62323..f2ab363d43 100644 --- a/aggregator/aggregator-main/src/transaction/leap.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs @@ -1,5 +1,5 @@ +use crate::rgbpp_to_branch::SIGHASH_TYPE_HASH; use crate::schemas::leap::{MessageUnion, Request}; -use crate::transaction::SIGHASH_TYPE_HASH; use crate::{encode_udt_amount, Aggregator}; use aggregator_common::{ diff --git a/aggregator/aggregator-main/src/transaction/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs similarity index 100% rename from aggregator/aggregator-main/src/transaction/mod.rs rename to aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs diff --git a/ckb-bin/src/subcommand/aggregator.rs b/ckb-bin/src/subcommand/aggregator.rs index 9a5674d304..54356be1c2 100644 --- a/ckb-bin/src/subcommand/aggregator.rs +++ b/ckb-bin/src/subcommand/aggregator.rs @@ -18,7 +18,7 @@ pub fn aggregator(args: AggregatorArgs, chain_id: String) -> Result<(), ExitCode let aggregator_jh = thread::Builder::new() .name(THREAD_NAME.into()) .spawn(move || { - aggregator.run(stop_rx); + aggregator.collect_rgbpp_requests(stop_rx); }) .expect("Start aggregator failed!"); register_thread(THREAD_NAME, aggregator_jh); From d20090dadb6628576756c06e19ceb974bec4ce83 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 21:30:10 +0800 Subject: [PATCH 03/28] refactoring. --- aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs | 1 + aggregator/aggregator-main/src/lib.rs | 4 +++- aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs | 2 +- aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs | 1 + aggregator/util/rgbpp-tx/src/lib.rs | 5 +++-- aggregator/util/rgbpp-tx/src/{ => rgbpp_to_branch}/clear.rs | 0 .../util/rgbpp-tx/src/{ => rgbpp_to_branch}/custodian.rs | 0 aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs | 2 ++ 8 files changed, 11 insertions(+), 4 deletions(-) create mode 100644 aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs create mode 100644 aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs rename aggregator/util/rgbpp-tx/src/{ => rgbpp_to_branch}/clear.rs (100%) rename aggregator/util/rgbpp-tx/src/{ => rgbpp_to_branch}/custodian.rs (100%) create mode 100644 aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -0,0 +1 @@ + diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index bd7300c752..820bf2cd21 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -1,5 +1,6 @@ //! Branch Chain Aggregator +pub(crate) mod branch_to_rgbpp; pub(crate) mod rgbpp_to_branch; pub(crate) mod schemas; @@ -29,6 +30,7 @@ pub struct Aggregator { poll_interval: Duration, rgbpp_rpc_client: RpcClient, branch_rpc_client: RpcClient, + branch_scripts: HashMap, asset_types: HashMap, asset_locks: HashMap, @@ -122,7 +124,7 @@ impl Aggregator { } }; match wait_for_tx_confirmation( - poll_service.rgbpp_rpc_client.clone(), + poll_service.branch_rpc_client.clone(), leap_tx, Duration::from_secs(600), ) { diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index 1471e23235..c5556e39ac 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -19,7 +19,7 @@ impl Aggregator { fn fee_rate(&self) -> Result { let value = { let dynamic = self - .rgbpp_rpc_client + .branch_rpc_client .get_fee_rate_statistics(None) .map_err(|e| Error::RpcError(format!("get dynamic fee rate error: {}", e)))? .ok_or_else(|| Error::RpcError("get dynamic fee rate error: None".to_string())) diff --git a/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs b/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs @@ -0,0 +1 @@ + diff --git a/aggregator/util/rgbpp-tx/src/lib.rs b/aggregator/util/rgbpp-tx/src/lib.rs index 366445cf2d..4234d81ea8 100644 --- a/aggregator/util/rgbpp-tx/src/lib.rs +++ b/aggregator/util/rgbpp-tx/src/lib.rs @@ -1,7 +1,7 @@ #![allow(missing_docs)] -mod clear; -mod custodian; +mod branch_to_rgbpp; +mod rgbpp_to_branch; mod schemas; pub use crate::schemas::leap::{self, CrossChainQueue, Request, Requests}; @@ -65,6 +65,7 @@ pub struct RgbppTxBuilder { } impl RgbppTxBuilder { + #[allow(clippy::too_many_arguments)] pub fn new( chain_id: String, rgbpp_uri: String, diff --git a/aggregator/util/rgbpp-tx/src/clear.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs similarity index 100% rename from aggregator/util/rgbpp-tx/src/clear.rs rename to aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs diff --git a/aggregator/util/rgbpp-tx/src/custodian.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs similarity index 100% rename from aggregator/util/rgbpp-tx/src/custodian.rs rename to aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs new file mode 100644 index 0000000000..1e5b3146f4 --- /dev/null +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs @@ -0,0 +1,2 @@ +mod clear; +mod custodian; From 6b2f674011d691fc77cf71609af1d22e8e9094c8 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 21:48:58 +0800 Subject: [PATCH 04/28] refactoring --- aggregator/aggregator-main/src/lib.rs | 94 +------------------ .../src/rgbpp_to_branch/mod.rs | 92 +++++++++++++++++- ckb-bin/src/subcommand/aggregator.rs | 2 +- 3 files changed, 95 insertions(+), 93 deletions(-) diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 820bf2cd21..d2a8e2e2c6 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -4,26 +4,21 @@ pub(crate) mod branch_to_rgbpp; pub(crate) mod rgbpp_to_branch; pub(crate) mod schemas; -use crate::schemas::leap::Request; - use aggregator_common::{error::Error, utils::encode_udt_amount}; use aggregator_rgbpp_tx::RgbppTxBuilder; use ckb_app_config::{AggregatorConfig, AssetConfig, LockConfig, ScriptConfig}; -use ckb_channel::Receiver; -use ckb_logger::{error, info}; use ckb_sdk::rpc::CkbRpcClient as RpcClient; use ckb_types::{ - packed::{CellDep, OutPoint, Script}, - prelude::*, + packed::{CellDep, Script}, H256, }; use std::collections::HashMap; use std::str::FromStr; -use std::thread::{self, sleep}; +use std::thread::sleep; use std::time::Duration; -/// +/// Aggregator #[derive(Clone)] pub struct Aggregator { config: AggregatorConfig, @@ -77,87 +72,6 @@ impl Aggregator { rgbpp_tx_builder, } } - - /// Run the Aggregator - pub fn collect_rgbpp_requests(&self, stop_rx: Receiver<()>) { - let poll_interval = self.poll_interval; - let poll_service: Aggregator = self.clone(); - - loop { - match stop_rx.try_recv() { - Ok(_) => { - info!("Aggregator received exit signal, stopped"); - break; - } - Err(crossbeam_channel::TryRecvError::Empty) => { - // No exit signal, continue execution - } - Err(_) => { - info!("Error receiving exit signal"); - break; - } - } - - // get queue data - let rgbpp_requests = poll_service.rgbpp_tx_builder.get_rgbpp_queue_requests(); - let (rgbpp_requests, queue_cell) = match rgbpp_requests { - Ok((rgbpp_requests, queue_cell)) => { - let rgbpp_requests: Vec<_> = rgbpp_requests - .into_iter() - .map(|r| Request::new_unchecked(r.as_bytes())) - .collect(); - let queue_cell = OutPoint::new_unchecked(queue_cell.as_bytes()); - (rgbpp_requests, queue_cell) - } - Err(e) => { - error!("get RGB++ queue data error: {}", e.to_string()); - continue; - } - }; - - let leap_tx = poll_service.create_leap_tx(rgbpp_requests.clone(), queue_cell); - let leap_tx = match leap_tx { - Ok(leap_tx) => leap_tx, - Err(e) => { - error!("create leap transaction error: {}", e.to_string()); - continue; - } - }; - match wait_for_tx_confirmation( - poll_service.branch_rpc_client.clone(), - leap_tx, - Duration::from_secs(600), - ) { - Ok(()) => {} - Err(e) => error!("{}", e.to_string()), - } - - if !rgbpp_requests.is_empty() { - let update_queue_tx = poll_service.rgbpp_tx_builder.create_clear_queue_tx(); - let update_queue_tx = match update_queue_tx { - Ok(update_queue_tx) => update_queue_tx, - Err(e) => { - error!("{}", e.to_string()); - continue; - } - }; - match wait_for_tx_confirmation( - poll_service.rgbpp_rpc_client.clone(), - H256(update_queue_tx.0), - Duration::from_secs(600), - ) { - Ok(()) => {} - Err(e) => error!("{}", e.to_string()), - } - } - - if let Err(e) = poll_service.rgbpp_tx_builder.collect_rgbpp_request() { - info!("Aggregator: {:?}", e); - } - - thread::sleep(poll_interval); - } - } } fn get_script_map(scripts: Vec) -> HashMap { @@ -244,7 +158,7 @@ fn wait_for_tx_confirmation( mod tests { use super::*; - use ckb_types::{bytes::Bytes, core::ScriptHashType}; + use ckb_types::{bytes::Bytes, core::ScriptHashType, prelude::*}; use std::str::FromStr; diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index c5556e39ac..46b87bc021 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -1,21 +1,109 @@ mod leap; +use crate::schemas::leap::Request; +use crate::wait_for_tx_confirmation; use crate::Aggregator; use aggregator_common::error::Error; -use ckb_logger::{info, warn}; +use ckb_channel::Receiver; +use ckb_logger::{error, info, warn}; use ckb_types::{ core::FeeRate, h256, - packed::{CellDep, Script}, + packed::{CellDep, OutPoint, Script}, + prelude::*, H256, }; +use std::thread::{self}; +use std::time::Duration; + pub const SIGHASH_TYPE_HASH: H256 = h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); const CKB_FEE_RATE_LIMIT: u64 = 5000; impl Aggregator { + /// Collect RGB++ requests and send them to the branch chain + pub fn poll_rgbpp_requests(&self, stop_rx: Receiver<()>) { + let poll_interval = self.poll_interval; + let poll_service: Aggregator = self.clone(); + + loop { + match stop_rx.try_recv() { + Ok(_) => { + info!("Aggregator received exit signal, stopped"); + break; + } + Err(crossbeam_channel::TryRecvError::Empty) => { + // No exit signal, continue execution + } + Err(_) => { + info!("Error receiving exit signal"); + break; + } + } + + // get queue data + let rgbpp_requests = poll_service.rgbpp_tx_builder.get_rgbpp_queue_requests(); + let (rgbpp_requests, queue_cell) = match rgbpp_requests { + Ok((rgbpp_requests, queue_cell)) => { + let rgbpp_requests: Vec<_> = rgbpp_requests + .into_iter() + .map(|r| Request::new_unchecked(r.as_bytes())) + .collect(); + let queue_cell = OutPoint::new_unchecked(queue_cell.as_bytes()); + (rgbpp_requests, queue_cell) + } + Err(e) => { + error!("get RGB++ queue data error: {}", e.to_string()); + continue; + } + }; + + let leap_tx = poll_service.create_leap_tx(rgbpp_requests.clone(), queue_cell); + let leap_tx = match leap_tx { + Ok(leap_tx) => leap_tx, + Err(e) => { + error!("create leap transaction error: {}", e.to_string()); + continue; + } + }; + match wait_for_tx_confirmation( + poll_service.branch_rpc_client.clone(), + leap_tx, + Duration::from_secs(600), + ) { + Ok(()) => {} + Err(e) => error!("{}", e.to_string()), + } + + if !rgbpp_requests.is_empty() { + let update_queue_tx = poll_service.rgbpp_tx_builder.create_clear_queue_tx(); + let update_queue_tx = match update_queue_tx { + Ok(update_queue_tx) => update_queue_tx, + Err(e) => { + error!("{}", e.to_string()); + continue; + } + }; + match wait_for_tx_confirmation( + poll_service.rgbpp_rpc_client.clone(), + H256(update_queue_tx.0), + Duration::from_secs(600), + ) { + Ok(()) => {} + Err(e) => error!("{}", e.to_string()), + } + } + + if let Err(e) = poll_service.rgbpp_tx_builder.collect_rgbpp_request() { + info!("Aggregator: {:?}", e); + } + + thread::sleep(poll_interval); + } + } + fn fee_rate(&self) -> Result { let value = { let dynamic = self diff --git a/ckb-bin/src/subcommand/aggregator.rs b/ckb-bin/src/subcommand/aggregator.rs index 54356be1c2..68b8074369 100644 --- a/ckb-bin/src/subcommand/aggregator.rs +++ b/ckb-bin/src/subcommand/aggregator.rs @@ -18,7 +18,7 @@ pub fn aggregator(args: AggregatorArgs, chain_id: String) -> Result<(), ExitCode let aggregator_jh = thread::Builder::new() .name(THREAD_NAME.into()) .spawn(move || { - aggregator.collect_rgbpp_requests(stop_rx); + aggregator.poll_rgbpp_requests(stop_rx); }) .expect("Start aggregator failed!"); register_thread(THREAD_NAME, aggregator_jh); From 2c3b3871d1568a2ae6c11ffa9f743eefa6b5f782 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 22:28:44 +0800 Subject: [PATCH 05/28] Aggregator_Branch --- .../src/branch_to_rgbpp/mod.rs | 15 ++++++++++ .../src/rgbpp_to_branch/mod.rs | 4 ++- ckb-bin/src/subcommand/aggregator.rs | 28 ++++++++++++++----- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index 8b13789179..cda5880e29 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -1 +1,16 @@ +use crate::Aggregator; +use ckb_channel::Receiver; +use ckb_logger::info; + +use std::thread; + +impl Aggregator { + /// Collect Branch requests and send them to the RGB++ chain + pub fn poll_branch_requests(&self, _stop_rx: Receiver<()>) { + info!("Branch Aggregator service started ..."); + + let poll_interval = self.poll_interval; + thread::sleep(poll_interval); + } +} diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index 46b87bc021..c07caf0340 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -15,7 +15,7 @@ use ckb_types::{ H256, }; -use std::thread::{self}; +use std::thread; use std::time::Duration; pub const SIGHASH_TYPE_HASH: H256 = @@ -25,6 +25,8 @@ const CKB_FEE_RATE_LIMIT: u64 = 5000; impl Aggregator { /// Collect RGB++ requests and send them to the branch chain pub fn poll_rgbpp_requests(&self, stop_rx: Receiver<()>) { + info!("RGB++ Aggregator service started ..."); + let poll_interval = self.poll_interval; let poll_service: Aggregator = self.clone(); diff --git a/ckb-bin/src/subcommand/aggregator.rs b/ckb-bin/src/subcommand/aggregator.rs index 68b8074369..4c5273f991 100644 --- a/ckb-bin/src/subcommand/aggregator.rs +++ b/ckb-bin/src/subcommand/aggregator.rs @@ -13,17 +13,31 @@ pub fn aggregator(args: AggregatorArgs, chain_id: String) -> Result<(), ExitCode let aggregator = Aggregator::new(args.config, Duration::from_secs(2), chain_id); - let stop_rx = new_crossbeam_exit_rx(); - const THREAD_NAME: &str = "Aggregator"; - let aggregator_jh = thread::Builder::new() - .name(THREAD_NAME.into()) + let stop_rx_rgbpp = new_crossbeam_exit_rx(); + let stop_rx_branch = new_crossbeam_exit_rx(); + + const THREAD_NAME_RGBPP: &str = "Aggregator_RGBPP"; + let aggregator_rgbpp = aggregator.clone(); + let aggregator_rgbpp_jh = thread::Builder::new() + .name(THREAD_NAME_RGBPP.into()) .spawn(move || { - aggregator.poll_rgbpp_requests(stop_rx); + aggregator_rgbpp.poll_rgbpp_requests(stop_rx_rgbpp); }) .expect("Start aggregator failed!"); - register_thread(THREAD_NAME, aggregator_jh); + register_thread(THREAD_NAME_RGBPP, aggregator_rgbpp_jh); + + const THREAD_NAME_BRANCH: &str = "Aggregator_Branch"; + let aggregator_branch = aggregator.clone(); + let aggregator_branch_jh = thread::Builder::new() + .name(THREAD_NAME_BRANCH.into()) + .spawn({ + move || { + aggregator_branch.poll_branch_requests(stop_rx_branch); + } + }) + .expect("Start Branch aggregator failed!"); + register_thread(THREAD_NAME_BRANCH, aggregator_branch_jh); - info!("Branch Aggregator service started ..."); ctrlc::set_handler(|| { info!("Trapped exit signal, exiting..."); broadcast_exit_signals(); From c0c8336f45b9f4a40b95a738b4d5b5e09ef20f1e Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Fri, 23 Aug 2024 22:38:25 +0800 Subject: [PATCH 06/28] refactoring. --- .../src/rgbpp_to_branch/mod.rs | 4 +- aggregator/util/rgbpp-tx/src/lib.rs | 64 +--------------- .../rgbpp-tx/src/rgbpp_to_branch/clear.rs | 2 +- .../rgbpp-tx/src/rgbpp_to_branch/custodian.rs | 2 +- .../util/rgbpp-tx/src/rgbpp_to_branch/mod.rs | 75 +++++++++++++++++++ 5 files changed, 83 insertions(+), 64 deletions(-) diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index c07caf0340..569cb14c81 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -46,7 +46,9 @@ impl Aggregator { } // get queue data - let rgbpp_requests = poll_service.rgbpp_tx_builder.get_rgbpp_queue_requests(); + let rgbpp_requests = poll_service + .rgbpp_tx_builder + .get_rgbpp_queue_outbox_requests(); let (rgbpp_requests, queue_cell) = match rgbpp_requests { Ok((rgbpp_requests, queue_cell)) => { let rgbpp_requests: Vec<_> = rgbpp_requests diff --git a/aggregator/util/rgbpp-tx/src/lib.rs b/aggregator/util/rgbpp-tx/src/lib.rs index 4234d81ea8..6a4311153f 100644 --- a/aggregator/util/rgbpp-tx/src/lib.rs +++ b/aggregator/util/rgbpp-tx/src/lib.rs @@ -13,20 +13,19 @@ use aggregator_common::{ use ckb_app_config::{AssetConfig, LockConfig, ScriptConfig}; use ckb_logger::{info, warn}; use ckb_sdk::{ - rpc::ckb_indexer::{Cell, Order}, rpc::CkbRpcClient as RpcClient, rpc::ResponseFormatGetter, - traits::{CellQueryOptions, LiveCell, MaturityOption, PrimaryScriptType, QueryOrder}, + traits::{CellQueryOptions, MaturityOption, PrimaryScriptType, QueryOrder}, }; use ckb_types::{ core::{FeeRate, ScriptHashType}, h256, - packed::{Byte32, Bytes as PackedBytes, CellDep, OutPoint, Script, Transaction, WitnessArgs}, + packed::{Bytes as PackedBytes, CellDep, Script, Transaction, WitnessArgs}, prelude::*, H256, }; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; @@ -116,63 +115,6 @@ impl RgbppTxBuilder { Ok(value) } - pub(crate) fn get_rgbpp_queue_cell(&self) -> Result<(Cell, CrossChainQueue), Error> { - info!("Scan RGB++ Message Queue ..."); - - let queue_cell_search_option = self.build_message_queue_cell_search_option()?; - let queue_cell = self - .rgbpp_rpc_client - .get_cells(queue_cell_search_option.into(), Order::Asc, 1.into(), None) - .map_err(|e| Error::LiveCellNotFound(e.to_string()))?; - if queue_cell.objects.len() != 1 { - return Err(Error::LiveCellNotFound(format!( - "Queue cell found: {}", - queue_cell.objects.len() - ))); - } - info!("Found {} queue cell", queue_cell.objects.len()); - let queue_cell = queue_cell.objects[0].clone(); - - let queue_live_cell: LiveCell = queue_cell.clone().into(); - let queue_data = queue_live_cell.output_data; - let queue = CrossChainQueue::from_slice(&queue_data) - .map_err(|e| Error::QueueCellDataDecodeError(e.to_string()))?; - - Ok((queue_cell, queue)) - } - - pub fn get_rgbpp_queue_requests(&self) -> Result<(Vec, OutPoint), Error> { - let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_cell()?; - if queue_cell_data.outbox().is_empty() { - info!("No requests in queue"); - return Ok((vec![], OutPoint::default())); - } - let request_ids: Vec = queue_cell_data.outbox().into_iter().collect(); - - let queue_out_point = queue_cell.out_point.clone(); - let (_, witness_input_type) = - self.get_tx_witness_input_type(queue_cell.out_point, self.rgbpp_rpc_client.clone())?; - let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| { - Error::TransactionParseError(format!("get requests from witness error: {}", e)) - })?; - info!("Found {} requests in witness", requests.len()); - - // check requests - let request_set: HashSet = requests - .clone() - .into_iter() - .map(|request| request.as_bytes().pack().calc_raw_data_hash()) - .collect(); - let all_ids_present = request_ids.iter().all(|id| request_set.contains(id)); - if all_ids_present { - Ok((requests.into_iter().collect(), queue_out_point.into())) - } else { - Err(Error::QueueCellDataError( - "Request IDs in queue cell data do not match witness".to_string(), - )) - } - } - fn get_tx_witness_input_type( &self, out_point: ckb_jsonrpc_types::OutPoint, diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs index 5435c21e73..9ce9d40467 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs @@ -35,7 +35,7 @@ pub const SIGHASH_TYPE_HASH: H256 = impl RgbppTxBuilder { pub fn create_clear_queue_tx(&self) -> Result { // get queue cell - let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_cell()?; + let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_outbox_cell()?; info!( "The queue contains {} items that need to be cleared.", queue_cell_data.outbox().len() diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs index 843d003e3b..56cce0e7ad 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs @@ -102,7 +102,7 @@ impl RgbppTxBuilder { pub fn create_custodian_tx(&self, request_cells: Vec<(Cell, Transfer)>) -> Result { // get queue cell - let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_cell()?; + let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_outbox_cell()?; // build new queue let mut request_ids = vec![]; diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs index 1e5b3146f4..2994c734f6 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs @@ -1,2 +1,77 @@ mod clear; mod custodian; + +pub use crate::schemas::leap::{CrossChainQueue, Request, Requests}; +use crate::RgbppTxBuilder; + +use aggregator_common::error::Error; +use ckb_logger::info; +use ckb_sdk::{ + rpc::ckb_indexer::{Cell, Order}, + traits::LiveCell, +}; +use ckb_types::{ + packed::{Byte32, OutPoint}, + prelude::*, +}; + +use std::collections::HashSet; + +impl RgbppTxBuilder { + pub fn get_rgbpp_queue_outbox_requests(&self) -> Result<(Vec, OutPoint), Error> { + let (queue_cell, queue_cell_data) = self.get_rgbpp_queue_outbox_cell()?; + if queue_cell_data.outbox().is_empty() { + info!("No requests in queue"); + return Ok((vec![], OutPoint::default())); + } + let request_ids: Vec = queue_cell_data.outbox().into_iter().collect(); + + let queue_out_point = queue_cell.out_point.clone(); + let (_, witness_input_type) = + self.get_tx_witness_input_type(queue_cell.out_point, self.rgbpp_rpc_client.clone())?; + let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| { + Error::TransactionParseError(format!("get requests from witness error: {}", e)) + })?; + info!("Found {} requests in witness", requests.len()); + + // check requests + let request_set: HashSet = requests + .clone() + .into_iter() + .map(|request| request.as_bytes().pack().calc_raw_data_hash()) + .collect(); + let all_ids_present = request_ids.iter().all(|id| request_set.contains(id)); + if all_ids_present { + Ok((requests.into_iter().collect(), queue_out_point.into())) + } else { + Err(Error::QueueCellDataError( + "Request IDs in queue cell data do not match witness".to_string(), + )) + } + } + + pub(crate) fn get_rgbpp_queue_outbox_cell(&self) -> Result<(Cell, CrossChainQueue), Error> { + info!("Scan RGB++ Message Queue ..."); + + let queue_cell_search_option = self.build_message_queue_cell_search_option()?; + let queue_cell = self + .rgbpp_rpc_client + .get_cells(queue_cell_search_option.into(), Order::Asc, 1.into(), None) + .map_err(|e| Error::LiveCellNotFound(e.to_string()))?; + if queue_cell.objects.len() != 1 { + return Err(Error::LiveCellNotFound(format!( + "Queue cell found: {}", + queue_cell.objects.len() + ))); + } + info!("Found {} queue cell", queue_cell.objects.len()); + let queue_cell = queue_cell.objects[0].clone(); + + let queue_live_cell: LiveCell = queue_cell.clone().into(); + let queue_data = queue_live_cell.output_data; + let queue = CrossChainQueue::from_slice(&queue_data) + .map_err(|e| Error::QueueCellDataDecodeError(e.to_string()))?; + + Ok((queue_cell, queue)) + } +} From 0b41e17342bb9f2559badb797322cff3fb5efca9 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sat, 24 Aug 2024 13:18:03 +0800 Subject: [PATCH 07/28] add aggregator storage crate --- Cargo.lock | 10 + Cargo.toml | 1 + .../src/branch_to_rgbpp/mod.rs | 22 +- aggregator/util/common/src/error.rs | 2 + aggregator/util/storage/Cargo.toml | 16 ++ aggregator/util/storage/src/lib.rs | 268 ++++++++++++++++++ 6 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 aggregator/util/storage/Cargo.toml create mode 100644 aggregator/util/storage/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 1e39b8e623..8b3632c8c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "aggregator-storage" +version = "0.1.0" +dependencies = [ + "aggregator-common", + "ckb-rocksdb", + "ckb-types 0.116.1", + "tempfile", +] + [[package]] name = "ahash" version = "0.3.8" diff --git a/Cargo.toml b/Cargo.toml index 5b1cdaff10..f711757658 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,6 +96,7 @@ members = [ "ckb-bin", "branch-chain-producer", "aggregator/util/common", + "aggregator/util/storage", "aggregator/util/rgbpp-tx", "aggregator/aggregator-main"] diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index cda5880e29..aa62a140b5 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -7,10 +7,28 @@ use std::thread; impl Aggregator { /// Collect Branch requests and send them to the RGB++ chain - pub fn poll_branch_requests(&self, _stop_rx: Receiver<()>) { + pub fn poll_branch_requests(&self, stop_rx: Receiver<()>) { info!("Branch Aggregator service started ..."); let poll_interval = self.poll_interval; - thread::sleep(poll_interval); + let _poll_service: Aggregator = self.clone(); + + loop { + match stop_rx.try_recv() { + Ok(_) => { + info!("Aggregator received exit signal, stopped"); + break; + } + Err(crossbeam_channel::TryRecvError::Empty) => { + // No exit signal, continue execution + } + Err(_) => { + info!("Error receiving exit signal"); + break; + } + } + + thread::sleep(poll_interval); + } } } diff --git a/aggregator/util/common/src/error.rs b/aggregator/util/common/src/error.rs index 9f376388a0..9611acd85f 100644 --- a/aggregator/util/common/src/error.rs +++ b/aggregator/util/common/src/error.rs @@ -30,6 +30,8 @@ pub enum Error { LockNotFound(String), #[error("Branch script not found: {0}")] BranchScriptNotFound(String), + #[error("database error: {0}")] + DatabaseError(String), #[error("other error: {0}")] Other(String), } diff --git a/aggregator/util/storage/Cargo.toml b/aggregator/util/storage/Cargo.toml new file mode 100644 index 0000000000..1c6037c73d --- /dev/null +++ b/aggregator/util/storage/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "aggregator-storage" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +aggregator-common = { path = "../common", version = "0.1.0" } + +ckb-types = { path = "../../../util/types", version = "= 0.116.1" } + +rocksdb = { package = "ckb-rocksdb", version ="=0.21.1", features = ["snappy"], default-features = false } + +[dev-dependencies] +tempfile = "3" \ No newline at end of file diff --git a/aggregator/util/storage/src/lib.rs b/aggregator/util/storage/src/lib.rs new file mode 100644 index 0000000000..d86cddf05e --- /dev/null +++ b/aggregator/util/storage/src/lib.rs @@ -0,0 +1,268 @@ +#![allow(missing_docs)] + +use aggregator_common::error::Error::{self, DatabaseError}; +use ckb_types::H256; +use rocksdb::{ + ops::{Delete, Get, Iterate, Open, Put}, + IteratorMode, Options, DB, +}; + +use std::path::Path; +use std::sync::Arc; + +pub struct Storage { + db: Arc, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BranchRequestStatus { + Pending = 0x00, + Commit = 0x01, +} + +impl From for u8 { + fn from(status: BranchRequestStatus) -> Self { + status as u8 + } +} + +impl TryFrom for BranchRequestStatus { + type Error = (); + + fn try_from(value: u8) -> Result { + match value { + 0x00 => Ok(BranchRequestStatus::Pending), + 0x01 => Ok(BranchRequestStatus::Commit), + _ => Err(()), + } + } +} + +impl Storage { + /// Opens or creates a new database instance + pub fn new>(path: P) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + let db = Arc::new(DB::open(&opts, path).map_err(|err| DatabaseError(err.to_string()))?); + Ok(Storage { db }) + } + + /// Inserts a new branch request with a default status of `Pending` + pub fn insert_branch_request(&self, height: u64, tx: H256) -> Result<(), Error> { + let key = height.to_be_bytes(); + let mut value = vec![BranchRequestStatus::Pending.into()]; // Use enum value to represent status + value.extend_from_slice(tx.as_bytes()); + self.db + .put(key, value) + .map_err(|err| DatabaseError(err.to_string()))?; + Ok(()) + } + + /// Updates the status of a specific height to `Commit`, with validation of `tx` + pub fn commit_branch_request(&self, height: u64, expected_tx: H256) -> Result<(), Error> { + let key = height.to_be_bytes(); + + let value = self + .db + .get(key) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))?; + let value = match value { + Some(val) => val, + None => return Err(DatabaseError("Height not found in the database".into())), + }; + + let stored_tx = H256::from_slice(&value[1..33]) + .map_err(|_| DatabaseError("Failed to parse stored transaction".into()))?; + + // Verify that the provided tx matches the stored tx + if stored_tx != expected_tx { + return Err(DatabaseError( + "Transaction hash does not match the stored value".into(), + )); + } + + let mut value = value.to_vec(); // Convert DBVector to Vec for mutability + value[0] = BranchRequestStatus::Commit.into(); // Update status to `Commit` + self.db + .put(key, value) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))?; // Write back to the database + + // Check if the next height exists and is pending + let next_height = height + 1; + let next_key = next_height.to_be_bytes(); + if let Some(next_value) = self + .db + .get(next_key) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))? + { + if BranchRequestStatus::try_from(next_value[0]).unwrap() == BranchRequestStatus::Pending + { + self.db + .put(b"earliest_pending", next_height.to_be_bytes()) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))?; + } else { + return Err(DatabaseError( + "Unexpected commit status found for next height".into(), + )); + } + } else { + // No next height or it is not pending, clear the earliest_pending + self.db + .delete(b"earliest_pending") + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))?; + } + + Ok(()) + } + + /// Retrieves the earliest height with `Pending` status + pub fn get_earliest_pending(&self) -> Result, Error> { + match self + .db + .get(b"earliest_pending") + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))? + { + Some(height_bytes) => { + let height = u64::from_be_bytes(height_bytes.as_ref().try_into().map_err( + |err: std::array::TryFromSliceError| DatabaseError(err.to_string()), + )?); + let key = height.to_be_bytes(); + if let Some(value) = self + .db + .get(key) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string()))? + { + let tx = H256::from_slice(&value[1..33]) + .map_err(|_| DatabaseError("Failed to parse stored tx hash".into()))?; + Ok(Some((height, tx))) + } else { + Ok(None) + } + } + None => Ok(None), + } + } + + /// Retrieves the highest height in the database + pub fn get_last_branch_request(&self) -> Result, Error> { + let mut iter = self.db.iterator(IteratorMode::End); + if let Some((key, value)) = iter.next() { + let height = + u64::from_be_bytes(key.as_ref().try_into().map_err( + |err: std::array::TryFromSliceError| DatabaseError(err.to_string()), + )?); + let tx = H256::from_slice(&value[1..33]) + .map_err(|_| DatabaseError("Failed to parse stored transaction".into()))?; + Ok(Some((height, tx))) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn setup_store() -> Storage { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + Storage::new(temp_dir.path()).expect("failed to create store") + } + + #[test] + fn test_insert_and_get_last_branch_request() { + let store = setup_store(); + + // Insert branch requests + store + .insert_branch_request(1, H256::default()) + .expect("failed to insert request 1"); + store + .insert_branch_request(2, H256::default()) + .expect("failed to insert request 2"); + store + .insert_branch_request(3, H256::default()) + .expect("failed to insert request 3"); + + // Get the last branch request + let last_request = store + .get_last_branch_request() + .expect("failed to get last branch request"); + assert_eq!(last_request, Some((3, H256::default()))); + } + + #[test] + fn test_commit_height() { + let store = setup_store(); + + // Insert branch requests + store + .insert_branch_request(1, H256::default()) + .expect("failed to insert request 1"); + store + .insert_branch_request(2, H256::default()) + .expect("failed to insert request 2"); + + // Commit the first request + store + .commit_branch_request(1, H256::default()) + .expect("failed to commit height 1"); + + // Check if the earliest pending is now 2 + let earliest_pending = store + .get_earliest_pending() + .expect("failed to get earliest pending"); + assert_eq!(earliest_pending, Some((2, H256::default()))); + } + + #[test] + fn test_get_earliest_pending_after_all_commit() { + let store = setup_store(); + + // Insert and commit branch requests + store + .insert_branch_request(1, H256::default()) + .expect("failed to insert request 1"); + store + .insert_branch_request(2, H256::default()) + .expect("failed to insert request 2"); + store + .commit_branch_request(1, H256::default()) + .expect("failed to commit height 1"); + store + .commit_branch_request(2, H256::default()) + .expect("failed to commit height 2"); + + // Check if there is no pending request left + let earliest_pending = store + .get_earliest_pending() + .expect("failed to get earliest pending"); + assert_eq!(earliest_pending, None); + } + + #[test] + fn test_commit_height_invalid_tx() { + let store = setup_store(); + + // Insert branch requests + store + .insert_branch_request(1, H256::default()) + .expect("failed to insert request 1"); + + // Attempt to commit with an invalid tx + let result = store.commit_branch_request(1, H256::from_slice(&[1u8; 32]).unwrap()); + assert!(result.is_err()); + } + + #[test] + fn test_get_last_branch_request_empty_store() { + let store = setup_store(); + + // Attempt to get the last branch request in an empty store + let last_request = store + .get_last_branch_request() + .expect("failed to get last branch request"); + assert_eq!(last_request, None); + } +} From a1a3195803a6787d0d4441199c07457ed45ba01f Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sat, 24 Aug 2024 17:14:43 +0800 Subject: [PATCH 08/28] init store in aggregator. --- Cargo.lock | 1 + aggregator/aggregator-main/Cargo.toml | 1 + aggregator/aggregator-main/src/lib.rs | 7 +++++++ aggregator/util/storage/src/lib.rs | 1 + 4 files changed, 10 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 8b3632c8c9..e8a4267797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,7 @@ version = "0.1.0" dependencies = [ "aggregator-common", "aggregator-rgbpp-tx", + "aggregator-storage", "ckb-app-config", "ckb-channel 0.116.1", "ckb-gen-types 0.116.1 (git+https://github.com/ethanyuan/ckb?branch=v0.116.1-branch-chain)", diff --git a/aggregator/aggregator-main/Cargo.toml b/aggregator/aggregator-main/Cargo.toml index 161bd1c687..6b3abfca38 100644 --- a/aggregator/aggregator-main/Cargo.toml +++ b/aggregator/aggregator-main/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] aggregator-common = { path = "../util/common", version = "0.1.0" } +aggregator-storage = { path = "../util/storage", version = "0.1.0" } aggregator-rgbpp-tx = { path = "../util/rgbpp-tx", version = "0.1.0" } ckb-app-config = { path = "../../util/app-config", version = "= 0.116.1" } ckb-channel = { path = "../../util/channel", version = "= 0.116.1" } diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index d2a8e2e2c6..acd2e01cda 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -6,7 +6,9 @@ pub(crate) mod schemas; use aggregator_common::{error::Error, utils::encode_udt_amount}; use aggregator_rgbpp_tx::RgbppTxBuilder; +use aggregator_storage::Storage; use ckb_app_config::{AggregatorConfig, AssetConfig, LockConfig, ScriptConfig}; +use ckb_logger::info; use ckb_sdk::rpc::CkbRpcClient as RpcClient; use ckb_types::{ packed::{CellDep, Script}, @@ -31,6 +33,8 @@ pub struct Aggregator { asset_locks: HashMap, rgbpp_tx_builder: RgbppTxBuilder, + + store: Storage, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -61,6 +65,8 @@ impl Aggregator { config.asset_types.clone(), config.asset_locks.clone(), ); + info!("aggregator store:{:?}", config.store); + let store = Storage::new(config.store.clone()).expect("storage init"); Aggregator { config: config.clone(), poll_interval, @@ -70,6 +76,7 @@ impl Aggregator { asset_types: get_asset_types(config.asset_types), asset_locks: get_asset_locks(config.asset_locks), rgbpp_tx_builder, + store, } } } diff --git a/aggregator/util/storage/src/lib.rs b/aggregator/util/storage/src/lib.rs index d86cddf05e..b1a9465a65 100644 --- a/aggregator/util/storage/src/lib.rs +++ b/aggregator/util/storage/src/lib.rs @@ -10,6 +10,7 @@ use rocksdb::{ use std::path::Path; use std::sync::Arc; +#[derive(Clone)] pub struct Storage { db: Arc, } From 9cb8453f1f481d4180d301d9524e5b91fe2069f9 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sun, 25 Aug 2024 14:17:49 +0800 Subject: [PATCH 09/28] impl get_branch_queue_outbox_requests. --- .../src/branch_to_rgbpp/mod.rs | 129 +++++++++++++++++- aggregator/aggregator-main/src/lib.rs | 12 ++ .../src/rgbpp_to_branch/leap.rs | 2 +- .../src/rgbpp_to_branch/mod.rs | 20 +-- aggregator/util/common/src/utils/privkey.rs | 5 +- .../rgbpp-tx/src/rgbpp_to_branch/clear.rs | 6 +- aggregator/util/storage/src/lib.rs | 39 ++++++ 7 files changed, 185 insertions(+), 28 deletions(-) diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index aa62a140b5..bd2b34985e 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -1,8 +1,28 @@ -use crate::Aggregator; +pub use crate::schemas::leap::{CrossChainQueue, Request, Requests}; +use crate::{Aggregator, CKB_FEE_RATE_LIMIT, SIGHASH_TYPE_HASH}; +use aggregator_common::{ + error::Error, + utils::{privkey::get_sighash_lock_args_from_privkey, QUEUE_TYPE}, +}; use ckb_channel::Receiver; -use ckb_logger::info; +use ckb_logger::{error, info, warn}; +use ckb_sdk::{ + rpc::{ + ckb_indexer::{Cell, Order}, + CkbRpcClient as RpcClient, ResponseFormatGetter, + }, + traits::{CellQueryOptions, LiveCell, MaturityOption, PrimaryScriptType, QueryOrder}, +}; +use ckb_types::{ + core::{FeeRate, ScriptHashType}, + h256, + packed::{Byte32, CellDep, OutPoint, Script}, + prelude::*, + H256, +}; +use std::collections::HashSet; use std::thread; impl Aggregator { @@ -11,7 +31,7 @@ impl Aggregator { info!("Branch Aggregator service started ..."); let poll_interval = self.poll_interval; - let _poll_service: Aggregator = self.clone(); + let poll_service: Aggregator = self.clone(); loop { match stop_rx.try_recv() { @@ -28,7 +48,110 @@ impl Aggregator { } } + // get Branch queue outbox data + let rgbpp_requests = poll_service.get_branch_queue_outbox_requests(); + let (rgbpp_requests, queue_cell) = match rgbpp_requests { + Ok((rgbpp_requests, queue_cell)) => (rgbpp_requests, queue_cell), + Err(e) => { + error!("get RGB++ queue data error: {}", e.to_string()); + continue; + } + }; + + // clear queue + thread::sleep(poll_interval); } } + + fn get_branch_queue_outbox_requests(&self) -> Result<(Vec, OutPoint), Error> { + let (queue_cell, queue_cell_data) = self.get_branch_queue_outbox_cell()?; + if queue_cell_data.outbox().is_empty() { + info!("No requests in queue"); + return Ok((vec![], OutPoint::default())); + } + let request_ids: Vec = queue_cell_data.outbox().into_iter().collect(); + + let queue_out_point = queue_cell.out_point.clone(); + let (_, witness_input_type) = + self.get_tx_witness_input_type(queue_cell.out_point, self.branch_rpc_client.clone())?; + let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| { + Error::TransactionParseError(format!("get requests from witness error: {}", e)) + })?; + info!("Found {} requests in witness", requests.len()); + + // check requests + let request_set: HashSet = requests + .clone() + .into_iter() + .map(|request| request.as_bytes().pack().calc_raw_data_hash()) + .collect(); + let all_ids_present = request_ids.iter().all(|id| request_set.contains(id)); + if all_ids_present { + Ok((requests.into_iter().collect(), queue_out_point.into())) + } else { + Err(Error::QueueCellDataError( + "Request IDs in queue cell data do not match witness".to_string(), + )) + } + } + + fn get_branch_queue_outbox_cell(&self) -> Result<(Cell, CrossChainQueue), Error> { + info!("Scan Branch Message Queue Outbox ..."); + + let queue_cell_search_option = self.build_branch_queue_outbox_cell_search_option()?; + let queue_cell = self + .branch_rpc_client + .get_cells(queue_cell_search_option.into(), Order::Asc, 1.into(), None) + .map_err(|e| Error::LiveCellNotFound(e.to_string()))?; + if queue_cell.objects.len() != 1 { + return Err(Error::LiveCellNotFound(format!( + "Branch queue outbox cell found: {}", + queue_cell.objects.len() + ))); + } + info!("Found {} queue outbox cell", queue_cell.objects.len()); + let queue_cell = queue_cell.objects[0].clone(); + + let queue_live_cell: LiveCell = queue_cell.clone().into(); + let queue_data = queue_live_cell.output_data; + let queue = CrossChainQueue::from_slice(&queue_data) + .map_err(|e| Error::QueueCellDataDecodeError(e.to_string()))?; + + Ok((queue_cell, queue)) + } + + fn build_branch_queue_outbox_cell_search_option(&self) -> Result { + let (message_queue_outbox_lock_args, _) = get_sighash_lock_args_from_privkey( + self.config + .branch_chain_token_manager_outbox_lock_key_path + .clone(), + )?; + info!( + "message_queue_outbox_lock_args: {:?}", + message_queue_outbox_lock_args.pack().to_string() + ); + let message_queue_outbox_lock = Script::new_builder() + .code_hash(SIGHASH_TYPE_HASH.pack()) + .hash_type(ScriptHashType::Type.into()) + .args(message_queue_outbox_lock_args.pack()) + .build(); + + let cell_query_option = CellQueryOptions { + primary_script: message_queue_outbox_lock, + primary_type: PrimaryScriptType::Lock, + with_data: Some(true), + secondary_script: None, + secondary_script_len_range: None, + data_len_range: None, + capacity_range: None, + block_range: None, + order: QueryOrder::Asc, + limit: Some(1), + maturity: MaturityOption::Mature, + min_total_capacity: 1, + script_search_mode: None, + }; + Ok(cell_query_option) + } } diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index acd2e01cda..5296cf521c 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -11,6 +11,7 @@ use ckb_app_config::{AggregatorConfig, AssetConfig, LockConfig, ScriptConfig}; use ckb_logger::info; use ckb_sdk::rpc::CkbRpcClient as RpcClient; use ckb_types::{ + h256, packed::{CellDep, Script}, H256, }; @@ -20,6 +21,10 @@ use std::str::FromStr; use std::thread::sleep; use std::time::Duration; +pub const SIGHASH_TYPE_HASH: H256 = + h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); +const CKB_FEE_RATE_LIMIT: u64 = 5000; + /// Aggregator #[derive(Clone)] pub struct Aggregator { @@ -79,6 +84,13 @@ impl Aggregator { store, } } + + fn get_branch_script(&self, script_name: &str) -> Result { + self.branch_scripts + .get(script_name) + .map(|script_info| script_info.script.clone()) + .ok_or_else(|| Error::MissingScriptInfo(script_name.to_string())) + } } fn get_script_map(scripts: Vec) -> HashMap { diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs index f2ab363d43..24af30c2fa 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/leap.rs @@ -1,5 +1,5 @@ -use crate::rgbpp_to_branch::SIGHASH_TYPE_HASH; use crate::schemas::leap::{MessageUnion, Request}; +use crate::SIGHASH_TYPE_HASH; use crate::{encode_udt_amount, Aggregator}; use aggregator_common::{ diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index 569cb14c81..2b158ec023 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -2,15 +2,14 @@ mod leap; use crate::schemas::leap::Request; use crate::wait_for_tx_confirmation; -use crate::Aggregator; +use crate::{Aggregator, CKB_FEE_RATE_LIMIT}; use aggregator_common::error::Error; use ckb_channel::Receiver; use ckb_logger::{error, info, warn}; use ckb_types::{ core::FeeRate, - h256, - packed::{CellDep, OutPoint, Script}, + packed::{CellDep, OutPoint}, prelude::*, H256, }; @@ -18,10 +17,6 @@ use ckb_types::{ use std::thread; use std::time::Duration; -pub const SIGHASH_TYPE_HASH: H256 = - h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); -const CKB_FEE_RATE_LIMIT: u64 = 5000; - impl Aggregator { /// Collect RGB++ requests and send them to the branch chain pub fn poll_rgbpp_requests(&self, stop_rx: Receiver<()>) { @@ -45,7 +40,7 @@ impl Aggregator { } } - // get queue data + // get RGB++ queue outbox data let rgbpp_requests = poll_service .rgbpp_tx_builder .get_rgbpp_queue_outbox_requests(); @@ -64,6 +59,7 @@ impl Aggregator { } }; + // create leap transaction let leap_tx = poll_service.create_leap_tx(rgbpp_requests.clone(), queue_cell); let leap_tx = match leap_tx { Ok(leap_tx) => leap_tx, @@ -81,6 +77,7 @@ impl Aggregator { Err(e) => error!("{}", e.to_string()), } + // clear queue if !rgbpp_requests.is_empty() { let update_queue_tx = poll_service.rgbpp_tx_builder.create_clear_queue_tx(); let update_queue_tx = match update_queue_tx { @@ -140,11 +137,4 @@ impl Aggregator { .map(|script_info| script_info.cell_dep.clone()) .ok_or_else(|| Error::MissingScriptInfo(script_name.to_string())) } - - fn _get_branch_script(&self, script_name: &str) -> Result { - self.branch_scripts - .get(script_name) - .map(|script_info| script_info.script.clone()) - .ok_or_else(|| Error::MissingScriptInfo(script_name.to_string())) - } } diff --git a/aggregator/util/common/src/utils/privkey.rs b/aggregator/util/common/src/utils/privkey.rs index ed4feb3cd2..fd165b7bfe 100644 --- a/aggregator/util/common/src/utils/privkey.rs +++ b/aggregator/util/common/src/utils/privkey.rs @@ -3,15 +3,12 @@ use crate::error::Error; use ckb_hash::blake2b_256; -use ckb_types::{bytes::Bytes, h256, H256}; +use ckb_types::bytes::Bytes; use std::fs::File; use std::io::Read; use std::path::PathBuf; -pub const SIGHASH_TYPE_HASH: H256 = - h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); - pub fn get_sighash_lock_args_from_privkey( key_path: PathBuf, ) -> Result<(Bytes, secp256k1::SecretKey), Error> { diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs index 9ce9d40467..a09b92697b 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/clear.rs @@ -1,4 +1,4 @@ -use crate::RgbppTxBuilder; +use crate::{RgbppTxBuilder, SIGHASH_TYPE_HASH}; use aggregator_common::{ error::Error, @@ -20,7 +20,6 @@ use ckb_sdk::{ }; use ckb_types::{ core::ScriptHashType, - h256, packed::{Byte32, Bytes as PackedBytes, CellInput, Script}, prelude::*, H256, @@ -29,9 +28,6 @@ use molecule::prelude::Entity; use std::collections::HashMap; -pub const SIGHASH_TYPE_HASH: H256 = - h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); - impl RgbppTxBuilder { pub fn create_clear_queue_tx(&self) -> Result { // get queue cell diff --git a/aggregator/util/storage/src/lib.rs b/aggregator/util/storage/src/lib.rs index b1a9465a65..0344cfc8c4 100644 --- a/aggregator/util/storage/src/lib.rs +++ b/aggregator/util/storage/src/lib.rs @@ -159,6 +159,45 @@ impl Storage { Ok(None) } } + + /// Records a transaction that has been constructed but not yet confirmed + pub fn record_staged_tx(&self, tx_hash: H256) -> Result<(), Error> { + self.db + .put(b"staged_tx", tx_hash.as_bytes()) + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string())) + } + + /// Removes the staged transaction record after it has been confirmed + pub fn clear_staged_tx(&self) -> Result<(), Error> { + self.db + .delete(b"staged_tx") + .map_err(|err: rocksdb::Error| DatabaseError(err.to_string())) + } + + /// Checks if a given transaction hash has already been stored for a specific height + pub fn is_tx_stored(&self, height: u64, tx_hash: H256) -> Result { + let key = height.to_be_bytes(); + + // Retrieve the stored value by height + if let Some(value) = self + .db + .get(&key) + .map_err(|err: rocksdb::Error| Error::DatabaseError(err.to_string()))? + { + let stored_tx = + H256::from_slice(&value[1..33]) // Extract tx field from the value + .map_err(|_| { + Error::DatabaseError("Failed to parse stored transaction".into()) + })?; + + debug_assert!(stored_tx == tx_hash); + + // Compare the stored transaction hash with the provided one + Ok(stored_tx == tx_hash) + } else { + Ok(false) + } + } } #[cfg(test)] From cf5301843747798027a66d57172260761487bfbd Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sun, 25 Aug 2024 16:01:46 +0800 Subject: [PATCH 10/28] impl check_storage. --- .../src/branch_to_rgbpp/clear_queue_outbox.rs | 76 +++++++++++++++++++ .../src/branch_to_rgbpp/mod.rs | 29 ++++--- aggregator/util/storage/src/lib.rs | 8 ++ 3 files changed, 98 insertions(+), 15 deletions(-) create mode 100644 aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs new file mode 100644 index 0000000000..fddeb85d5d --- /dev/null +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs @@ -0,0 +1,76 @@ +use crate::Aggregator; + +use aggregator_common::error::Error; +use ckb_logger::info; +use ckb_types::H256; + +use std::thread::sleep; +use std::time::Duration; + +impl Aggregator { + pub(crate) fn check_storage(&self) -> Result<(), Error> { + if let Some(tx_hash) = self.store.get_staged_tx()? { + let hash = H256::from_slice(tx_hash.as_bytes()) + .expect("Failed to convert staged transaction hash to H256"); + let tx = self + .branch_rpc_client + .get_transaction(hash.clone()) + .map_err(|e| Error::RpcError(format!("Failed to get transaction: {}", e)))?; + match tx { + Some(tx) => { + let height = tx.tx_status.block_number; + match height { + Some(height) => { + if self.store.is_tx_stored(height.into(), tx_hash.clone())? { + self.store.clear_staged_tx()?; + } else { + self.store.insert_branch_request(height.into(), tx_hash)?; + self.store.clear_staged_tx()?; + } + } + None => { + let height = self.wait_for_transaction_packing(hash)?; + self.store.insert_branch_request(height.into(), tx_hash)?; + self.store.clear_staged_tx()?; + } + } + } + None => { + // Theoretically, this situation does not exist + self.store.clear_staged_tx()?; + } + } + } + + Ok(()) + } + + pub(crate) fn create_clear_queue_outbox_tx(&self) -> Result<(), Error> { + Ok(()) + } + + fn wait_for_transaction_packing(&self, tx_hash: H256) -> Result { + loop { + let tx = self + .branch_rpc_client + .get_transaction( + H256::from_slice(tx_hash.as_bytes()) + .expect("Failed to convert staged transaction hash to H256"), + ) + .map_err(|e| Error::RpcError(format!("Failed to get transaction: {}", e)))?; + + if let Some(tx) = tx { + if let Some(height) = tx.tx_status.block_number { + // Transaction has been included in a block, return the height + return Ok(height.into()); + } else { + // Transaction is pending, log and wait + info!("Transaction is pending, waiting for block..."); + } + } + + // Wait before next retry + sleep(Duration::from_secs(1)); + } + } +} diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index bd2b34985e..d0cb622f33 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -1,25 +1,19 @@ +mod clear_queue_outbox; + pub use crate::schemas::leap::{CrossChainQueue, Request, Requests}; -use crate::{Aggregator, CKB_FEE_RATE_LIMIT, SIGHASH_TYPE_HASH}; +use crate::{Aggregator, SIGHASH_TYPE_HASH}; -use aggregator_common::{ - error::Error, - utils::{privkey::get_sighash_lock_args_from_privkey, QUEUE_TYPE}, -}; +use aggregator_common::{error::Error, utils::privkey::get_sighash_lock_args_from_privkey}; use ckb_channel::Receiver; -use ckb_logger::{error, info, warn}; +use ckb_logger::{error, info}; use ckb_sdk::{ - rpc::{ - ckb_indexer::{Cell, Order}, - CkbRpcClient as RpcClient, ResponseFormatGetter, - }, + rpc::ckb_indexer::{Cell, Order}, traits::{CellQueryOptions, LiveCell, MaturityOption, PrimaryScriptType, QueryOrder}, }; use ckb_types::{ - core::{FeeRate, ScriptHashType}, - h256, - packed::{Byte32, CellDep, OutPoint, Script}, + core::ScriptHashType, + packed::{Byte32, OutPoint, Script}, prelude::*, - H256, }; use std::collections::HashSet; @@ -50,7 +44,7 @@ impl Aggregator { // get Branch queue outbox data let rgbpp_requests = poll_service.get_branch_queue_outbox_requests(); - let (rgbpp_requests, queue_cell) = match rgbpp_requests { + let (rgbpp_requests, _queue_cell) = match rgbpp_requests { Ok((rgbpp_requests, queue_cell)) => (rgbpp_requests, queue_cell), Err(e) => { error!("get RGB++ queue data error: {}", e.to_string()); @@ -59,6 +53,11 @@ impl Aggregator { }; // clear queue + if rgbpp_requests.is_empty() { + let _ = self.check_storage(); + } else { + let _ = self.create_clear_queue_outbox_tx(); + } thread::sleep(poll_interval); } diff --git a/aggregator/util/storage/src/lib.rs b/aggregator/util/storage/src/lib.rs index 0344cfc8c4..9654c5fa20 100644 --- a/aggregator/util/storage/src/lib.rs +++ b/aggregator/util/storage/src/lib.rs @@ -160,6 +160,14 @@ impl Storage { } } + pub fn get_staged_tx(&self) -> Result, Error> { + // Retrieve the staged transaction hash from the store + self.db + .get(b"staged_tx") + .map(|opt| opt.map(|bytes| H256::from_slice(&bytes).expect("Invalid H256"))) + .map_err(|err| Error::DatabaseError(err.to_string())) + } + /// Records a transaction that has been constructed but not yet confirmed pub fn record_staged_tx(&self, tx_hash: H256) -> Result<(), Error> { self.db From de4e9c8b799eeedefa49bdaad9cb380dfccc636e Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sun, 25 Aug 2024 17:47:33 +0800 Subject: [PATCH 11/28] impl create_clear_queue_outbox_tx. --- .../src/branch_to_rgbpp/clear_queue_outbox.rs | 228 +++++++++++++++++- .../src/branch_to_rgbpp/mod.rs | 2 +- aggregator/aggregator-main/src/lib.rs | 31 ++- .../src/rgbpp_to_branch/mod.rs | 33 +-- 4 files changed, 256 insertions(+), 38 deletions(-) diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs index fddeb85d5d..6691a23542 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/clear_queue_outbox.rs @@ -1,9 +1,33 @@ +use crate::schemas::leap::{Request, Requests}; use crate::Aggregator; +use crate::SIGHASH_TYPE_HASH; -use aggregator_common::error::Error; -use ckb_logger::info; -use ckb_types::H256; +use aggregator_common::{ + error::Error, + utils::{privkey::get_sighash_lock_args_from_privkey, SECP256K1}, +}; +use ckb_jsonrpc_types::TransactionView; +use ckb_logger::{debug, info}; +use ckb_sdk::{ + core::TransactionBuilder, + transaction::{ + builder::{ChangeBuilder, DefaultChangeBuilder}, + handler::HandlerContexts, + input::{InputIterator, TransactionInput}, + signer::{SignContexts, TransactionSigner}, + TransactionBuilderConfiguration, + }, + types::{NetworkInfo, NetworkType, TransactionWithScriptGroups}, + ScriptGroup, +}; +use ckb_types::{ + core::ScriptHashType, + packed::{Byte32, Bytes as PackedBytes, CellInput, Script, WitnessArgs}, + prelude::*, + H256, +}; +use std::collections::HashMap; use std::thread::sleep; use std::time::Duration; @@ -45,8 +69,202 @@ impl Aggregator { Ok(()) } - pub(crate) fn create_clear_queue_outbox_tx(&self) -> Result<(), Error> { - Ok(()) + pub(crate) fn create_clear_queue_outbox_tx( + &self, + requests: Vec, + ) -> Result { + let (queue_cell, queue_cell_data) = self.get_branch_queue_outbox_cell()?; + info!( + "The Branch queue outbox contains {} items that need to be cleared.", + queue_cell_data.outbox().len() + ); + if queue_cell_data.outbox().is_empty() { + return Ok(H256::default()); + } + + // build new queue + let request_ids = vec![]; + let existing_outbox = queue_cell_data + .outbox() + .as_builder() + .set(request_ids) + .build(); + let queue_data = queue_cell_data.as_builder().outbox(existing_outbox).build(); + let queue_witness = Requests::new_builder().set(requests).build(); + + // build inputs + let inputs: Vec = std::iter::once(queue_cell.out_point.clone()) + .map(|out_point| { + CellInput::new_builder() + .previous_output(out_point.into()) + .build() + }) + .collect(); + + // build outputs + let outputs = vec![queue_cell.output.clone().into()]; + let outputs_data = vec![queue_data.as_bytes().pack()]; + + // cell deps + let secp256k1_cell_dep = self.get_branch_cell_dep(SECP256K1)?; + + // build transaction + let mut tx_builder = TransactionBuilder::default(); + tx_builder + .cell_deps(vec![secp256k1_cell_dep]) + .inputs(inputs) + .outputs(outputs) + .outputs_data(outputs_data) + .witness( + WitnessArgs::new_builder() + .input_type(Some(queue_witness.as_bytes()).pack()) + .build() + .as_bytes() + .pack(), + ); + + // group + #[allow(clippy::mutable_key_type)] + let mut lock_groups: HashMap = HashMap::default(); + #[allow(clippy::mutable_key_type)] + let mut type_groups: HashMap = HashMap::default(); + { + let lock_script: Script = queue_cell.output.lock.clone().into(); + lock_groups + .entry(lock_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&lock_script)) + .input_indices + .push(0); + + for (output_idx, output) in tx_builder.get_outputs().clone().iter().enumerate() { + if let Some(type_script) = &output.type_().to_opt() { + type_groups + .entry(type_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_type_script(type_script)) + .output_indices + .push(output_idx); + } + } + + // balance transaction + let network_info = + NetworkInfo::new(NetworkType::Testnet, self.config.branch_uri.clone()); + let fee_rate = self.fee_rate()?; + let configuration = { + let mut config = + TransactionBuilderConfiguration::new_with_network(network_info.clone()) + .map_err(|e| Error::TransactionBuildError(e.to_string()))?; + config.fee_rate = fee_rate; + config + }; + let (capacity_provider_script_args, capacity_provider_key) = + get_sighash_lock_args_from_privkey( + self.config.branch_chain_capacity_provider_key_path.clone(), + )?; + let capacity_provider_script = Script::new_builder() + .code_hash(SIGHASH_TYPE_HASH.pack()) + .hash_type(ScriptHashType::Type.into()) + .args(capacity_provider_script_args.pack()) + .build(); + let mut change_builder = DefaultChangeBuilder::new( + &configuration, + capacity_provider_script.clone(), + Vec::new(), + ); + change_builder.init(&mut tx_builder); + { + let queue_cell_input = TransactionInput { + live_cell: queue_cell.clone().into(), + since: 0, + }; + let _ = change_builder.check_balance(queue_cell_input, &mut tx_builder); + }; + let contexts = HandlerContexts::default(); + let iterator = InputIterator::new(vec![capacity_provider_script], &network_info); + let mut tx_with_groups = { + let mut check_result = None; + for (mut input_index, input) in iterator.enumerate() { + input_index += 1; + let input = input.map_err(|err| { + let msg = format!("failed to find {input_index}-th live cell since {err}"); + Error::Other(msg) + })?; + tx_builder.input(input.cell_input()); + tx_builder.witness(PackedBytes::default()); + + let previous_output = input.previous_output(); + let lock_script = previous_output.lock(); + lock_groups + .entry(lock_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&lock_script)) + .input_indices + .push(input_index); + + if change_builder.check_balance(input, &mut tx_builder) { + let mut script_groups: Vec = lock_groups + .into_values() + .chain(type_groups.into_values()) + .collect(); + + for script_group in script_groups.iter_mut() { + for handler in configuration.get_script_handlers() { + for context in &contexts.contexts { + if handler + .build_transaction( + &mut tx_builder, + script_group, + context.as_ref(), + ) + .map_err(|e| Error::TransactionBuildError(e.to_string()))? + { + break; + } + } + } + } + let tx_view = change_builder.finalize(tx_builder); + + check_result = + Some(TransactionWithScriptGroups::new(tx_view, script_groups)); + break; + } + } + check_result + } + .ok_or_else(|| { + let msg = "live cells are not enough".to_string(); + Error::Other(msg) + })?; + + // sign + let (_, message_queue_key) = get_sighash_lock_args_from_privkey( + self.config + .branch_chain_token_manager_outbox_lock_key_path + .clone(), + )?; + TransactionSigner::new(&network_info) + .sign_transaction( + &mut tx_with_groups, + &SignContexts::new_sighash(vec![message_queue_key, capacity_provider_key]), + ) + .map_err(|e| Error::TransactionSignError(e.to_string()))?; + + // send tx + let tx_json = TransactionView::from(tx_with_groups.get_tx_view().clone()); + debug!( + "clear queue tx: {}", + serde_json::to_string_pretty(&tx_json).unwrap() + ); + let tx_hash = self + .rgbpp_rpc_client + .send_transaction(tx_json.inner, None) + .map_err(|e| { + Error::TransactionSendError(format!("send transaction error: {}", e)) + })?; + info!("clear branch queue outbox tx send: {:?}", tx_hash.pack()); + + Ok(tx_hash) + } } fn wait_for_transaction_packing(&self, tx_hash: H256) -> Result { diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index d0cb622f33..8375038ae7 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -56,7 +56,7 @@ impl Aggregator { if rgbpp_requests.is_empty() { let _ = self.check_storage(); } else { - let _ = self.create_clear_queue_outbox_tx(); + let _ = self.create_clear_queue_outbox_tx(rgbpp_requests); } thread::sleep(poll_interval); diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 5296cf521c..83b2c6e30a 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -8,9 +8,10 @@ use aggregator_common::{error::Error, utils::encode_udt_amount}; use aggregator_rgbpp_tx::RgbppTxBuilder; use aggregator_storage::Storage; use ckb_app_config::{AggregatorConfig, AssetConfig, LockConfig, ScriptConfig}; -use ckb_logger::info; +use ckb_logger::{info, warn}; use ckb_sdk::rpc::CkbRpcClient as RpcClient; use ckb_types::{ + core::FeeRate, h256, packed::{CellDep, Script}, H256, @@ -85,12 +86,38 @@ impl Aggregator { } } - fn get_branch_script(&self, script_name: &str) -> Result { + fn _get_branch_script(&self, script_name: &str) -> Result { self.branch_scripts .get(script_name) .map(|script_info| script_info.script.clone()) .ok_or_else(|| Error::MissingScriptInfo(script_name.to_string())) } + + fn fee_rate(&self) -> Result { + let value = { + let dynamic = self + .branch_rpc_client + .get_fee_rate_statistics(None) + .map_err(|e| Error::RpcError(format!("get dynamic fee rate error: {}", e)))? + .ok_or_else(|| Error::RpcError("get dynamic fee rate error: None".to_string())) + .map(|resp| resp.median) + .map(Into::into) + .map_err(|e| Error::RpcError(format!("get dynamic fee rate error: {}", e)))?; + info!("CKB fee rate: {} (dynamic)", FeeRate(dynamic)); + if dynamic > CKB_FEE_RATE_LIMIT { + warn!( + "dynamic CKB fee rate {} is too large, it seems unreasonable;\ + so the upper limit {} will be used", + FeeRate(dynamic), + FeeRate(CKB_FEE_RATE_LIMIT) + ); + CKB_FEE_RATE_LIMIT + } else { + dynamic + } + }; + Ok(value) + } } fn get_script_map(scripts: Vec) -> HashMap { diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index 2b158ec023..34f1f89f78 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -2,13 +2,12 @@ mod leap; use crate::schemas::leap::Request; use crate::wait_for_tx_confirmation; -use crate::{Aggregator, CKB_FEE_RATE_LIMIT}; +use crate::Aggregator; use aggregator_common::error::Error; use ckb_channel::Receiver; -use ckb_logger::{error, info, warn}; +use ckb_logger::{error, info}; use ckb_types::{ - core::FeeRate, packed::{CellDep, OutPoint}, prelude::*, H256, @@ -105,33 +104,7 @@ impl Aggregator { } } - fn fee_rate(&self) -> Result { - let value = { - let dynamic = self - .branch_rpc_client - .get_fee_rate_statistics(None) - .map_err(|e| Error::RpcError(format!("get dynamic fee rate error: {}", e)))? - .ok_or_else(|| Error::RpcError("get dynamic fee rate error: None".to_string())) - .map(|resp| resp.median) - .map(Into::into) - .map_err(|e| Error::RpcError(format!("get dynamic fee rate error: {}", e)))?; - info!("CKB fee rate: {} (dynamic)", FeeRate(dynamic)); - if dynamic > CKB_FEE_RATE_LIMIT { - warn!( - "dynamic CKB fee rate {} is too large, it seems unreasonable;\ - so the upper limit {} will be used", - FeeRate(dynamic), - FeeRate(CKB_FEE_RATE_LIMIT) - ); - CKB_FEE_RATE_LIMIT - } else { - dynamic - } - }; - Ok(value) - } - - fn get_branch_cell_dep(&self, script_name: &str) -> Result { + pub(crate) fn get_branch_cell_dep(&self, script_name: &str) -> Result { self.branch_scripts .get(script_name) .map(|script_info| script_info.cell_dep.clone()) From 9ae4998c3aef7bc3329c263b9f6b6a3e47f2cba3 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sun, 25 Aug 2024 22:07:37 +0800 Subject: [PATCH 12/28] impl check request when branch to rgbpp. --- Cargo.lock | 1 + aggregator/aggregator-main/Cargo.toml | 1 + .../src/branch_to_rgbpp/burn_tx.rs | 219 ++++++++++++++++++ .../src/branch_to_rgbpp/mod.rs | 33 ++- aggregator/aggregator-main/src/lib.rs | 1 + .../src/rgbpp_to_branch/mod.rs | 4 +- .../rgbpp-tx/src/rgbpp_to_branch/custodian.rs | 18 +- 7 files changed, 264 insertions(+), 13 deletions(-) create mode 100644 aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs diff --git a/Cargo.lock b/Cargo.lock index e8a4267797..89e878fd9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,7 @@ dependencies = [ "ckb-jsonrpc-types 0.116.1 (git+https://github.com/ethanyuan/ckb?branch=v0.116.1-branch-chain)", "ckb-logger 0.116.1", "ckb-sdk 3.2.1", + "ckb-stop-handler", "ckb-types 0.116.1 (git+https://github.com/ethanyuan/ckb?branch=v0.116.1-branch-chain)", "crossbeam-channel", "hex", diff --git a/aggregator/aggregator-main/Cargo.toml b/aggregator/aggregator-main/Cargo.toml index 6b3abfca38..1d92fba78f 100644 --- a/aggregator/aggregator-main/Cargo.toml +++ b/aggregator/aggregator-main/Cargo.toml @@ -14,6 +14,7 @@ ckb-app-config = { path = "../../util/app-config", version = "= 0.116.1" } ckb-channel = { path = "../../util/channel", version = "= 0.116.1" } ckb-hash = { path = "../../util/hash", version = "= 0.116.1" } ckb-logger = { path = "../../util/logger", version = "= 0.116.1" } +ckb-stop-handler = { path = "../../util/stop-handler", version = "= 0.116.1" } ckb-sdk = { git = "https://github.com/ethanyuan/ckb-sdk-rust", branch = "v3.2.1-branch-chain", features = ["native-tls-vendored"] } ckb-gen-types = { package = "ckb-gen-types", git = "https://github.com/ethanyuan/ckb", branch = "v0.116.1-branch-chain" } diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs new file mode 100644 index 0000000000..655a65a5ab --- /dev/null +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs @@ -0,0 +1,219 @@ +use crate::schemas::leap::{ + Message, MessageUnion, Request, RequestContent, RequestLockArgs, Requests, Transfer, +}; +use crate::Aggregator; +use crate::{CONFIRMATION_THRESHOLD, SIGHASH_TYPE_HASH}; + +use aggregator_common::{ + error::Error, + types::RequestType, + utils::{ + decode_udt_amount, encode_udt_amount, privkey::get_sighash_lock_args_from_privkey, + QUEUE_TYPE, REQUEST_LOCK, SECP256K1, TOKEN_MANAGER_TYPE, + }, +}; +use ckb_jsonrpc_types::TransactionView; +use ckb_logger::{debug, info}; +use ckb_sdk::{ + core::TransactionBuilder, + rpc::ckb_indexer::{Cell, Order}, + traits::{CellQueryOptions, LiveCell}, + transaction::{ + builder::{ChangeBuilder, DefaultChangeBuilder}, + handler::HandlerContexts, + input::{InputIterator, TransactionInput}, + signer::{SignContexts, TransactionSigner}, + TransactionBuilderConfiguration, + }, + types::{NetworkInfo, NetworkType, TransactionWithScriptGroups}, + ScriptGroup, Since, SinceType, +}; +use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; +use ckb_types::h256; +use ckb_types::{ + bytes::Bytes, + core::ScriptHashType, + packed::{Byte32, Bytes as PackedBytes, CellInput, Script, WitnessArgs}, + prelude::*, + H256, +}; +use molecule::prelude::Byte; + +use std::collections::HashMap; +use std::thread::sleep; +use std::time::Duration; + +impl Aggregator { + pub fn collect_branch_requests(&self) -> Result<(), Error> { + info!("Scan Branch requests ..."); + + let stop: CancellationToken = new_tokio_exit_rx(); + + let request_search_option = self.build_request_cell_search_option()?; + let mut cursor = None; + let limit = 10; + + loop { + if stop.is_cancelled() { + info!("Aggregator scan_rgbpp_request received exit signal, exiting now"); + return Ok(()); + } + + let request_cells = self + .branch_rpc_client + .get_cells( + request_search_option.clone().into(), + Order::Asc, + limit.into(), + cursor, + ) + .map_err(|e| Error::LiveCellNotFound(e.to_string()))?; + + if request_cells.objects.is_empty() { + info!("No more request cells found"); + break; + } + cursor = Some(request_cells.last_cursor); + + info!("Found {} request cells", request_cells.objects.len()); + let tip = self + .branch_rpc_client + .get_tip_block_number() + .map_err(|e| Error::RpcError(format!("get tip block number error: {}", e)))? + .value(); + let cells_with_messge = self.check_request(request_cells.objects.clone(), tip); + info!("Found {} valid request cells", cells_with_messge.len()); + if cells_with_messge.is_empty() { + break; + } + } + + Ok(()) + } + + fn build_request_cell_search_option(&self) -> Result { + let request_script = self._get_branch_script(REQUEST_LOCK)?; + Ok(CellQueryOptions::new_lock(request_script)) + } + + fn check_request(&self, cells: Vec, tip: u64) -> Vec<(Cell, Transfer)> { + cells + .into_iter() + .filter_map(|cell| { + let live_cell: LiveCell = cell.clone().into(); + RequestLockArgs::from_slice(&live_cell.output.lock().args().raw_data()) + .ok() + .and_then(|args| { + let target_request_type_hash = args.request_type_hash(); + info!("target_request_type_hash: {:?}", target_request_type_hash); + + let timeout: u64 = args.timeout().unpack(); + let since = Since::from_raw_value(timeout); + let since_check = + since.extract_metric().map_or(false, |(since_type, value)| { + match since_type { + SinceType::BlockNumber => { + let threshold = if since.is_absolute() { + value + } else { + cell.block_number.value() + value + }; + tip + CONFIRMATION_THRESHOLD < threshold + } + _ => false, + } + }); + + let content = args.content(); + let target_chain_id: Bytes = content.target_chain_id().raw_data(); + info!("target_chain_id: {:?}", target_chain_id); + let request_type = content.request_type(); + + let (check_message, transfer) = { + let message = content.message(); + let message_union = message.to_enum(); + match message_union { + MessageUnion::Transfer(transfer) => { + let transfer_amount: u128 = transfer.amount().unpack(); + + let check_asset = if cell.output.type_.is_some() { + let check_amount = cell + .clone() + .output_data + .and_then(|data| decode_udt_amount(data.as_bytes())) + .map_or(false, |amount| { + info!( + "original amount: {:?}, transfer amount: {:?}", + amount, transfer_amount + ); + transfer_amount <= amount + }); + + let check_cell_type = { + let type_args = cell.output.type_.clone().unwrap().args; + let (token_manager_lock_args, _) = + get_sighash_lock_args_from_privkey( + self.config + .branch_chain_token_manager_lock_key_path + .clone(), + ) + .expect("get message queue outbox lock args"); + let token_manager_lock_hash = Script::new_builder() + .code_hash(SIGHASH_TYPE_HASH.pack()) + .hash_type(ScriptHashType::Type.into()) + .args(token_manager_lock_args.pack()) + .build() + .calc_script_hash(); + type_args.into_bytes() + == token_manager_lock_hash.as_bytes() + }; + + let check_transfer_type = { + let transfer_type_hash: H256 = + transfer.asset_type().unpack(); + transfer_type_hash == h256!("0x37b6748d268d4aa62445d546bac1f90ccbc02cbbcecc7831aca3b77d70304e0f") + }; + + check_amount && check_cell_type && check_transfer_type + } else { + let check_amount = { + let capacity: u64 = cell.output.capacity.into(); + transfer_amount <= capacity as u128 + }; + + let check_transfer_type = { + let transfer_type_hash: H256 = + transfer.asset_type().unpack(); + transfer_type_hash == h256!("0x29b0b1a449b0e7fb08881e1d810a6abbedb119e9c4ffc76eebbc757fb214f091") + }; + + check_amount && check_transfer_type + }; + + let lock_hash: H256 = transfer.owner_lock_hash().unpack(); + let check_lock = self.asset_locks.contains_key(&lock_hash); + + (check_asset && check_lock, transfer) + } + } + }; + + let request_type_hash = self + .branch_scripts + .get(TOKEN_MANAGER_TYPE) + .map(|script_info| script_info.script.calc_script_hash()); + + if Some(target_request_type_hash) == request_type_hash + && request_type == Byte::new(RequestType::BranchToCkb as u8) + && check_message + && since_check + { + Some((cell, transfer)) + } else { + None + } + }) + }) + .collect() + } +} diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index 8375038ae7..d42fc54e49 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -1,7 +1,8 @@ +mod burn_tx; mod clear_queue_outbox; pub use crate::schemas::leap::{CrossChainQueue, Request, Requests}; -use crate::{Aggregator, SIGHASH_TYPE_HASH}; +use crate::{wait_for_tx_confirmation, Aggregator, SIGHASH_TYPE_HASH}; use aggregator_common::{error::Error, utils::privkey::get_sighash_lock_args_from_privkey}; use ckb_channel::Receiver; @@ -14,10 +15,12 @@ use ckb_types::{ core::ScriptHashType, packed::{Byte32, OutPoint, Script}, prelude::*, + H256, }; use std::collections::HashSet; use std::thread; +use std::time::Duration; impl Aggregator { /// Collect Branch requests and send them to the RGB++ chain @@ -25,7 +28,6 @@ impl Aggregator { info!("Branch Aggregator service started ..."); let poll_interval = self.poll_interval; - let poll_service: Aggregator = self.clone(); loop { match stop_rx.try_recv() { @@ -43,7 +45,7 @@ impl Aggregator { } // get Branch queue outbox data - let rgbpp_requests = poll_service.get_branch_queue_outbox_requests(); + let rgbpp_requests = self.get_branch_queue_outbox_requests(); let (rgbpp_requests, _queue_cell) = match rgbpp_requests { Ok((rgbpp_requests, queue_cell)) => (rgbpp_requests, queue_cell), Err(e) => { @@ -56,7 +58,26 @@ impl Aggregator { if rgbpp_requests.is_empty() { let _ = self.check_storage(); } else { - let _ = self.create_clear_queue_outbox_tx(rgbpp_requests); + let clear_queue_tx = self.create_clear_queue_outbox_tx(rgbpp_requests); + let clear_queue_tx = match clear_queue_tx { + Ok(clear_queue_tx) => clear_queue_tx, + Err(e) => { + error!("{}", e.to_string()); + continue; + } + }; + match wait_for_tx_confirmation( + self.branch_rpc_client.clone(), + H256(clear_queue_tx.0), + Duration::from_secs(600), + ) { + Ok(()) => {} + Err(e) => error!("{}", e.to_string()), + } + } + + if let Err(e) = self.collect_branch_requests() { + info!("Aggregator collect Branch requests: {:?}", e); } thread::sleep(poll_interval); @@ -126,10 +147,6 @@ impl Aggregator { .branch_chain_token_manager_outbox_lock_key_path .clone(), )?; - info!( - "message_queue_outbox_lock_args: {:?}", - message_queue_outbox_lock_args.pack().to_string() - ); let message_queue_outbox_lock = Script::new_builder() .code_hash(SIGHASH_TYPE_HASH.pack()) .hash_type(ScriptHashType::Type.into()) diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 83b2c6e30a..94ddccf9a9 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -25,6 +25,7 @@ use std::time::Duration; pub const SIGHASH_TYPE_HASH: H256 = h256!("0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8"); const CKB_FEE_RATE_LIMIT: u64 = 5000; +const CONFIRMATION_THRESHOLD: u64 = 24; /// Aggregator #[derive(Clone)] diff --git a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs index 34f1f89f78..8a1adb8b6e 100644 --- a/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs +++ b/aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs @@ -96,8 +96,8 @@ impl Aggregator { } } - if let Err(e) = poll_service.rgbpp_tx_builder.collect_rgbpp_request() { - info!("Aggregator: {:?}", e); + if let Err(e) = poll_service.rgbpp_tx_builder.collect_rgbpp_requests() { + info!("Aggregator collect RGB++ requests: {:?}", e); } thread::sleep(poll_interval); diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs index 56cce0e7ad..22da6f71e5 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/custodian.rs @@ -43,8 +43,8 @@ use std::thread::sleep; use std::time::Duration; impl RgbppTxBuilder { - pub fn collect_rgbpp_request(&self) -> Result<(), Error> { - info!("Scan RGB++ Request ..."); + pub fn collect_rgbpp_requests(&self) -> Result<(), Error> { + info!("Scan RGB++ requests ..."); let stop: CancellationToken = new_tokio_exit_rx(); @@ -396,11 +396,23 @@ impl RgbppTxBuilder { ); transfer_amount <= amount }); + + let check_asset = + if let Some(ref type_script) = cell.output.type_ { + let type_script: Script = type_script.clone().into(); + transfer.asset_type() == type_script.calc_script_hash() + } else { + false + }; + let lock_hash: H256 = transfer.owner_lock_hash().unpack(); let type_hash: H256 = transfer.asset_type().unpack(); let check_lock = self.asset_locks.contains_key(&lock_hash); let check_type = self.asset_types.contains_key(&type_hash); - (check_amount && check_lock && check_type, transfer) + ( + check_amount && check_asset && check_lock && check_type, + transfer, + ) } } }; From 72435635d07c3f6e55dc0e5a5a0563c7455a67d7 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Sun, 25 Aug 2024 23:14:05 +0800 Subject: [PATCH 13/28] update spec: deploy token manager in genesis. --- aggregator/aggregator-main/src/lib.rs | 9 ++++---- aggregator/util/common/src/utils/mod.rs | 1 + resource/always_success | Bin 0 -> 512 bytes resource/build.rs | 1 + resource/ckb-aggregator.toml | 27 ++++++++++++++++++++---- resource/specs/dev.toml | 4 ++++ spec/src/lib.rs | 27 +++++++++++++++++++----- 7 files changed, 56 insertions(+), 13 deletions(-) create mode 100755 resource/always_success diff --git a/aggregator/aggregator-main/src/lib.rs b/aggregator/aggregator-main/src/lib.rs index 94ddccf9a9..4f62db24c2 100644 --- a/aggregator/aggregator-main/src/lib.rs +++ b/aggregator/aggregator-main/src/lib.rs @@ -212,8 +212,8 @@ mod tests { #[test] fn calc_script() { let code_hash = "00000000000000000000000000000000000000000000000000545950455f4944"; - let args = "57fdfd0617dcb74d1287bb78a7368a3a4bf9a790cfdcf5c1a105fd7cb406de0d"; - let script_hash = "6283a479a3cf5d4276cd93594de9f1827ab9b55c7b05b3d28e4c2e0a696cfefd"; + let args = "314f67c0ffd0c6fbffe886f03c6b00b42e4e66e3e71d32a66b8a38d69e6a4250"; + let target_script_hash = "9c6933d977360f115a3e9cd5a2e0e475853681b80d775d93ad0f8969da343e56"; let code_hash = H256::from_str(code_hash).unwrap(); let args = Bytes::from(hex::decode(args).unwrap()); @@ -224,11 +224,12 @@ mod tests { .args(args.pack()) .build(); - println!("{:?}", script.calc_script_hash()); + let script_hash: H256 = script.calc_script_hash().unpack(); + println!("script_hash: {:?}", script_hash.to_string()); assert_eq!( script.calc_script_hash().as_bytes(), - Bytes::from(hex::decode(script_hash).unwrap()) + Bytes::from(hex::decode(target_script_hash).unwrap()) ); } } diff --git a/aggregator/util/common/src/utils/mod.rs b/aggregator/util/common/src/utils/mod.rs index 4bbec7875b..ed1cfa8df5 100644 --- a/aggregator/util/common/src/utils/mod.rs +++ b/aggregator/util/common/src/utils/mod.rs @@ -3,6 +3,7 @@ pub mod privkey; pub const SECP256K1: &str = "secp256k1_blake160"; pub const XUDT: &str = "xudt"; pub const REQUEST_LOCK: &str = "request_lock"; +pub const TOKEN_MANAGER_TYPE: &str = "token_manager_type"; pub const QUEUE_TYPE: &str = "queue_type"; pub fn decode_udt_amount(data: &[u8]) -> Option { diff --git a/resource/always_success b/resource/always_success new file mode 100755 index 0000000000000000000000000000000000000000..92ba5e0cc624deca99a2b588d597584990059623 GIT binary patch literal 512 zcmb<-^>JfjWMqH=CWg-pAl?=*$-v+MlYlA#Q4S0i3`{_hg@GBYiWQ~?OvB8A(C46X zFq%OCA`tkrB!vw{AASy08v~_Oh#rtQh|i3y0Ls1(rKQm1VESRg06@Va AY5)KL literal 0 HcmV?d00001 diff --git a/resource/build.rs b/resource/build.rs index 0bbe96d9a2..d48c513906 100644 --- a/resource/build.rs +++ b/resource/build.rs @@ -22,6 +22,7 @@ fn main() { "default.db-options", "xudt_rce", "request-cell-lock", + "always_success", ] { bundled .add_file(f, Compression::Gzip) diff --git a/resource/ckb-aggregator.toml b/resource/ckb-aggregator.toml index 39264f041a..c72b513cf1 100644 --- a/resource/ckb-aggregator.toml +++ b/resource/ckb-aggregator.toml @@ -52,7 +52,7 @@ branch_chain_capacity_provider_key_path = "../dev/branch_chain_capacity_provider branch_chain_token_manager_lock_key_path = "../dev/branch_chain_token_manager_key" branch_chain_token_manager_outbox_lock_key_path = "../dev/branch_chain_token_manager_outbox_key" -[[aggregator.gbpp_scripts]] +[[aggregator.rgbpp_scripts]] script_name = "secp256k1_blake160" script = ''' { @@ -142,7 +142,7 @@ cell_dep = ''' "dep_type": "dep_group", "out_point": { "index": "0x0", - "tx_hash": "0xdd4fea4dc94ae5b3bd5a8a169f7edf6ea731689eec39b94f395a1a8ea1321205" + "tx_hash": "0x3e06cbfb8d09033449d6c2b49a5cda673b6640de7b2fabaa0a53d7c52722a20f" } } ''' @@ -161,7 +161,7 @@ cell_dep = ''' "dep_type": "code", "out_point": { "index": "0x5", - "tx_hash": "0x7f1180615b86b8216cfb4a9b5421828c41f766f8d3111239e63d253d2ad81381" + "tx_hash": "0xe0606c8041a51eae120d6fd7e3b0aa9966f2a2c05ca2921ea556f6e7de1fff4b" } } ''' @@ -180,7 +180,26 @@ cell_dep = ''' "dep_type": "code", "out_point": { "index": "0x6", - "tx_hash": "0x7f1180615b86b8216cfb4a9b5421828c41f766f8d3111239e63d253d2ad81381" + "tx_hash": "0xe0606c8041a51eae120d6fd7e3b0aa9966f2a2c05ca2921ea556f6e7de1fff4b" + } +} +''' + +[[aggregator.branch_scripts]] +script_name = "token_manager_type" +script = ''' +{ + "args": "0x4242", + "code_hash": "0x9c6933d977360f115a3e9cd5a2e0e475853681b80d775d93ad0f8969da343e56", + "hash_type": "type" +} +''' +cell_dep = ''' +{ + "dep_type": "code", + "out_point": { + "index": "0x7", + "tx_hash": "0xe0606c8041a51eae120d6fd7e3b0aa9966f2a2c05ca2921ea556f6e7de1fff4b" } } ''' diff --git a/resource/specs/dev.toml b/resource/specs/dev.toml index 771dc700f4..39f565377d 100644 --- a/resource/specs/dev.toml +++ b/resource/specs/dev.toml @@ -44,6 +44,10 @@ capacity = 1_0000_0000 file = { bundled = "request-cell-lock" } create_type_id = true capacity = 1_0000_0000 +[[genesis.system_cells]] +file = { bundled = "always_success" } +create_type_id = true +capacity = 1_0000_0000 [genesis.system_cells_lock] code_hash = "0x0000000000000000000000000000000000000000000000000000000000000000" diff --git a/spec/src/lib.rs b/spec/src/lib.rs index 5a47d9c034..4ae247a5fb 100644 --- a/spec/src/lib.rs +++ b/spec/src/lib.rs @@ -67,7 +67,7 @@ pub const OUTPUT_INDEX_SECP256K1_DATA: u64 = 3; /// The output index of SECP256K1/multisig script in the genesis no.0 transaction pub const OUTPUT_INDEX_SECP256K1_BLAKE160_MULTISIG_ALL: u64 = 4; /// The output index of Token Manager script in the genesis no.0 transaction -pub const OUTPUT_INDEX_TOKEN_MANAGER: u64 = 10; +pub const OUTPUT_INDEX_TOKEN_MANAGER: u64 = 11; /// The CKB block chain specification #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] @@ -946,10 +946,27 @@ impl GenesisCell { impl IssuedCell { fn build_output(&self) -> packed::CellOutput { - packed::CellOutput::new_builder() - .lock(self.lock.clone().into()) - .capacity(self.capacity.pack()) - .build() + if self.with_queue { + let type_ = packed::ScriptBuilder::default() + .code_hash( + h256!("0x9c6933d977360f115a3e9cd5a2e0e475853681b80d775d93ad0f8969da343e56") + .pack(), + ) + .hash_type(ScriptHashType::Type.into()) + .args(Bytes::from(b"4242".to_vec()).pack()) + .build(); + + packed::CellOutput::new_builder() + .capacity(self.capacity.pack()) + .lock(self.lock.clone().into()) + .type_(Some(type_).pack()) + .build() + } else { + packed::CellOutput::new_builder() + .lock(self.lock.clone().into()) + .capacity(self.capacity.pack()) + .build() + } } } From f3a5cc1e2dbfdba4cbed0a3472ce6d75911ff838 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Mon, 26 Aug 2024 00:43:42 +0800 Subject: [PATCH 14/28] impl create_burn_tx --- .../src/branch_to_rgbpp/burn_tx.rs | 224 +++++++++++++++++- 1 file changed, 216 insertions(+), 8 deletions(-) diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs index 655a65a5ab..323d30b545 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs @@ -1,15 +1,14 @@ use crate::schemas::leap::{ Message, MessageUnion, Request, RequestContent, RequestLockArgs, Requests, Transfer, }; -use crate::Aggregator; -use crate::{CONFIRMATION_THRESHOLD, SIGHASH_TYPE_HASH}; +use crate::{wait_for_tx_confirmation, Aggregator, CONFIRMATION_THRESHOLD, SIGHASH_TYPE_HASH}; use aggregator_common::{ error::Error, types::RequestType, utils::{ - decode_udt_amount, encode_udt_amount, privkey::get_sighash_lock_args_from_privkey, - QUEUE_TYPE, REQUEST_LOCK, SECP256K1, TOKEN_MANAGER_TYPE, + decode_udt_amount, privkey::get_sighash_lock_args_from_privkey, REQUEST_LOCK, SECP256K1, + TOKEN_MANAGER_TYPE, XUDT, }, }; use ckb_jsonrpc_types::TransactionView; @@ -20,8 +19,7 @@ use ckb_sdk::{ traits::{CellQueryOptions, LiveCell}, transaction::{ builder::{ChangeBuilder, DefaultChangeBuilder}, - handler::HandlerContexts, - input::{InputIterator, TransactionInput}, + input::TransactionInput, signer::{SignContexts, TransactionSigner}, TransactionBuilderConfiguration, }, @@ -33,14 +31,13 @@ use ckb_types::h256; use ckb_types::{ bytes::Bytes, core::ScriptHashType, - packed::{Byte32, Bytes as PackedBytes, CellInput, Script, WitnessArgs}, + packed::{Byte32, Bytes as PackedBytes, CellInput, CellOutput, Script, WitnessArgs}, prelude::*, H256, }; use molecule::prelude::Byte; use std::collections::HashMap; -use std::thread::sleep; use std::time::Duration; impl Aggregator { @@ -86,11 +83,222 @@ impl Aggregator { if cells_with_messge.is_empty() { break; } + + let burn_tx = self.create_burn_tx(cells_with_messge)?; + match wait_for_tx_confirmation( + self.branch_rpc_client.clone(), + burn_tx, + Duration::from_secs(15), + ) { + Ok(()) => info!("Transaction confirmed"), + Err(e) => info!("{}", e.to_string()), + } } Ok(()) } + pub fn create_burn_tx(&self, request_cells: Vec<(Cell, Transfer)>) -> Result { + // get queue cell + let (queue_cell, queue_cell_data) = self.get_branch_queue_outbox_cell()?; + + // build new queue + let mut request_ids = vec![]; + let mut requests = vec![]; + for (cell, transfer) in request_cells.clone() { + let request_content = RequestContent::new_builder() + .request_type(Byte::new(RequestType::BranchToCkb as u8)) + .message( + Message::new_builder() + .set(MessageUnion::Transfer(transfer)) + .build(), + ) + .build(); + let request = Request::new_builder() + .request_cell(cell.out_point.into()) + .request_content(request_content) + .build(); + let request_id = request.as_bytes().pack().calc_raw_data_hash(); + request_ids.push(request_id); + requests.push(request); + } + + if !queue_cell_data.outbox().is_empty() { + return Err(Error::QueueOutboxHasUnprocessedRequests); + } + let existing_outbox = queue_cell_data + .outbox() + .as_builder() + .extend(request_ids) + .build(); + let queue_data = queue_cell_data.as_builder().outbox(existing_outbox).build(); + let queue_witness = Requests::new_builder().set(requests).build(); + + // build capacity provider lock + let (capacity_provider_lock_args, _) = get_sighash_lock_args_from_privkey( + self.config.branch_chain_capacity_provider_key_path.clone(), + )?; + let capacity_provider_lock = Script::new_builder() + .code_hash(SIGHASH_TYPE_HASH.pack()) + .hash_type(ScriptHashType::Type.into()) + .args(capacity_provider_lock_args.pack()) + .build(); + + // build inputs + let inputs: Vec = std::iter::once(queue_cell.out_point.clone()) + .chain(request_cells.iter().map(|(cell, _)| cell.out_point.clone())) + .map(|out_point| { + CellInput::new_builder() + .previous_output(out_point.into()) + .build() + }) + .collect(); + + // build outputs + let mut outputs = vec![queue_cell.output.clone().into()]; + let mut outputs_data = vec![queue_data.as_bytes().pack()]; + for (cell, _) in &request_cells { + if cell.output.type_.is_some() { + let output: CellOutput = cell.output.clone().into(); + let output = output + .as_builder() + .lock(capacity_provider_lock.clone()) + .type_(None::