Skip to content

Commit

Permalink
impl add rgbpp queue inbox tx building
Browse files Browse the repository at this point in the history
  • Loading branch information
EthanYuan committed Aug 27, 2024
1 parent be08600 commit 5db0992
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 77 deletions.
2 changes: 1 addition & 1 deletion aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use aggregator_common::{
},
};
use ckb_jsonrpc_types::TransactionView;
use ckb_logger::{info, debug};
use ckb_logger::{debug, info};
use ckb_sdk::{
core::TransactionBuilder,
rpc::ckb_indexer::{Cell, Order},
Expand Down
96 changes: 74 additions & 22 deletions aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ use crate::{wait_for_tx_confirmation, Aggregator, SIGHASH_TYPE_HASH};

use aggregator_common::{error::Error, utils::privkey::get_sighash_lock_args_from_privkey};
use ckb_channel::Receiver;
use ckb_logger::{error, info};
use ckb_logger::{debug, error, info};
use ckb_sdk::{
rpc::ckb_indexer::{Cell, Order},
rpc::{
ckb_indexer::{Cell, Order},
ResponseFormatGetter,
},
traits::{CellQueryOptions, LiveCell, MaturityOption, PrimaryScriptType, QueryOrder},
};
use ckb_types::{
core::ScriptHashType,
packed::{Byte32, OutPoint, Script},
packed::{Byte32, Bytes as PackedBytes, OutPoint, Script, Transaction, WitnessArgs},
prelude::*,
H256,
};
Expand Down Expand Up @@ -92,11 +95,13 @@ impl Aggregator {
}

pub fn poll_rgbpp_unlock(&self, stop_rx: Receiver<()>) {
info!("RGB++ Unlock service started ...");
info!("RGB++ Unlock loop started ...");

let poll_interval = self.poll_interval;

loop {
thread::sleep(poll_interval);

match stop_rx.try_recv() {
Ok(_) => {
info!("Aggregator received exit signal, stopped");
Expand All @@ -113,8 +118,16 @@ impl Aggregator {

let pending_request = self.store.get_earliest_pending();
match pending_request {
Ok(None) => {
info!("No pending request found");
}
Err(e) => {
error!("{}", e.to_string());
continue;
}
Ok(Some((height, tx_hash))) => {
let tip = self.rgbpp_rpc_client.get_tip_block_number();
info!("Found pending request at height: {}", height);
let tip = self.branch_rpc_client.get_tip_block_number();
let tip: u64 = match tip {
Ok(tip) => tip.into(),
Err(e) => {
Expand All @@ -123,9 +136,20 @@ impl Aggregator {
}
};
if height + self.config.challenge_period < tip {
let inbox_tx = self
.rgbpp_tx_builder
.create_add_inbox_tx(height, tx_hash.0.into());
let requests_in_witness = if let Ok((_, requests_in_witness)) =
self.get_burn_tx_requests_in_witness(tx_hash.clone())
{
requests_in_witness
} else {
error!("Burn tx requests not found in witness");
continue;
};

let inbox_tx = self.rgbpp_tx_builder.create_add_queue_inbox_tx(
height,
tx_hash.0.into(),
requests_in_witness.raw_data(),
);
let inbox_tx = match inbox_tx {
Ok(inbox_tx) => {
H256::from_slice(inbox_tx.as_bytes()).expect("unlock tx to H256")
Expand All @@ -140,32 +164,23 @@ impl Aggregator {
H256(inbox_tx.0),
Duration::from_secs(30),
) {
Ok(height) => {
Ok(_) => {
self.store
.commit_branch_request(height, inbox_tx)
.commit_branch_request(height)
.expect("commit branch request");
}
Err(e) => error!("{}", e.to_string()),
}
}
}
Ok(None) => {
info!("No pending request found");
}
Err(e) => {
error!("{}", e.to_string());
continue;
}
}

thread::sleep(poll_interval);
}
}

fn get_branch_queue_outbox_requests(&self) -> Result<(Vec<Request>, OutPoint), Error> {
let (queue_cell, queue_cell_data) = self.get_branch_queue_outbox_cell()?;
if queue_cell_data.outbox().is_empty() {
info!("No requests in queue");
debug!("No requests in queue");
return Ok((vec![], OutPoint::default()));
}
let request_ids: Vec<Byte32> = queue_cell_data.outbox().into_iter().collect();
Expand All @@ -176,7 +191,7 @@ impl Aggregator {
let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| {
Error::TransactionParseError(format!("get requests from witness error: {}", e))
})?;
info!("Found {} requests in witness", requests.len());
info!("Found {} requests in outbox", requests.len());

// check requests
let request_set: HashSet<Byte32> = requests
Expand Down Expand Up @@ -208,7 +223,7 @@ impl Aggregator {
queue_cell.objects.len()
)));
}
info!("Found {} queue outbox cell", queue_cell.objects.len());
debug!("Found {} queue outbox cell", queue_cell.objects.len());
let queue_cell = queue_cell.objects[0].clone();

let queue_live_cell: LiveCell = queue_cell.clone().into();
Expand Down Expand Up @@ -248,4 +263,41 @@ impl Aggregator {
};
Ok(cell_query_option)
}

pub(crate) fn get_burn_tx_requests_in_witness(
&self,
tx_hash: H256,
) -> Result<(H256, PackedBytes), Error> {
let tx = self
.branch_rpc_client
.get_transaction(tx_hash.clone())
.map_err(|e| Error::RpcError(format!("get transaction error: {}", e)))?
.ok_or(Error::RpcError("get transaction error: None".to_string()))?
.transaction
.ok_or(Error::RpcError("get transaction error: None".to_string()))?
.get_value()
.map_err(|e| Error::RpcError(format!("get transaction error: {}", e)))?
.inner;
let tx: Transaction = tx.into();
let witness = tx
.witnesses()
.get(0 as usize)
.ok_or(Error::TransactionParseError(
"get witness error: None".to_string(),
))?;
let witness_input_type = WitnessArgs::from_slice(&witness.raw_data())
.map_err(|e| Error::TransactionParseError(format!("get witness error: {}", e)))?
.input_type()
.to_opt()
.ok_or(Error::TransactionParseError(
"get witness input type error: None".to_string(),
))?;

let requests = Requests::from_slice(&witness_input_type.raw_data()).map_err(|e| {
Error::TransactionParseError(format!("get requests from witness error: {}", e))
})?;
info!("Found {} requests in burn tx witness", requests.len());

Ok((tx.calc_tx_hash().unpack(), witness_input_type))
}
}
2 changes: 2 additions & 0 deletions aggregator/util/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Error {
QueueCellDataDecodeError(String),
#[error("outbox has unprocessed requests, cannot add new ones")]
QueueOutboxHasUnprocessedRequests,
#[error("inbox has unprocessed requests, cannot add new ones")]
QueueInboxHasUnprocessedRequests,
#[error("queue cell data error: {0}")]
QueueCellDataError(String),
#[error("missing script info: {0}")]
Expand Down
Loading

0 comments on commit 5db0992

Please sign in to comment.