Skip to content

Commit

Permalink
add create_unlock_tx.
Browse files Browse the repository at this point in the history
  • Loading branch information
EthanYuan committed Aug 26, 2024
1 parent cfa8f63 commit 7a5a694
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aggregator/aggregator-main/src/branch_to_rgbpp/burn_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Aggregator {
burn_tx,
Duration::from_secs(15),
) {
Ok(()) => info!("Transaction confirmed"),
Ok(_) => info!("Transaction confirmed"),
Err(e) => info!("{}", e.to_string()),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ use std::time::Duration;
impl Aggregator {
pub(crate) fn check_storage(&self) -> Result<(), Error> {
if let Some(tx_hash) = self.store.get_staged_tx()? {
let hash = H256::from_slice(tx_hash.as_bytes())
.expect("Failed to convert staged transaction hash to H256");
let tx = self
.branch_rpc_client
.get_transaction(hash.clone())
.get_transaction(tx_hash.clone())
.map_err(|e| Error::RpcError(format!("Failed to get transaction: {}", e)))?;
match tx {
Some(tx) => {
Expand All @@ -53,7 +51,7 @@ impl Aggregator {
}
}
None => {
let height = self.wait_for_transaction_packing(hash)?;
let height = self.wait_for_transaction_packing(tx_hash.clone())?;
self.store.insert_branch_request(height.into(), tx_hash)?;
self.store.clear_staged_tx()?;
}
Expand Down Expand Up @@ -255,6 +253,8 @@ impl Aggregator {
"clear queue tx: {}",
serde_json::to_string_pretty(&tx_json).unwrap()
);
// record staged tx
self.store.record_staged_tx(tx_json.hash)?;
let tx_hash = self
.rgbpp_rpc_client
.send_transaction(tx_json.inner, None)
Expand Down
58 changes: 56 additions & 2 deletions aggregator/aggregator-main/src/branch_to_rgbpp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::collections::HashSet;
use std::thread;
use std::time::Duration;

const CHALLENGE_PERIOD: u64 = 80; // blocks

impl Aggregator {
/// Collect Branch requests and send them to the RGB++ chain
pub fn poll_branch_requests(&self, stop_rx: Receiver<()>) {
Expand Down Expand Up @@ -68,10 +70,17 @@ impl Aggregator {
};
match wait_for_tx_confirmation(
self.branch_rpc_client.clone(),
H256(clear_queue_tx.0),
clear_queue_tx.clone(),
Duration::from_secs(600),
) {
Ok(()) => {}
Ok(height) => {
self.store
.insert_branch_request(height, clear_queue_tx)
.expect("Failed to insert clear queue transaction into storage");
self.store.clear_staged_tx().expect(
"Failed to clear staged transactions after successful clear queue",
);
}
Err(e) => error!("{}", e.to_string()),
}
}
Expand Down Expand Up @@ -104,6 +113,51 @@ impl Aggregator {
}
}

let pending_request = self.store.get_earliest_pending();
match pending_request {
Ok(Some(request)) => {
let tip = self.rgbpp_rpc_client.get_tip_block_number();
let tip: u64 = match tip {
Ok(tip) => tip.into(),
Err(e) => {
error!("{}", e.to_string());
continue;
}
};
if request.0 + CHALLENGE_PERIOD < tip {
let unlock_tx = self.rgbpp_tx_builder.create_unlock_tx();
let unlock_tx = match unlock_tx {
Ok(unlock_tx) => {
H256::from_slice(unlock_tx.as_bytes()).expect("unlock tx to H256")
}
Err(e) => {
error!("{}", e.to_string());
continue;
}
};
match wait_for_tx_confirmation(
self.rgbpp_rpc_client.clone(),
H256(unlock_tx.0),
Duration::from_secs(600),
) {
Ok(height) => {
self.store
.commit_branch_request(height, unlock_tx)
.expect("commit branch request");
}
Err(e) => error!("{}", e.to_string()),
}
}
}
Ok(None) => {
info!("No pending request found");
}
Err(e) => {
error!("{}", e.to_string());
continue;
}
}

thread::sleep(poll_interval);
}
}
Expand Down
26 changes: 19 additions & 7 deletions aggregator/aggregator-main/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,23 +181,35 @@ fn get_asset_locks(lock_configs: Vec<LockConfig>) -> HashMap<H256, Script> {
}

fn wait_for_tx_confirmation(
_client: RpcClient,
_tx_hash: H256,
client: RpcClient,
tx_hash: H256,
timeout: Duration,
) -> Result<(), Error> {
) -> Result<u64, Error> {
let start = std::time::Instant::now();

loop {
if true {
sleep(Duration::from_secs(8));
return Ok(());
let tx = client
.get_transaction(tx_hash.clone())
.map_err(|e| Error::RpcError(format!("Failed to get transaction: {}", e)))?;
match tx {
Some(tx) => {
if let Some(height) = tx.tx_status.block_number {
return Ok(height.into());
}
}
None => {
return Err(Error::TransactionNotFound(format!(
"Transaction not found: {:?}",
tx_hash
)));
}
}

if start.elapsed() > timeout {
return Err(Error::TimedOut(
"Transaction confirmation timed out".to_string(),
));
}
sleep(Duration::from_secs(2));
}
}

Expand Down
4 changes: 2 additions & 2 deletions aggregator/aggregator-main/src/rgbpp_to_branch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Aggregator {
leap_tx,
Duration::from_secs(600),
) {
Ok(()) => {}
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
}

Expand All @@ -91,7 +91,7 @@ impl Aggregator {
H256(update_queue_tx.0),
Duration::from_secs(600),
) {
Ok(()) => {}
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
}
}
Expand Down
8 changes: 5 additions & 3 deletions aggregator/util/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ pub enum Error {
TransactionParseError(String),
#[error("rpc error: {0}")]
RpcError(String),
#[error("Timeout: {0}")]
#[error("timeout: {0}")]
TimedOut(String),
#[error("Lock not found: {0}")]
#[error("lock not found: {0}")]
LockNotFound(String),
#[error("Branch script not found: {0}")]
#[error("branch script not found: {0}")]
BranchScriptNotFound(String),
#[error("database error: {0}")]
DatabaseError(String),
#[error("transaction not found: {0}")]
TransactionNotFound(String),
#[error("other error: {0}")]
Other(String),
}
2 changes: 1 addition & 1 deletion aggregator/util/rgbpp-tx/src/branch_to_rgbpp/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@

mod unlock;
49 changes: 49 additions & 0 deletions aggregator/util/rgbpp-tx/src/branch_to_rgbpp/unlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::schemas::leap::{
Message, MessageUnion, Request, RequestContent, RequestLockArgs, Requests, Transfer,
};
use crate::{RgbppTxBuilder, CONFIRMATION_THRESHOLD, SIGHASH_TYPE_HASH};

use aggregator_common::{
error::Error,
types::RequestType,
utils::{
decode_udt_amount, encode_udt_amount, privkey::get_sighash_lock_args_from_privkey,
QUEUE_TYPE, REQUEST_LOCK, SECP256K1, XUDT,
},
};
use ckb_jsonrpc_types::TransactionView;
use ckb_logger::{debug, info};
use ckb_sdk::{
core::TransactionBuilder,
rpc::ckb_indexer::{Cell, Order},
rpc::CkbRpcClient as RpcClient,
traits::{CellQueryOptions, LiveCell},
transaction::{
builder::{ChangeBuilder, DefaultChangeBuilder},
handler::HandlerContexts,
input::{InputIterator, TransactionInput},
signer::{SignContexts, TransactionSigner},
TransactionBuilderConfiguration,
},
types::{NetworkInfo, NetworkType, TransactionWithScriptGroups},
ScriptGroup, Since, SinceType,
};
use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use ckb_types::{
bytes::Bytes,
core::ScriptHashType,
packed::{Byte32, Bytes as PackedBytes, CellInput, CellOutput, Script, WitnessArgs},
prelude::*,
H256,
};
use molecule::prelude::Byte;

use std::collections::HashMap;
use std::thread::sleep;
use std::time::Duration;

impl RgbppTxBuilder {
pub fn create_unlock_tx(&self) -> Result<H256, Error> {
Ok(H256::default())
}
}
2 changes: 1 addition & 1 deletion aggregator/util/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
aggregator-common = { path = "../common", version = "0.1.0" }

ckb-types = { path = "../../../util/types", version = "= 0.116.1" }
ckb-types = { package = "ckb-types", git = "https://github.com/ethanyuan/ckb", branch = "v0.116.1-branch-chain" }

rocksdb = { package = "ckb-rocksdb", version ="=0.21.1", features = ["snappy"], default-features = false }

Expand Down

0 comments on commit 7a5a694

Please sign in to comment.