diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs index ad5a1bc335..2fda1f634b 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs @@ -12,7 +12,7 @@ use aggregator_common::{ }, }; use ckb_jsonrpc_types::TransactionView; -use ckb_logger::{info, debug}; +use ckb_logger::{debug, info}; use ckb_sdk::{ core::TransactionBuilder, rpc::ckb_indexer::{Cell, Order}, diff --git a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs index ae70467ec3..76fb6a297e 100644 --- a/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs +++ b/aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs @@ -6,14 +6,17 @@ use crate::{wait_for_tx_confirmation, Aggregator, SIGHASH_TYPE_HASH}; use aggregator_common::{error::Error, utils::privkey::get_sighash_lock_args_from_privkey}; use ckb_channel::Receiver; -use ckb_logger::{error, info}; +use ckb_logger::{debug, error, info}; use ckb_sdk::{ - rpc::ckb_indexer::{Cell, Order}, + rpc::{ + ckb_indexer::{Cell, Order}, + ResponseFormatGetter, + }, traits::{CellQueryOptions, LiveCell, MaturityOption, PrimaryScriptType, QueryOrder}, }; use ckb_types::{ core::ScriptHashType, - packed::{Byte32, OutPoint, Script}, + packed::{Byte32, Bytes as PackedBytes, OutPoint, Script, Transaction, WitnessArgs}, prelude::*, H256, }; @@ -92,11 +95,13 @@ impl Aggregator { } pub fn poll_rgbpp_unlock(&self, stop_rx: Receiver<()>) { - info!("RGB++ Unlock service started ..."); + info!("RGB++ Unlock loop started ..."); let poll_interval = self.poll_interval; loop { + thread::sleep(poll_interval); + match stop_rx.try_recv() { Ok(_) => { info!("Aggregator received exit signal, stopped"); @@ -113,8 +118,16 @@ impl Aggregator { let pending_request = self.store.get_earliest_pending(); match pending_request { + Ok(None) => { + info!("No pending request found"); + } + Err(e) => { + error!("{}", e.to_string()); + continue; + } Ok(Some((height, tx_hash))) => { - let tip = self.rgbpp_rpc_client.get_tip_block_number(); + info!("Found pending request at height: {}", height); + let tip = self.branch_rpc_client.get_tip_block_number(); let tip: u64 = match tip { Ok(tip) => tip.into(), Err(e) => { @@ -123,9 +136,20 @@ impl Aggregator { } }; if height + self.config.challenge_period < tip { - let inbox_tx = self - .rgbpp_tx_builder - .create_add_inbox_tx(height, tx_hash.0.into()); + let requests_in_witness = if let Ok((_, requests_in_witness)) = + self.get_burn_tx_requests_in_witness(tx_hash.clone()) + { + requests_in_witness + } else { + error!("Burn tx requests not found in witness"); + continue; + }; + + let inbox_tx = self.rgbpp_tx_builder.create_add_queue_inbox_tx( + height, + tx_hash.0.into(), + requests_in_witness.raw_data(), + ); let inbox_tx = match inbox_tx { Ok(inbox_tx) => { H256::from_slice(inbox_tx.as_bytes()).expect("unlock tx to H256") @@ -140,32 +164,23 @@ impl Aggregator { H256(inbox_tx.0), Duration::from_secs(30), ) { - Ok(height) => { + Ok(_) => { self.store - .commit_branch_request(height, inbox_tx) + .commit_branch_request(height) .expect("commit branch request"); } Err(e) => error!("{}", e.to_string()), } } } - Ok(None) => { - info!("No pending request found"); - } - Err(e) => { - error!("{}", e.to_string()); - continue; - } } - - thread::sleep(poll_interval); } } fn get_branch_queue_outbox_requests(&self) -> Result<(Vec, OutPoint), Error> { let (queue_cell, queue_cell_data) = self.get_branch_queue_outbox_cell()?; if queue_cell_data.outbox().is_empty() { - info!("No requests in queue"); + debug!("No requests in queue"); return Ok((vec![], OutPoint::default())); } let request_ids: Vec = queue_cell_data.outbox().into_iter().collect(); @@ -176,7 +191,7 @@ impl Aggregator { let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| { Error::TransactionParseError(format!("get requests from witness error: {}", e)) })?; - info!("Found {} requests in witness", requests.len()); + info!("Found {} requests in outbox", requests.len()); // check requests let request_set: HashSet = requests @@ -208,7 +223,7 @@ impl Aggregator { queue_cell.objects.len() ))); } - info!("Found {} queue outbox cell", queue_cell.objects.len()); + debug!("Found {} queue outbox cell", queue_cell.objects.len()); let queue_cell = queue_cell.objects[0].clone(); let queue_live_cell: LiveCell = queue_cell.clone().into(); @@ -248,4 +263,41 @@ impl Aggregator { }; Ok(cell_query_option) } + + pub(crate) fn get_burn_tx_requests_in_witness( + &self, + tx_hash: H256, + ) -> Result<(H256, PackedBytes), Error> { + let tx = self + .branch_rpc_client + .get_transaction(tx_hash.clone()) + .map_err(|e| Error::RpcError(format!("get transaction error: {}", e)))? + .ok_or(Error::RpcError("get transaction error: None".to_string()))? + .transaction + .ok_or(Error::RpcError("get transaction error: None".to_string()))? + .get_value() + .map_err(|e| Error::RpcError(format!("get transaction error: {}", e)))? + .inner; + let tx: Transaction = tx.into(); + let witness = tx + .witnesses() + .get(0 as usize) + .ok_or(Error::TransactionParseError( + "get witness error: None".to_string(), + ))?; + let witness_input_type = WitnessArgs::from_slice(&witness.raw_data()) + .map_err(|e| Error::TransactionParseError(format!("get witness error: {}", e)))? + .input_type() + .to_opt() + .ok_or(Error::TransactionParseError( + "get witness input type error: None".to_string(), + ))?; + + let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| { + Error::TransactionParseError(format!("get requests from witness error: {}", e)) + })?; + info!("Found {} requests in burn tx witness", requests.len()); + + Ok((tx.calc_tx_hash().unpack(), witness_input_type)) + } } diff --git a/aggregator/util/common/src/error.rs b/aggregator/util/common/src/error.rs index 08f94fcd7c..388548e1b5 100644 --- a/aggregator/util/common/src/error.rs +++ b/aggregator/util/common/src/error.rs @@ -10,6 +10,8 @@ pub enum Error { QueueCellDataDecodeError(String), #[error("outbox has unprocessed requests, cannot add new ones")] QueueOutboxHasUnprocessedRequests, + #[error("inbox has unprocessed requests, cannot add new ones")] + QueueInboxHasUnprocessedRequests, #[error("queue cell data error: {0}")] QueueCellDataError(String), #[error("missing script info: {0}")] diff --git a/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/inbox.rs b/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/inbox.rs index d22b24f199..1ab4dc3a06 100644 --- a/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/inbox.rs +++ b/aggregator/util/rgbpp-tx/src/branch_to_rgbpp/inbox.rs @@ -1,23 +1,14 @@ -use crate::schemas::leap::{ - Message, MessageUnion, Request, RequestContent, RequestLockArgs, Requests, Transfer, -}; -use crate::{RgbppTxBuilder, CONFIRMATION_THRESHOLD, SIGHASH_TYPE_HASH}; +use crate::schemas::leap::Requests; +use crate::{RgbppTxBuilder, SIGHASH_TYPE_HASH}; use aggregator_common::{ error::Error, - types::RequestType, - utils::{ - decode_udt_amount, encode_udt_amount, privkey::get_sighash_lock_args_from_privkey, - QUEUE_TYPE, REQUEST_LOCK, SECP256K1, XUDT, - }, + utils::{privkey::get_sighash_lock_args_from_privkey, QUEUE_TYPE, SECP256K1}, }; use ckb_jsonrpc_types::TransactionView; use ckb_logger::{debug, info}; use ckb_sdk::{ core::TransactionBuilder, - rpc::ckb_indexer::{Cell, Order}, - rpc::CkbRpcClient as RpcClient, - traits::{CellQueryOptions, LiveCell}, transaction::{ builder::{ChangeBuilder, DefaultChangeBuilder}, handler::HandlerContexts, @@ -26,35 +17,212 @@ use ckb_sdk::{ TransactionBuilderConfiguration, }, types::{NetworkInfo, NetworkType, TransactionWithScriptGroups}, - ScriptGroup, Since, SinceType, + ScriptGroup, }; -use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; use ckb_types::{ bytes::Bytes, core::ScriptHashType, - packed::{Byte32, Bytes as PackedBytes, CellInput, CellOutput, Script, WitnessArgs}, + packed::{Byte32, Bytes as PackedBytes, CellInput, Script, WitnessArgs}, prelude::*, H256, }; -use molecule::prelude::Byte; use std::collections::HashMap; -use std::thread::sleep; -use std::time::Duration; impl RgbppTxBuilder { - pub fn create_add_inbox_tx( + pub fn create_add_queue_inbox_tx( &self, - request_height: u64, - request_tx: H256, + _request_height: u64, + _request_tx: H256, + requests_in_witness: Bytes, ) -> Result { - let (queue_cell, queue_cell_data) = + let requests_in_witness = Requests::from_slice(&requests_in_witness).map_err(|e| { + Error::TransactionParseError(format!("get requests from witness error: {}", e)) + })?; + let mut request_ids = vec![]; + let mut requests = vec![]; + for request in requests_in_witness { + let request_id = request.as_bytes().pack().calc_raw_data_hash(); + request_ids.push(request_id); + requests.push(request); + } + + let (queue_inbox_cell, queue_inbox_cell_data) = self.get_rgbpp_queue_cell(self.rgbpp_queue_inbox_lock_key_path.clone())?; + if !queue_inbox_cell_data.inbox().is_empty() { + return Err(Error::QueueInboxHasUnprocessedRequests); + } + let existing_inbox = queue_inbox_cell_data + .inbox() + .as_builder() + .extend(request_ids) + .build(); + let queue_inbox_data = queue_inbox_cell_data + .as_builder() + .inbox(existing_inbox) + .build(); + let queue_inbox_witness = Requests::new_builder().set(requests).build(); + + // build inputs + let inputs: Vec = std::iter::once(queue_inbox_cell.out_point.clone()) + .map(|out_point| { + CellInput::new_builder() + .previous_output(out_point.into()) + .build() + }) + .collect(); + + // build outputs + let outputs = vec![queue_inbox_cell.output.clone().into()]; + let outputs_data = vec![queue_inbox_data.as_bytes().pack()]; + + // cell deps + let secp256k1_cell_dep = self.get_rgbpp_cell_dep(SECP256K1)?; + let queue_type_cell_dep = self.get_rgbpp_cell_dep(QUEUE_TYPE)?; + + // build transaction + let mut tx_builder = TransactionBuilder::default(); + tx_builder + .cell_deps(vec![secp256k1_cell_dep, queue_type_cell_dep]) + .inputs(inputs) + .outputs(outputs) + .outputs_data(outputs_data) + .witness( + WitnessArgs::new_builder() + .input_type(Some(queue_inbox_witness.as_bytes()).pack()) + .build() + .as_bytes() + .pack(), + ); + + // group + #[allow(clippy::mutable_key_type)] + let mut lock_groups: HashMap = HashMap::default(); + #[allow(clippy::mutable_key_type)] + let mut type_groups: HashMap = HashMap::default(); + { + let lock_script: Script = queue_inbox_cell.output.lock.clone().into(); + lock_groups + .entry(lock_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&lock_script)) + .input_indices + .push(0); + } + for (output_idx, output) in tx_builder.get_outputs().clone().iter().enumerate() { + if let Some(type_script) = &output.type_().to_opt() { + type_groups + .entry(type_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_type_script(type_script)) + .output_indices + .push(output_idx); + } + } + + // balance transaction + let network_info = NetworkInfo::new(NetworkType::Testnet, self.rgbpp_uri.clone()); + let fee_rate = self.fee_rate()?; + let configuration = { + let mut config = + TransactionBuilderConfiguration::new_with_network(network_info.clone()) + .map_err(|e| Error::TransactionBuildError(e.to_string()))?; + config.fee_rate = fee_rate; + config + }; + let (capacity_provider_script_args, capacity_provider_key) = + get_sighash_lock_args_from_privkey(self.rgbpp_ckb_provider_key_path.clone())?; + let capacity_provider_script = Script::new_builder() + .code_hash(SIGHASH_TYPE_HASH.pack()) + .hash_type(ScriptHashType::Type.into()) + .args(capacity_provider_script_args.pack()) + .build(); + + let mut change_builder = + DefaultChangeBuilder::new(&configuration, capacity_provider_script.clone(), Vec::new()); + change_builder.init(&mut tx_builder); + { + let queue_cell_input = TransactionInput { + live_cell: queue_inbox_cell.clone().into(), + since: 0, + }; + let _ = change_builder.check_balance(queue_cell_input, &mut tx_builder); + }; + let contexts = HandlerContexts::default(); + let iterator = InputIterator::new(vec![capacity_provider_script], &network_info); + let mut tx_with_groups = { + let mut check_result = None; + for (mut input_index, input) in iterator.enumerate() { + input_index += 1; + let input = input.map_err(|err| { + let msg = format!("failed to find {input_index}-th live cell since {err}"); + Error::Other(msg) + })?; + tx_builder.input(input.cell_input()); + tx_builder.witness(PackedBytes::default()); + + let previous_output = input.previous_output(); + let lock_script = previous_output.lock(); + lock_groups + .entry(lock_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&lock_script)) + .input_indices + .push(input_index); + + if change_builder.check_balance(input, &mut tx_builder) { + let mut script_groups: Vec = lock_groups + .into_values() + .chain(type_groups.into_values()) + .collect(); + for script_group in script_groups.iter_mut() { + for handler in configuration.get_script_handlers() { + for context in &contexts.contexts { + if handler + .build_transaction( + &mut tx_builder, + script_group, + context.as_ref(), + ) + .map_err(|e| Error::TransactionBuildError(e.to_string()))? + { + break; + } + } + } + } + let tx_view = change_builder.finalize(tx_builder); + + check_result = Some(TransactionWithScriptGroups::new(tx_view, script_groups)); + break; + } + } + check_result + } + .ok_or_else(|| { + let msg = "live cells are not enough".to_string(); + Error::Other(msg) + })?; + + // sign + let (_, message_queue_inbox_key) = + get_sighash_lock_args_from_privkey(self.rgbpp_queue_inbox_lock_key_path.clone())?; + TransactionSigner::new(&network_info) + .sign_transaction( + &mut tx_with_groups, + &SignContexts::new_sighash(vec![message_queue_inbox_key, capacity_provider_key]), + ) + .map_err(|e| Error::TransactionSignError(e.to_string()))?; + + // send tx + let tx_json = TransactionView::from(tx_with_groups.get_tx_view().clone()); info!( - "RGB++ queue inbox cell: {:?}, data: {:?}", - queue_cell, queue_cell_data + "add queue inbox tx: {}", + serde_json::to_string_pretty(&tx_json).unwrap() ); + let tx_hash = self + .rgbpp_rpc_client + .send_transaction(tx_json.inner, None) + .map_err(|e| Error::TransactionSendError(format!("send transaction error: {}", e)))?; + info!("add queue inbox tx send: {:?}", tx_hash.pack()); - Ok(H256::default()) + Ok(tx_hash) } } diff --git a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs index d4d8f1670a..d3a6f2b651 100644 --- a/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs +++ b/aggregator/util/rgbpp-tx/src/rgbpp_to_branch/mod.rs @@ -5,7 +5,7 @@ pub use crate::schemas::leap::{CrossChainQueue, Request, Requests}; use crate::RgbppTxBuilder; use aggregator_common::error::Error; -use ckb_logger::{info, debug}; +use ckb_logger::{debug, info}; use ckb_sdk::{ rpc::ckb_indexer::{Cell, Order}, traits::LiveCell, diff --git a/aggregator/util/storage/src/lib.rs b/aggregator/util/storage/src/lib.rs index 0a54bde534..ac0487c4f4 100644 --- a/aggregator/util/storage/src/lib.rs +++ b/aggregator/util/storage/src/lib.rs @@ -68,7 +68,7 @@ impl Storage { } /// Updates the status of a specific height to `Commit`, with validation of `tx` - pub fn commit_branch_request(&self, height: u64, expected_tx: H256) -> Result<(), Error> { + pub fn commit_branch_request(&self, height: u64) -> Result<(), Error> { let key = height.to_be_bytes(); let value = self @@ -80,16 +80,6 @@ impl Storage { None => return Err(DatabaseError("Height not found in the database".into())), }; - let stored_tx = H256::from_slice(&value[1..33]) - .map_err(|_| DatabaseError("Failed to parse stored transaction".into()))?; - - // Verify that the provided tx matches the stored tx - if stored_tx != expected_tx { - return Err(DatabaseError( - "Transaction hash does not match the stored value".into(), - )); - } - let mut value = value.to_vec(); // Convert DBVector to Vec for mutability value[0] = BranchRequestStatus::Commit.into(); // Update status to `Commit` self.db @@ -285,7 +275,7 @@ mod tests { // Commit the first request store - .commit_branch_request(1, H256::default()) + .commit_branch_request(1) .expect("failed to commit height 1"); // Check if the earliest pending is now 2 @@ -307,10 +297,10 @@ mod tests { .insert_branch_request(2, H256::default()) .expect("failed to insert request 2"); store - .commit_branch_request(1, H256::default()) + .commit_branch_request(1) .expect("failed to commit height 1"); store - .commit_branch_request(2, H256::default()) + .commit_branch_request(2) .expect("failed to commit height 2"); // Check if there is no pending request left @@ -339,20 +329,6 @@ mod tests { assert_eq!(earliest_pending, Some((1, H256::default()))); } - #[test] - fn test_commit_height_invalid_tx() { - let store = setup_store(); - - // Insert branch requests - store - .insert_branch_request(1, H256::default()) - .expect("failed to insert request 1"); - - // Attempt to commit with an invalid tx - let result = store.commit_branch_request(1, H256::from_slice(&[1u8; 32]).unwrap()); - assert!(result.is_err()); - } - #[test] fn test_get_last_branch_request_empty_store() { let store = setup_store();