Skip to content

Commit

Permalink
Merge pull request #12 from EthanYuan/v0.116.1-branch-chain-separatio…
Browse files Browse the repository at this point in the history
…n-aggregator

separation aggregator
  • Loading branch information
EthanYuan authored Aug 17, 2024
2 parents 783f352 + 54ad9de commit 6521e80
Show file tree
Hide file tree
Showing 24 changed files with 537 additions and 266 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
README.md
ckb.toml
ckb-miner.toml
ckb-aggregator.toml
target/
data/
specs/
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tags
# runtime folder
/ckb.toml
/ckb-miner.toml
/ckb-aggregator.toml
/default.db-options
/data
/specs
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ ${GEN_MOL_OUT_DIR}/protocols.rs: ${GEN_MOL_IN_DIR}/protocols.mol
##@ Cleanup
.PHONY: clean-node-files
clean-node-files: ## Clean files generated by `ckb init`
rm -rf ckb.toml ckb-miner.toml default.db-options specs/ data/
rm -rf ckb.toml ckb-miner.toml ckb-aggregator.toml default.db-options specs/ data/

##@ Helpers
.PHONY: stats
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ cd branch-dev
ckb init --chain dev --genesis-message branch-dev
```

### Configure
### Branch Configure

The following settings are used to configure the `block_assembler` in the `ckb.toml` file:

Expand All @@ -70,6 +70,21 @@ ckb run --indexer
```
Restarting in the same directory will reuse the data.

### Aggregator Configure

The following settings are used to configure the `block_assembler` in the `ckb-aggregator.toml` file:

```toml
branch_uri = "http://localhost:8114"
```
Fill in the target Branch node.

### Start Aggregator

```shell
ckb aggregator
```

## Use RPC

Find RPC port in the log output, the following command assumes 8114 is used:
Expand Down
1 change: 1 addition & 0 deletions branch-chain-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
[dependencies]
ckb-app-config = { path = "../util/app-config", version = "= 0.116.1" }
ckb-async-runtime = { path = "../util/runtime", version = "= 0.116.1" }
ckb-channel = { path = "../util/channel", version = "= 0.116.1" }
ckb-logger = { path = "../util/logger", version = "= 0.116.1" }
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.116.1" }

Expand Down
146 changes: 66 additions & 80 deletions branch-chain-aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::utils::{
};

use ckb_app_config::{AggregatorConfig, AssetConfig, LockConfig, ScriptConfig};
use ckb_channel::Receiver;
use ckb_logger::{error, info, warn};
use ckb_sdk::traits::LiveCell;
use ckb_sdk::{
Expand All @@ -24,9 +25,7 @@ use ckb_sdk::{
traits::{CellQueryOptions, MaturityOption, PrimaryScriptType, QueryOrder},
Since, SinceType,
};
use ckb_stop_handler::{
new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, CancellationToken,
};
use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use ckb_types::H256;
use ckb_types::{
bytes::Bytes,
Expand All @@ -42,7 +41,6 @@ use std::thread;
use std::thread::sleep;
use std::time::Duration;

const THREAD_NAME: &str = "Aggregator";
const CKB_FEE_RATE_LIMIT: u64 = 5000;
const CONFIRMATION_THRESHOLD: u64 = 24;
///
Expand Down Expand Up @@ -98,86 +96,74 @@ impl Aggregator {
}

/// Run the Aggregator
pub fn run(&self) {
info!("chain id: {}", self.chain_id);

// Setup cancellation token
let stop_rx = new_crossbeam_exit_rx();
pub fn run(&self, stop_rx: Receiver<()>) {
let poll_interval = self.poll_interval;
let poll_service: Aggregator = self.clone();

let aggregator_jh = thread::Builder::new()
.name(THREAD_NAME.into())
.spawn(move || {
loop {
match stop_rx.try_recv() {
Ok(_) => {
info!("Aggregator received exit signal, stopped");
break;
}
Err(crossbeam_channel::TryRecvError::Empty) => {
// No exit signal, continue execution
}
Err(_) => {
info!("Error receiving exit signal");
break;
}
}

// get queue data
let rgbpp_requests = poll_service.get_rgbpp_queue_requests();
let (rgbpp_requests, queue_cell) = match rgbpp_requests {
Ok((rgbpp_requests, queue_cell)) => (rgbpp_requests, queue_cell),
Err(e) => {
error!("get RGB++ queue data error: {}", e.to_string());
continue;
}
};

let leap_tx =
poll_service.create_leap_tx(rgbpp_requests.clone(), queue_cell.clone());
let leap_tx = match leap_tx {
Ok(leap_tx) => leap_tx,
Err(e) => {
error!("create leap transaction error: {}", e.to_string());
continue;
}
};
match wait_for_tx_confirmation(
poll_service.rgbpp_rpc_client.clone(),
leap_tx,
Duration::from_secs(600),
) {
Ok(()) => {}
Err(e) => error!("{}", e.to_string()),
}

let update_queue_tx =
poll_service.create_clear_queue_tx(rgbpp_requests, queue_cell);
let update_queue_tx = match update_queue_tx {
Ok(update_queue_tx) => update_queue_tx,
Err(e) => {
error!("{}", e.to_string());
continue;
}
};
match wait_for_tx_confirmation(
poll_service.rgbpp_rpc_client.clone(),
update_queue_tx,
Duration::from_secs(600),
) {
Ok(()) => {}
Err(e) => error!("{}", e.to_string()),
}

if let Err(e) = poll_service.scan_rgbpp_request() {
info!("Aggregator: {:?}", e);
}
thread::sleep(poll_interval);
loop {
match stop_rx.try_recv() {
Ok(_) => {
info!("Aggregator received exit signal, stopped");
break;
}
})
.expect("Start aggregator failed!");
register_thread(THREAD_NAME, aggregator_jh);
Err(crossbeam_channel::TryRecvError::Empty) => {
// No exit signal, continue execution
}
Err(_) => {
info!("Error receiving exit signal");
break;
}
}

// get queue data
let rgbpp_requests = poll_service.get_rgbpp_queue_requests();
let (rgbpp_requests, queue_cell) = match rgbpp_requests {
Ok((rgbpp_requests, queue_cell)) => (rgbpp_requests, queue_cell),
Err(e) => {
error!("get RGB++ queue data error: {}", e.to_string());
continue;
}
};

let leap_tx = poll_service.create_leap_tx(rgbpp_requests.clone(), queue_cell.clone());
let leap_tx = match leap_tx {
Ok(leap_tx) => leap_tx,
Err(e) => {
error!("create leap transaction error: {}", e.to_string());
continue;
}
};
match wait_for_tx_confirmation(
poll_service.rgbpp_rpc_client.clone(),
leap_tx,
Duration::from_secs(600),
) {
Ok(()) => {}
Err(e) => error!("{}", e.to_string()),
}

let update_queue_tx = poll_service.create_clear_queue_tx(rgbpp_requests, queue_cell);
let update_queue_tx = match update_queue_tx {
Ok(update_queue_tx) => update_queue_tx,
Err(e) => {
error!("{}", e.to_string());
continue;
}
};
match wait_for_tx_confirmation(
poll_service.rgbpp_rpc_client.clone(),
update_queue_tx,
Duration::from_secs(600),
) {
Ok(()) => {}
Err(e) => error!("{}", e.to_string()),
}

if let Err(e) = poll_service.scan_rgbpp_request() {
info!("Aggregator: {:?}", e);
}
thread::sleep(poll_interval);
}
}

fn scan_rgbpp_request(&self) -> Result<(), Error> {
Expand Down
1 change: 0 additions & 1 deletion branch-chain-aggregator/src/transaction/leap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ impl Aggregator {
OutPoint::from_slice(&witness_input_type.raw_data()).map_err(|e| {
Error::TransactionParseError(format!("get queue from witness error: {}", e))
})?;
info!("Found message queue in leap tx witness");
Ok((queue_out_point, tx_hash))
}

Expand Down
1 change: 1 addition & 0 deletions ckb-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ckb-verification-traits = { path = "../verification/traits", version = "= 0.116.
ckb-async-runtime = { path = "../util/runtime", version = "= 0.116.1" }
ckb-migrate = { path = "../util/migrate", version = "= 0.116.1" }
ckb-launcher = { path = "../util/launcher", version = "= 0.116.1" }
branch-chain-aggregator = { path = "../branch-chain-aggregator", version = "= 0.1.0" }
base64 = "0.21.0"
tempfile.workspace = true
rayon = "1.0"
Expand Down
2 changes: 2 additions & 0 deletions ckb-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ fn run_app_inner(
let is_silent_logging = is_silent_logging(cmd);
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime();
let setup = Setup::from_matches(bin_name, cmd, matches)?;
let chain_id = setup.consensus()?.clone().identify_name();
let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?;

raise_fd_limit();

let ret = match cmd {
cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()),
cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()),
cli::CMD_AGGREGATOR => subcommand::aggregator(setup.aggregator(matches)?, chain_id),
cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()),
cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()),
cli::CMD_IMPORT => subcommand::import(setup.import(matches)?, handle.clone()),
Expand Down
36 changes: 36 additions & 0 deletions ckb-bin/src/subcommand/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use branch_chain_aggregator::Aggregator;
use ckb_app_config::{AggregatorArgs, ExitCode};
use ckb_logger::info;
use ckb_stop_handler::{
broadcast_exit_signals, new_crossbeam_exit_rx, register_thread, wait_all_ckb_services_exit,
};

use std::thread;
use std::time::Duration;

pub fn aggregator(args: AggregatorArgs, chain_id: String) -> Result<(), ExitCode> {
info!("chain id: {}", chain_id);

let aggregator = Aggregator::new(args.config, Duration::from_secs(2), chain_id);

let stop_rx = new_crossbeam_exit_rx();
const THREAD_NAME: &str = "Aggregator";
let aggregator_jh = thread::Builder::new()
.name(THREAD_NAME.into())
.spawn(move || {
aggregator.run(stop_rx);
})
.expect("Start aggregator failed!");
register_thread(THREAD_NAME, aggregator_jh);

info!("Branch Aggregator service started ...");
ctrlc::set_handler(|| {
info!("Trapped exit signal, exiting...");
broadcast_exit_signals();
})
.expect("Error setting Ctrl-C handler");

wait_all_ckb_services_exit();

Ok(())
}
6 changes: 4 additions & 2 deletions ckb-bin/src/subcommand/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use ckb_app_config::{cli, AppConfig, ExitCode, InitArgs};
use ckb_chain_spec::ChainSpec;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_resource::{
Resource, TemplateContext, AVAILABLE_SPECS, CKB_CONFIG_FILE_NAME, DB_OPTIONS_FILE_NAME,
MINER_CONFIG_FILE_NAME, SPEC_DEV_FILE_NAME,
Resource, TemplateContext, AGGREGATOR_CONFIG_FILE_NAME, AVAILABLE_SPECS, CKB_CONFIG_FILE_NAME,
DB_OPTIONS_FILE_NAME, MINER_CONFIG_FILE_NAME, SPEC_DEV_FILE_NAME,
};
use ckb_types::{prelude::*, H256};

Expand Down Expand Up @@ -201,6 +201,8 @@ pub fn init(args: InitArgs) -> Result<(), ExitCode> {
Resource::bundled_ckb_config().export(&context, &args.root_dir)?;
println!("Create {MINER_CONFIG_FILE_NAME}");
Resource::bundled_miner_config().export(&context, &args.root_dir)?;
println!("Create {AGGREGATOR_CONFIG_FILE_NAME}");
Resource::bundled_aggregator_config().export(&context, &args.root_dir)?;
println!("Create {DB_OPTIONS_FILE_NAME}");
Resource::bundled_db_options().export(&context, &args.root_dir)?;

Expand Down
2 changes: 2 additions & 0 deletions ckb-bin/src/subcommand/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod aggregator;
#[cfg(not(target_os = "windows"))]
mod daemon;
mod export;
Expand All @@ -12,6 +13,7 @@ mod reset_data;
mod run;
mod stats;

pub use self::aggregator::aggregator;
#[cfg(not(target_os = "windows"))]
pub use self::daemon::{check_process, daemon};
pub use self::export::export;
Expand Down
1 change: 1 addition & 0 deletions resource/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fn main() {
for f in &[
"ckb.toml",
"ckb-miner.toml",
"ckb-aggregator.toml",
"default.db-options",
"xudt_rce",
] {
Expand Down
Loading

0 comments on commit 6521e80

Please sign in to comment.