diff --git a/.github/workflows/forester-tests.yml b/.github/workflows/forester-tests.yml index 96c301a1f1..efbd2ea303 100644 --- a/.github/workflows/forester-tests.yml +++ b/.github/workflows/forester-tests.yml @@ -22,18 +22,40 @@ concurrency: cancel-in-progress: true env: + RUST_BACKTRACE: "1" RUSTFLAGS: "--cfg tokio_unstable -D warnings" jobs: test: strategy: matrix: - test-name: [ - {name: "address-batched", command: "test_address_batched", timeout: 60, needs-test-program: true}, - {name: "state-batched", command: "test_state_batched", timeout: 60, needs-test-program: false}, - {name: "2-foresters", command: "test_epoch_monitor_with_2_foresters", timeout: 60, needs-test-program: false}, - {name: "double-registration", command: "test_epoch_double_registration", timeout: 60, needs-test-program: false} - ] + test-name: + [ + { + name: "address-batched", + command: "test_address_batched", + timeout: 60, + needs-test-program: true, + }, + { + name: "state-batched", + command: "test_state_batched", + timeout: 60, + needs-test-program: false, + }, + { + name: "2-foresters", + command: "test_epoch_monitor_with_2_foresters", + timeout: 60, + needs-test-program: false, + }, + { + name: "double-registration", + command: "test_epoch_double_registration", + timeout: 60, + needs-test-program: false, + }, + ] name: test-${{ matrix.test-name.name }} runs-on: ubuntu-latest timeout-minutes: ${{ matrix.test-name.timeout }} @@ -63,4 +85,4 @@ jobs: - name: Run ${{ matrix.test-name.name }} tests run: | source ./scripts/devenv.sh - cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture \ No newline at end of file + cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture diff --git a/Cargo.lock b/Cargo.lock index 03f09499fe..b8424c21ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1668,6 +1668,12 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -1949,10 +1955,13 @@ dependencies = [ "anchor-lang", "anyhow", "async-trait", + "bb8", + "bincode", "borsh 0.10.3", "bs58 0.5.1", "clap 4.5.23", "dashmap 6.1.0", + "dotenvy", "env_logger 0.11.6", "forester-utils", "futures", @@ -1980,11 +1989,14 @@ dependencies = [ "solana-client", "solana-program", "solana-sdk", + "solana-transaction-status", "thiserror 1.0.64", "tokio", + "tokio-tungstenite 0.16.1", "tracing", "tracing-appender", "tracing-subscriber", + "url", "warp", ] @@ -3127,7 +3139,9 @@ dependencies = [ "reqwest", "solana-banks-client", "solana-program-test", + "solana-rpc-client-api", "solana-sdk", + "solana-transaction-status", "tokio", ] @@ -5029,6 +5043,19 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha1" version = "0.10.6" @@ -7342,6 +7369,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.16.0", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" @@ -7546,6 +7585,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "sha-1", + "thiserror 1.0.64", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.20.1" diff --git a/Cargo.toml b/Cargo.toml index 72413219bb..8bd22e17ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ solana-cli-output = "=1.18.22" solana-transaction-status = "=1.18.22" solana-account-decoder = "=1.18.22" solana-rpc = "=1.18.22" +solana-rpc-client-api = "=1.18.22" spl-token = "=4.0.0" spl-token-2022 = {version="3.0.5", no-default-features = true, features = ["no-entrypoint"]} diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 5cf562daac..bce6dee24b 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -23,6 +23,12 @@ light-client = { workspace = true } light-merkle-tree-metadata = { workspace = true } light-sdk = { workspace = true } light-program-test = { workspace = true} +solana-transaction-status = { workspace = true } +bincode = "1.3" +url = "2.2" +tokio-tungstenite = "0.16" +bb8 = { workspace = true } + serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["full"] } @@ -48,3 +54,4 @@ serial_test = "3.2.0" light-prover-client = { workspace = true } light-test-utils = { workspace = true } light-program-test = { workspace = true, features = ["devenv"] } +dotenvy = "0.15" \ No newline at end of file diff --git a/forester/README.md b/forester/README.md index b94d73c579..fcfd5ca51f 100644 --- a/forester/README.md +++ b/forester/README.md @@ -8,36 +8,35 @@ It subscribes to the nullifier queue and nullifies merkle tree leaves. ## Configuration Forester requires a configuration file, `forester.toml`, specifying necessary keys: + - `STATE_MERKLE_TREE_PUBKEY`: Address of the State Merkle tree. - `NULLIFIER_QUEUE_PUBKEY`: Address of the State Nullifier queue. - `ADDRESS_MERKLE_TREE_PUBKEY`: Address of the Address Merkle tree. - `ADDRESS_MERKLE_TREE_QUEUE_PUBKEY`: Address of the Address queue. - `REGISTRY_PUBKEY`: Address of the Registry program. +To setup your environment properly, copy `.env.example` to `.env` +and update the `FORESTER_PAYER` field with your appropriate key. -To setup your environment properly, copy `.env.example` to `.env` -and update the `FORESTER_PAYER` field with your appropriate key. - -Alternatively, if you prefer to use a terminal profile file, -add the key to your `~/.zshrc` (zsh) or `~/.bashrc` (bash) +Alternatively, if you prefer to use a terminal profile file, +add the key to your `~/.zshrc` (zsh) or `~/.bashrc` (bash) by including this line: `export FORESTER_PAYER=your_value_here`. -Substitute `your_value_here` with your actual key. +Substitute `your_value_here` with your actual key. Remember to restart your terminal or source your terminal profile for the changes to take effect. ## Usage 1. Run the service: -To subscribe to nullify the state merkle tree, use the following command: -`cargo run -- subscribe` + To subscribe to nullify the state merkle tree, use the following command: + `cargo run -- subscribe` 2. To manually nullify state merkle tree leaves, use the following command: -`cargo run -- nullify-state` + `cargo run -- nullify-state` 3. To manually nullify address merkle tree leaves, use the following command: -`cargo run -- nullify-addresses` -4. To manually nullify state *and* address merkle tree leaves, use the following command: + `cargo run -- nullify-addresses` +4. To manually nullify state _and_ address merkle tree leaves, use the following command: `cargo run -- nullify` - ## TODO 1. Add indexer URL to the configuration file. diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 067a01f041..9e9933afc4 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -851,6 +851,7 @@ impl + IndexerType> EpochManager { ) .await?; + // light slot length in s let light_slot_timeout = { let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length) .map_err(|_| ConfigurationError::SlotLengthOverflow { @@ -907,14 +908,21 @@ impl + IndexerType> EpochManager { } } } else { - // TODO: measure accuracy - // Optional replace with shutdown signal for all child processes + // TODO: measure accuracy Optional replace with shutdown + // signal for all child processes + // + // Note: as of now, this executes all batches sequentially: + // a single batch must fully complete before the next batch + // is sent. We can either limit num_batches to 1 and + // increase batch_size (quick fix) and require another + // rate-limiting mechanism (with more control). Or rework + // the send logic to not await confirmations. let batched_tx_config = SendBatchedTransactionsConfig { num_batches: 10, build_transaction_batch_config: BuildTransactionBatchConfig { batch_size: 50, // TODO: make batch size configurable and or dynamic based on queue usage - compute_unit_price: None, // Make dynamic based on queue usage - compute_unit_limit: Some(1_000_000), + compute_unit_price: Some(10_000), // Is dynamic. Sets max. + compute_unit_limit: Some(180_000), }, queue_config: self.config.queue_config, retry_config: RetryConfig { @@ -932,6 +940,7 @@ impl + IndexerType> EpochManager { }; debug!("Sending transactions..."); + // sequential let start_time = Instant::now(); let batch_tx_future = send_batched_transactions( &self.config.payer_keypair, diff --git a/forester/src/helius_priority_fee_types.rs b/forester/src/helius_priority_fee_types.rs new file mode 100644 index 0000000000..d9d22b2fa1 --- /dev/null +++ b/forester/src/helius_priority_fee_types.rs @@ -0,0 +1,74 @@ +// adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/types/types.rs +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub enum PriorityLevel { + Min, + Low, + Medium, + High, + VeryHigh, + UnsafeMax, + Default, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum UiTransactionEncoding { + Binary, + Base64, + Base58, + Json, + JsonParsed, +} +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +pub struct RpcRequest { + pub jsonrpc: String, + pub id: String, + pub method: String, + #[serde(rename = "params")] + pub parameters: T, +} + +impl RpcRequest { + pub fn new(method: String, parameters: T) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id: "1".to_string(), + method, + parameters, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +pub struct RpcResponse { + pub jsonrpc: String, + pub id: String, + pub result: T, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct GetPriorityFeeEstimateOptions { + pub priority_level: Option, + pub include_all_priority_fee_levels: Option, + pub transaction_encoding: Option, + pub lookback_slots: Option, + pub recommended: Option, + pub include_vote: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct GetPriorityFeeEstimateRequest { + #[serde(skip_serializing_if = "Option::is_none")] + pub transaction: Option, + #[serde(rename = "accountKeys", skip_serializing_if = "Option::is_none")] + pub account_keys: Option>, + pub options: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct GetPriorityFeeEstimateResponse { + #[serde(rename = "priorityFeeEstimate")] + pub priority_fee_estimate: Option, +} diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 64e9f5fdfe..25ea7e6c0e 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -6,6 +6,7 @@ pub mod config; pub mod epoch_manager; pub mod errors; pub mod forester_status; +pub mod helius_priority_fee_types; mod indexer_type; pub mod metrics; pub mod pagerduty; @@ -15,6 +16,7 @@ pub mod queue_helpers; pub mod rollover; pub mod send_transaction; mod slot_tracker; +pub mod smart_transaction; pub mod telemetry; pub mod tree_data_sync; pub mod tree_finder; diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index ceee29010e..a610586339 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -12,12 +12,17 @@ use light_client::{ rpc::{RetryConfig, RpcConnection}, rpc_pool::SolanaRpcPool, }; -use light_registry::account_compression_cpi::sdk::{ - create_nullify_instruction, create_update_address_merkle_tree_instruction, - CreateNullifyInstructionInputs, UpdateAddressMerkleTreeInstructionInputs, +use light_registry::{ + account_compression_cpi::sdk::{ + create_nullify_instruction, create_update_address_merkle_tree_instruction, + CreateNullifyInstructionInputs, UpdateAddressMerkleTreeInstructionInputs, + }, + utils::get_forester_epoch_pda_from_authority, }; +use solana_client::rpc_config::RpcSendTransactionConfig; use solana_sdk::{ - compute_budget::ComputeBudgetInstruction, + bs58, + commitment_config::CommitmentLevel, hash::Hash, instruction::Instruction, pubkey::Pubkey, @@ -30,25 +35,36 @@ use tokio::{ time::{sleep, Instant}, }; use tracing::{debug, warn}; +use url::Url; use crate::{ config::QueueConfig, epoch_manager::{MerkleProofType, WorkItem}, errors::ForesterError, + helius_priority_fee_types::{ + GetPriorityFeeEstimateOptions, GetPriorityFeeEstimateRequest, + GetPriorityFeeEstimateResponse, RpcRequest, RpcResponse, + }, queue_helpers::fetch_queue_item_data, + smart_transaction::{ + create_smart_transaction, send_and_confirm_transaction, CreateSmartTransactionConfig, + }, Result, }; - #[async_trait] +#[allow(clippy::too_many_arguments)] pub trait TransactionBuilder { + fn epoch(&self) -> u64; async fn build_signed_transaction_batch( &self, payer: &Keypair, derivation: &Pubkey, recent_blockhash: &Hash, + last_valid_block_height: u64, + priority_fee: u64, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Result>; + ) -> Result<(Vec, u64)>; } // We're assuming that: @@ -60,8 +76,13 @@ const LATENCY: Duration = Duration::from_millis(4 * 500); const TIMEOUT_CHECK_ENABLED: bool = true; +/// Calculate the compute unit price in microLamports based on the target lamports and compute units +pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> u64 { + ((target_lamports * 1_000_000) as f64 / compute_units as f64).ceil() as u64 +} + /// Setting: -/// 1. We have 1 light slot 15 seconds and a lot of elements in the queue +/// 1. We have 1 light slot (n solana slots), and elements in thequeue /// 2. we want to send as many elements from the queue as possible /// /// Strategy: @@ -78,7 +99,6 @@ const TIMEOUT_CHECK_ENABLED: bool = true; /// /// Questions: /// - How do we make sure that we have send all the transactions? -/// - How can we monitor how many txs have been dropped? /// /// TODO: /// - return number of sent transactions @@ -97,6 +117,7 @@ pub async fn send_batched_transactions( let start_time = Instant::now(); let mut rpc = pool.get_connection().await?; + let mut num_batches = 0; let mut num_sent_transactions: usize = 0; // 1. Execute batches until max number of batches is reached or light slot @@ -143,43 +164,69 @@ pub async fn send_batched_transactions( continue; } - // 4. Fetch recent blockhash. - // A recent blockhash is valid for 2 mins we only need one per batch. We - // use a new one per batch in case that we want to retry these same - // transactions and identical transactions might be dropped. + // 4. Fetch recent confirmed blockhash. + // A blockhash is valid for 150 blocks. let recent_blockhash = rpc.get_latest_blockhash().await?; + let current_block_height = rpc.get_block_height().await?; + let last_valid_block_height = current_block_height + 150; + + let forester_epoch_pda_pubkey = + get_forester_epoch_pda_from_authority(derivation, transaction_builder.epoch()).0; + // Get the priority fee estimate based on write-locked accounts + let account_keys = vec![ + payer.pubkey(), + forester_epoch_pda_pubkey, + tree_accounts.queue, + tree_accounts.merkle_tree, + ]; + let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); + let priority_fee_recommendation: u64 = + request_priority_fee_estimate(&url, account_keys).await?; + + // Cap the priority fee and CU usage with buffer. + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: priority_fee_recommendation, + min_fee_lamports: config + .build_transaction_batch_config + .compute_unit_price + .unwrap_or(10_000), + max_fee_lamports: config + .build_transaction_batch_config + .compute_unit_price + .unwrap_or(100_000), + compute_unit_limit: config + .build_transaction_batch_config + .compute_unit_limit + .unwrap_or(200_000) as u64, + }; + let priority_fee = get_capped_priority_fee(cap_config); + // 5. Iterate over work items in chunks of batch size. for work_items in work_items.chunks(config.build_transaction_batch_config.batch_size as usize) { // 6. Check if we reached the end of the light slot. if TIMEOUT_CHECK_ENABLED { - let remaining_time = match config - .retry_config - .timeout - .checked_sub(start_time.elapsed()) - { - Some(time) => time, - None => { - debug!("Reached end of light slot"); - break; - } - }; - + let remaining_time = + get_remaining_time_in_light_slot(start_time, config.retry_config.timeout); if remaining_time < LATENCY { debug!("Reached end of light slot"); break; } } - // Minimum time to wait for the next batch of transactions. - // Can be used to avoid rate limits. + // Minimum time to wait for the next batch of transactions. Can be + // used to avoid rate limits. TODO(swen): check max feasible batch + // size and latency for large tx batches. TODO: add global rate + // limit across our instances and queues: max 100 RPS global. let transaction_build_time_start = Instant::now(); - let transactions: Vec = transaction_builder + let (transactions, _block_height) = transaction_builder .build_signed_transaction_batch( payer, derivation, &recent_blockhash, + last_valid_block_height, + priority_fee, work_items, config.build_transaction_batch_config, ) @@ -191,43 +238,58 @@ pub async fn send_batched_transactions( let batch_start = Instant::now(); if TIMEOUT_CHECK_ENABLED { - let remaining_time = config - .retry_config - .timeout - .saturating_sub(start_time.elapsed()); - + let remaining_time = + get_remaining_time_in_light_slot(start_time, config.retry_config.timeout); if remaining_time < LATENCY { debug!("Reached end of light slot"); break; } } - // Asynchronously send all transactions in the batch - let pool_clone = Arc::clone(&pool); - let send_futures = transactions.into_iter().map(move |tx| { - let pool_clone = Arc::clone(&pool_clone); - tokio::spawn(async move { - match pool_clone.get_connection().await { - Ok(mut rpc) => { - let result = rpc.process_transaction(tx).await; - println!("tx result: {:?}", result); - result + + let send_transaction_config = RpcSendTransactionConfig { + // Use required settings for routing through staked connection: + // https://docs.helius.dev/guides/sending-transactions-on-solana + skip_preflight: true, + max_retries: Some(0), + preflight_commitment: Some(CommitmentLevel::Confirmed), + ..Default::default() + }; + // Send and confirm all transactions in the batch non-blocking. + let send_futures: Vec<_> = transactions + .into_iter() + .map(|tx| { + let pool_clone = Arc::clone(&pool); + async move { + match pool_clone.get_connection().await { + Ok(mut rpc) => { + send_and_confirm_transaction( + &mut rpc, + &tx, + send_transaction_config, + last_valid_block_height, + config.retry_config.timeout, + ) + .await + } + Err(e) => Err(light_client::rpc::RpcError::CustomError(format!( + "Failed to get RPC connection: {}", + e + ))), } - Err(e) => Err(light_client::rpc::RpcError::CustomError(format!( - "Failed to get RPC connection: {}", - e - ))), } }) - }); + .collect(); let results = join_all(send_futures).await; - // Process results + // Evaluate results for result in results { match result { - Ok(Ok(_)) => num_sent_transactions += 1, - Ok(Err(e)) => warn!("Transaction failed: {:?}", e), - Err(e) => warn!("Task failed: {:?}", e), + Ok(signature) => { + num_sent_transactions += 1; + println!("Transaction sent: {:?}", signature); + } + Err(e) => warn!("Transaction failed: {:?}", e), } } @@ -255,6 +317,18 @@ pub async fn send_batched_transactions( Ok(num_sent_transactions) } +#[derive(Debug, Clone, Copy)] +pub struct CapConfig { + pub rec_fee_microlamports_per_cu: u64, + pub min_fee_lamports: u64, + pub max_fee_lamports: u64, + pub compute_unit_limit: u64, +} + +fn get_remaining_time_in_light_slot(start_time: Instant, timeout: Duration) -> Duration { + timeout.saturating_sub(start_time.elapsed()) +} + #[derive(Debug, Clone, Copy)] pub struct SendBatchedTransactionsConfig { pub num_batches: u64, @@ -279,14 +353,20 @@ pub struct EpochManagerTransactions> { #[async_trait] impl> TransactionBuilder for EpochManagerTransactions { + fn epoch(&self) -> u64 { + self.epoch + } + async fn build_signed_transaction_batch( &self, payer: &Keypair, derivation: &Pubkey, recent_blockhash: &Hash, + last_valid_block_height: u64, + priority_fee: u64, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Result> { + ) -> Result<(Vec, u64)> { let mut transactions = vec![]; let (_, all_instructions) = fetch_proofs_and_create_instructions( payer.pubkey(), @@ -296,44 +376,23 @@ impl> TransactionBuilder for EpochManagerTransac work_items, ) .await?; + for instruction in all_instructions { - let transaction = build_signed_transaction( - payer, - recent_blockhash, - config.compute_unit_price, - config.compute_unit_limit, - instruction, - ) - .await; + let (transaction, _) = create_smart_transaction(CreateSmartTransactionConfig { + payer: payer.insecure_clone(), + instructions: vec![instruction], + recent_blockhash: *recent_blockhash, + compute_unit_price: Some(priority_fee), + compute_unit_limit: config.compute_unit_limit, + last_valid_block_hash: last_valid_block_height, + }) + .await?; transactions.push(transaction); } - Ok(transactions) + Ok((transactions, last_valid_block_height)) } } -async fn build_signed_transaction( - payer: &Keypair, - recent_blockhash: &Hash, - compute_unit_price: Option, - compute_unit_limit: Option, - instruction: Instruction, -) -> Transaction { - let mut instructions: Vec = if let Some(price) = compute_unit_price { - vec![ComputeBudgetInstruction::set_compute_unit_price(price)] - } else { - vec![] - }; - if let Some(limit) = compute_unit_limit { - instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(limit)); - } - instructions.push(instruction); - - let mut transaction = - Transaction::new_with_payer(instructions.as_slice(), Some(&payer.pubkey())); - transaction.sign(&[payer], *recent_blockhash); - transaction -} - /// Work items should be of only one type and tree pub async fn fetch_proofs_and_create_instructions>( authority: Pubkey, @@ -453,3 +512,73 @@ pub async fn fetch_proofs_and_create_instructions) -> Result { + if url.host_str() == Some("localhost") { + return Ok(10_000); + } + + let priority_fee_request = GetPriorityFeeEstimateRequest { + transaction: None, + account_keys: Some( + account_keys + .iter() + .map(|pubkey| bs58::encode(pubkey).into_string()) + .collect(), + ), + options: Some(GetPriorityFeeEstimateOptions { + include_all_priority_fee_levels: None, + recommended: Some(true), + include_vote: None, + lookback_slots: None, + priority_level: None, + transaction_encoding: None, + }), + }; + + let rpc_request = RpcRequest::new( + "getPriorityFeeEstimate".to_string(), + serde_json::json!({ + "get_priority_fee_estimate_request": priority_fee_request + }), + ); + + let client = reqwest::Client::new(); + let response = client + .post(url.clone()) + .header("Content-Type", "application/json") + .json(&rpc_request) + .send() + .await?; + + let response_text = response.text().await?; + + let response: RpcResponse = + serde_json::from_str(&response_text)?; + + response + .result + .priority_fee_estimate + .map(|estimate| estimate as u64) + .ok_or( + ForesterError::General { + error: "Priority fee estimate not available".to_string(), + } + .into(), + ) +} + +/// Get capped priority fee for transaction between min and max. +pub fn get_capped_priority_fee(cap_config: CapConfig) -> u64 { + if cap_config.max_fee_lamports < cap_config.min_fee_lamports { + panic!("Max fee is less than min fee"); + } + + let priority_fee_max = + calculate_compute_unit_price(cap_config.max_fee_lamports, cap_config.compute_unit_limit); + let priority_fee_min = + calculate_compute_unit_price(cap_config.min_fee_lamports, cap_config.compute_unit_limit); + let capped_fee = std::cmp::min(cap_config.rec_fee_microlamports_per_cu, priority_fee_max); + std::cmp::max(capped_fee, priority_fee_min) +} diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs new file mode 100644 index 0000000000..16bb84a7af --- /dev/null +++ b/forester/src/smart_transaction.rs @@ -0,0 +1,140 @@ +// adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/optimized_transaction.rs +// optimized for forester client +use std::time::{Duration, Instant}; + +use light_client::{rpc::RpcConnection, rpc_pool::SolanaConnectionManager}; +use solana_client::rpc_config::RpcSendTransactionConfig; +use solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + hash::Hash, + instruction::Instruction, + pubkey::Pubkey, + signature::{Signature, Signer}, + signer::keypair::Keypair, + transaction::Transaction, +}; +use solana_transaction_status::TransactionConfirmationStatus; +use tokio::time::sleep; + +pub struct CreateSmartTransactionConfig { + pub payer: Keypair, + pub recent_blockhash: Hash, + pub compute_unit_price: Option, + pub compute_unit_limit: Option, + pub instructions: Vec, + pub last_valid_block_hash: u64, +} + +/// Poll a transaction to check whether it has been confirmed +/// +/// * `txt-sig` - The transaction signature to check +/// +/// # Returns +/// The confirmed transaction signature or an error if the confirmation times out +pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( + connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, + txt_sig: Signature, + abort_timeout: Duration, +) -> Result { + // 12 second total timeout before exiting + let timeout: Duration = Duration::from_secs(12); + // 6 second retry interval + let interval: Duration = Duration::from_secs(6); + let start: Instant = Instant::now(); + + loop { + if start.elapsed() >= timeout || start.elapsed() >= abort_timeout { + return Err(light_client::rpc::RpcError::CustomError(format!( + "Transaction {}'s confirmation timed out", + txt_sig + ))); + } + + let status: Vec> = + connection.get_signature_statuses(&[txt_sig]).await?; + + match status[0].clone() { + Some(status) => { + if status.err.is_none() + && (status.confirmation_status + == Some(TransactionConfirmationStatus::Confirmed) + || status.confirmation_status + == Some(TransactionConfirmationStatus::Finalized)) + { + return Ok(txt_sig); + } + if status.err.is_some() { + return Err(light_client::rpc::RpcError::CustomError(format!( + "Transaction {}'s confirmation failed", + txt_sig + ))); + } + } + None => { + sleep(interval).await; + } + } + } +} + +// Sends a transaction and handles its confirmation. Retries until timeout or last_valid_block_height is reached. +pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( + connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, + transaction: &Transaction, + send_transaction_config: RpcSendTransactionConfig, + last_valid_block_height: u64, + timeout: Duration, +) -> Result { + let start_time: Instant = Instant::now(); + + while Instant::now().duration_since(start_time) < timeout + && connection.get_slot().await? <= last_valid_block_height + { + let result = connection.send_transaction_with_config(transaction, send_transaction_config); + + match result.await { + Ok(signature) => { + // Poll for transaction confirmation + match poll_transaction_confirmation(connection, signature, timeout).await { + Ok(sig) => return Ok(sig), + // Retry on polling failure + Err(_) => continue, + } + } + // Retry on send failure + Err(_) => continue, + } + } + + Err(light_client::rpc::RpcError::CustomError( + "Transaction failed to confirm within timeout.".to_string(), + )) +} + +/// Creates an optimized transaction based on the provided configuration +/// +/// # Arguments +/// * `config` - The configuration for the smart transaction, which includes the transaction's instructions, signers, and lookup tables, depending on +/// whether it's a legacy or versioned smart transaction. The transaction's send configuration can also be changed, if provided +/// +/// # Returns +/// An optimized `Transaction` and the `last_valid_block_height` +pub async fn create_smart_transaction( + config: CreateSmartTransactionConfig, +) -> Result<(Transaction, u64), light_client::rpc::RpcError> { + let payer_pubkey: Pubkey = config.payer.pubkey(); + let mut final_instructions: Vec = if let Some(price) = config.compute_unit_price { + vec![ComputeBudgetInstruction::set_compute_unit_price(price)] + } else { + vec![] + }; + if let Some(limit) = config.compute_unit_limit { + final_instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(limit)); + } + final_instructions.extend(config.instructions); + + let mut tx = Transaction::new_with_payer(&final_instructions, Some(&payer_pubkey)); + tx.sign(&[&config.payer], config.recent_blockhash); + + Ok((tx, config.last_valid_block_hash)) +} diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs new file mode 100644 index 0000000000..5591c64b3e --- /dev/null +++ b/forester/tests/priority_fee_test.rs @@ -0,0 +1,213 @@ +use forester::{ + cli::StartArgs, + send_transaction::{get_capped_priority_fee, request_priority_fee_estimate, CapConfig}, + ForesterConfig, +}; +use light_client::rpc::{RpcConnection, SolanaRpcConnection}; +use solana_sdk::{commitment_config::CommitmentConfig, signature::Signer}; +use url::Url; + +use crate::test_utils::init; +mod test_utils; + +#[tokio::test] +async fn test_priority_fee_request() { + dotenvy::dotenv().ok(); + + init(None).await; + + let args = StartArgs { + rpc_url: Some( + std::env::var("FORESTER_RPC_URL").expect("FORESTER_RPC_URL must be set in environment"), + ), + push_gateway_url: None, + pagerduty_routing_key: None, + ws_rpc_url: Some( + std::env::var("FORESTER_WS_RPC_URL") + .expect("FORESTER_WS_RPC_URL must be set in environment"), + ), + indexer_url: Some( + std::env::var("FORESTER_INDEXER_URL") + .expect("FORESTER_INDEXER_URL must be set in environment"), + ), + prover_url: Some( + std::env::var("FORESTER_PROVER_URL") + .expect("FORESTER_PROVER_URL must be set in environment"), + ), + payer: Some( + std::env::var("FORESTER_PAYER").expect("FORESTER_PAYER must be set in environment"), + ), + derivation: Some( + std::env::var("FORESTER_DERIVATION_PUBKEY") + .expect("FORESTER_DERIVATION_PUBKEY must be set in environment"), + ), + photon_api_key: Some( + std::env::var("PHOTON_API_KEY").expect("PHOTON_API_KEY must be set in environment"), + ), + indexer_batch_size: 50, + indexer_max_concurrent_batches: 10, + transaction_batch_size: 1, + transaction_max_concurrent_batches: 20, + cu_limit: 1_000_000, + rpc_pool_size: 20, + slot_update_interval_seconds: 10, + tree_discovery_interval_seconds: 5, + max_retries: 3, + retry_delay: 1000, + retry_timeout: 30000, + state_queue_start_index: 0, + state_queue_processing_length: 28807, + address_queue_start_index: 0, + address_queue_processing_length: 28807, + }; + + let config = ForesterConfig::new_for_start(&args).expect("Failed to create config"); + + // Setup RPC connection using config + let mut rpc = SolanaRpcConnection::new( + config.external_services.rpc_url, + Some(CommitmentConfig::confirmed()), + ); + rpc.payer = config.payer_keypair.insecure_clone(); + + let account_keys = vec![config.payer_keypair.pubkey()]; + + let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); + println!("URL: {}", url); + let priority_fee = request_priority_fee_estimate(&url, account_keys) + .await + .unwrap(); + + println!("Priority fee: {:?}", priority_fee); + assert!(priority_fee > 0, "Priority fee should be greater than 0"); +} +#[test] + +fn test_capped_priority_fee() { + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 50_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + // 1_000_000 cu x 50_000 microlamports per cu = 50_000 lamports total + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 10_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 100_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 100_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 20_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 20_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 200_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 100_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 0, + max_fee_lamports: 0, + compute_unit_limit: 1_000_000, + }; + let expected = 0; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 10_000, + max_fee_lamports: 0, + compute_unit_limit: 1_000_000, + }; + println!("expecting panic"); + let result = std::panic::catch_unwind(|| get_capped_priority_fee(cap_config)); + assert!( + result.is_err(), + "Expected panic for max fee less than min fee" + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 50_000, + max_fee_lamports: 50_000, + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 100_000, + min_fee_lamports: 50_000, + max_fee_lamports: 50_000, + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); +} diff --git a/programs/registry/src/protocol_config/state.rs b/programs/registry/src/protocol_config/state.rs index 24a6eca469..0a635b3bb9 100644 --- a/programs/registry/src/protocol_config/state.rs +++ b/programs/registry/src/protocol_config/state.rs @@ -25,7 +25,7 @@ pub struct ProtocolConfig { pub genesis_slot: u64, /// Minimum weight required for a forester to register to an epoch. pub min_weight: u64, - /// Light protocol slot length. + /// Light protocol slot length pub slot_length: u64, /// Foresters can register for this phase. pub registration_phase_length: u64, diff --git a/sdk-libs/client/src/rpc/rpc_connection.rs b/sdk-libs/client/src/rpc/rpc_connection.rs index 08854d230d..34cfd5d338 100644 --- a/sdk-libs/client/src/rpc/rpc_connection.rs +++ b/sdk-libs/client/src/rpc/rpc_connection.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use async_trait::async_trait; use borsh::BorshDeserialize; +use solana_client::rpc_config::RpcSendTransactionConfig; use solana_program::{clock::Slot, instruction::Instruction}; use solana_sdk::{ account::{Account, AccountSharedData}, @@ -12,6 +13,7 @@ use solana_sdk::{ signature::{Keypair, Signature}, transaction::Transaction, }; +use solana_transaction_status::TransactionStatus; use crate::{rpc::errors::RpcError, transaction_params::TransactionParams}; @@ -91,5 +93,15 @@ pub trait RpcConnection: Send + Sync + Debug + 'static { async fn get_slot(&mut self) -> Result; async fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError>; async fn send_transaction(&self, transaction: &Transaction) -> Result; + async fn send_transaction_with_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, + ) -> Result; async fn get_transaction_slot(&mut self, signature: &Signature) -> Result; + async fn get_signature_statuses( + &self, + signatures: &[Signature], + ) -> Result>, RpcError>; + async fn get_block_height(&mut self) -> Result; } diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/sdk-libs/client/src/rpc/solana_rpc.rs index 2c3cc6b632..c5461381f2 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/sdk-libs/client/src/rpc/solana_rpc.rs @@ -22,7 +22,7 @@ use solana_sdk::{ transaction::Transaction, }; use solana_transaction_status::{ - option_serializer::OptionSerializer, UiInstruction, UiTransactionEncoding, + option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding, }; use tokio::time::{sleep, Instant}; @@ -56,6 +56,8 @@ impl Display for SolanaRpcUrl { pub struct RetryConfig { pub max_retries: u32, pub retry_delay: Duration, + /// Max Light slot timeout in time based on solana slot length and light + /// slot length. pub timeout: Duration, } @@ -343,6 +345,15 @@ impl RpcConnection for SolanaRpcConnection { let result = parsed_event.map(|e| (e, signature, slot)); Ok(result) } + async fn get_signature_statuses( + &self, + signatures: &[Signature], + ) -> Result>, RpcError> { + self.client + .get_signature_statuses(signatures) + .map(|response| response.value) + .map_err(RpcError::from) + } async fn confirm_transaction(&self, signature: Signature) -> Result { self.retry(|| async { @@ -413,8 +424,15 @@ impl RpcConnection for SolanaRpcConnection { } async fn get_latest_blockhash(&mut self) -> Result { - self.retry(|| async { self.client.get_latest_blockhash().map_err(RpcError::from) }) - .await + self.retry(|| async { + self.client + // Confirmed commitments land more reliably than finalized + // https://www.helius.dev/blog/how-to-deal-with-blockhash-errors-on-solana#how-to-deal-with-blockhash-errors + .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) + .map(|response| response.0) + .map_err(RpcError::from) + }) + .await } async fn get_slot(&mut self) -> Result { @@ -443,6 +461,18 @@ impl RpcConnection for SolanaRpcConnection { }) .await } + async fn send_transaction_with_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, + ) -> Result { + self.retry(|| async { + self.client + .send_transaction_with_config(transaction, config) + .map_err(RpcError::from) + }) + .await + } async fn get_transaction_slot(&mut self, signature: &Signature) -> Result { self.retry(|| async { @@ -461,6 +491,10 @@ impl RpcConnection for SolanaRpcConnection { }) .await } + async fn get_block_height(&mut self) -> Result { + self.retry(|| async { self.client.get_block_height().map_err(RpcError::from) }) + .await + } } impl MerkleTreeExt for SolanaRpcConnection {} diff --git a/sdk-libs/program-test/Cargo.toml b/sdk-libs/program-test/Cargo.toml index a9ec2ab938..10cdcc4f1b 100644 --- a/sdk-libs/program-test/Cargo.toml +++ b/sdk-libs/program-test/Cargo.toml @@ -33,4 +33,6 @@ num-traits = { workspace = true } reqwest = { workspace = true } anchor-lang = { workspace = true } light-verifier = { workspace = true } -light-batched-merkle-tree = { workspace = true } \ No newline at end of file +light-batched-merkle-tree = { workspace = true } +solana-transaction-status = { workspace = true } +solana-rpc-client-api = { workspace = true } diff --git a/sdk-libs/program-test/src/test_rpc.rs b/sdk-libs/program-test/src/test_rpc.rs index 8a9bc2d6ab..3864f46d08 100644 --- a/sdk-libs/program-test/src/test_rpc.rs +++ b/sdk-libs/program-test/src/test_rpc.rs @@ -8,6 +8,7 @@ use light_client::{ }; use solana_banks_client::BanksClientError; use solana_program_test::ProgramTestContext; +use solana_rpc_client_api::config::RpcSendTransactionConfig; use solana_sdk::{ account::{Account, AccountSharedData}, clock::Slot, @@ -20,6 +21,7 @@ use solana_sdk::{ system_instruction, transaction::{Transaction, TransactionError}, }; +use solana_transaction_status::TransactionStatus; pub struct ProgramTestRpcConnection { pub context: ProgramTestContext, @@ -308,6 +310,14 @@ impl RpcConnection for ProgramTestRpcConnection { unimplemented!("send transaction is unimplemented for ProgramTestRpcConnection") } + async fn send_transaction_with_config( + &self, + _transaction: &Transaction, + _config: RpcSendTransactionConfig, + ) -> Result { + unimplemented!("send transaction with config is unimplemented for ProgramTestRpcConnection") + } + async fn get_transaction_slot(&mut self, signature: &Signature) -> Result { self.context .banks_client @@ -322,6 +332,16 @@ impl RpcConnection for ProgramTestRpcConnection { .map(|status| status.slot) }) } + async fn get_signature_statuses( + &self, + _signatures: &[Signature], + ) -> Result>, RpcError> { + unimplemented!("get_signature_statuses is unimplemented for ProgramTestRpcConnection") + } + + async fn get_block_height(&mut self) -> Result { + unimplemented!("get_block_height is unimplemented for ProgramTestRpcConnection") + } } impl MerkleTreeExt for ProgramTestRpcConnection {}