diff --git a/.github/actions/setup-rust/action.yaml b/.github/actions/setup-rust/action.yaml index 8b15b571..4a562a75 100644 --- a/.github/actions/setup-rust/action.yaml +++ b/.github/actions/setup-rust/action.yaml @@ -52,9 +52,9 @@ runs: - name: Install cargo dependencies run: | - rustup install nightly-2023-10-05 - rustup component add rustfmt --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu - rustup component add clippy --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu + rustup install nightly-2024-02-01 + rustup component add rustfmt --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu + rustup component add clippy --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu cargo install cargo-udeps --locked cargo install cargo-sort shell: bash @@ -63,6 +63,6 @@ runs: run: | rustc --version; cargo --version; - cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --version; - cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu fmt --version; + cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --version; + cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu fmt --version; shell: bash diff --git a/.github/workflows/clean_code.yaml b/.github/workflows/clean_code.yaml index 3e439c48..07303382 100644 --- a/.github/workflows/clean_code.yaml +++ b/.github/workflows/clean_code.yaml @@ -16,7 +16,7 @@ jobs: caller-workflow-name: clippy_and_udeps_check - name: cargo clippy - run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --all-targets + run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --all-targets - name: cargo udeps - run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu udeps --locked + run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu udeps --locked diff --git a/block_engine/src/block_engine.rs b/block_engine/src/block_engine.rs index 719c4bf7..699cf11c 100644 --- a/block_engine/src/block_engine.rs +++ b/block_engine/src/block_engine.rs @@ -5,7 +5,6 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, }, - thread, thread::{Builder, JoinHandle}, time::{Duration, Instant, SystemTime}, }; @@ -53,6 +52,11 @@ use tonic::{ use crate::block_engine_stats::BlockEngineStats; +pub struct BlockEngineConfig { + pub block_engine_url: String, + pub auth_service_url: String, +} + #[derive(Clone)] struct AuthInterceptor { access_token: Arc>, @@ -95,7 +99,7 @@ pub type BlockEngineResult = Result; /// Attempts to maintain a connection to a Block Engine and forward packets to it pub struct BlockEngineRelayerHandler { - block_engine_forwarder: JoinHandle<()>, + block_engine_forwarder: Option>, } impl BlockEngineRelayerHandler { @@ -103,8 +107,7 @@ impl BlockEngineRelayerHandler { #[allow(clippy::too_many_arguments)] pub fn new( - block_engine_url: String, - auth_service_url: String, + block_engine_config: Option, mut block_engine_receiver: Receiver, keypair: Arc, exit: Arc, @@ -114,46 +117,50 @@ impl BlockEngineRelayerHandler { ofac_addresses: HashSet, ) -> BlockEngineRelayerHandler { let is_connected_to_block_engine = is_connected_to_block_engine.clone(); - let block_engine_forwarder = Builder::new() - .name("block_engine_relayer_handler_thread".into()) - .spawn(move || { - let rt = Runtime::new().unwrap(); - rt.block_on(async move { - while !exit.load(Ordering::Relaxed) { - let result = Self::auth_and_connect( - &block_engine_url, - &auth_service_url, - &mut block_engine_receiver, - &keypair, - &exit, - aoi_cache_ttl_s, - &address_lookup_table_cache, - &is_connected_to_block_engine, - &ofac_addresses, - ) - .await; - is_connected_to_block_engine.store(false, Ordering::Relaxed); - - if let Err(e) = result { - error!("error authenticating and connecting: {:?}", e); - datapoint_error!("block_engine_relayer-error", - "block_engine_url" => block_engine_url, - "auth_service_url" => auth_service_url, - ("error", e.to_string(), String) - ); - sleep(Duration::from_secs(2)).await; + let block_engine_forwarder = block_engine_config.map(|config| { + Builder::new() + .name("block_engine_relayer_handler_thread".into()) + .spawn(move || { + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + while !exit.load(Ordering::Relaxed) { + let result = Self::auth_and_connect( + &config.block_engine_url, + &config.auth_service_url, + &mut block_engine_receiver, + &keypair, + &exit, + aoi_cache_ttl_s, + &address_lookup_table_cache, + &is_connected_to_block_engine, + &ofac_addresses, + ) + .await; + is_connected_to_block_engine.store(false, Ordering::Relaxed); + + if let Err(e) = result { + error!("error authenticating and connecting: {:?}", e); + datapoint_error!("block_engine_relayer-error", + "block_engine_url" => &config.block_engine_url, + "auth_service_url" => &config.auth_service_url, + ("error", e.to_string(), String) + ); + sleep(Duration::from_secs(2)).await; + } } - } - }); - }) - .unwrap(); + }); + }) + .unwrap() + }); BlockEngineRelayerHandler { block_engine_forwarder, } } - pub fn join(self) -> thread::Result<()> { - self.block_engine_forwarder.join() + pub fn join(self) { + if let Some(forwarder) = self.block_engine_forwarder { + forwarder.join().unwrap() + } } /// Relayers are whitelisted in the block engine. In order to auth, a challenge-response handshake diff --git a/core/src/ofac.rs b/core/src/ofac.rs index 92fcded3..337da02c 100644 --- a/core/src/ofac.rs +++ b/core/src/ofac.rs @@ -55,11 +55,9 @@ fn is_ofac_address_in_lookup_table( #[cfg(test)] mod tests { - use std::{collections::HashSet, sync::Arc, time::Duration}; + use std::collections::HashSet; - use crossbeam_channel::unbounded; use dashmap::DashMap; - use solana_perf::packet::PacketBatch; use solana_sdk::{ address_lookup_table_account::AddressLookupTableAccount, hash::Hash, @@ -167,7 +165,7 @@ mod tests { let lookup_table_pubkey = Pubkey::new_unique(); let lookup_table = AddressLookupTableAccount { - key: lookup_table_pubkey.clone(), + key: lookup_table_pubkey, addresses: vec![ofac_pubkey, Pubkey::new_unique()], }; @@ -281,7 +279,7 @@ mod tests { Hash::default(), ); let random_tx = VersionedTransaction::from(random_tx); - let random_packet = Packet::from_data(None, &random_tx).expect("can create packet"); + let random_packet = Packet::from_data(None, random_tx).expect("can create packet"); let ofac_tx = Transaction::new_signed_with_payer( &[Instruction::new_with_bytes( @@ -298,7 +296,7 @@ mod tests { Hash::default(), ); let ofac_tx = VersionedTransaction::from(ofac_tx); - let ofac_packet = Packet::from_data(None, &ofac_tx).expect("can create packet"); + let ofac_packet = Packet::from_data(None, ofac_tx).expect("can create packet"); assert!(!is_tx_ofac_related( &random_packet.deserialize_slice(..).unwrap(), diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8142c301..624eb0ea 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.73.0" +channel = "1.76.0" diff --git a/transaction-relayer/src/forwarder.rs b/transaction-relayer/src/forwarder.rs index e9d334b6..12b1d9b0 100644 --- a/transaction-relayer/src/forwarder.rs +++ b/transaction-relayer/src/forwarder.rs @@ -25,6 +25,7 @@ pub fn start_forward_and_delay_thread( packet_delay_ms: u32, block_engine_sender: tokio::sync::mpsc::Sender, num_threads: u64, + disable_mempool: bool, exit: &Arc, ) -> Vec> { const SLEEP_DURATION: Duration = Duration::from_millis(5); @@ -77,23 +78,26 @@ pub fn start_forward_and_delay_thread( // try_send because the block engine receiver only drains when it's connected // and we don't want to OOM on packet_receiver - match block_engine_sender.try_send(BlockEnginePackets { - banking_packet_batch: banking_packet_batch.clone(), - stamp: system_time, - expiration: packet_delay_ms, - }) { - Ok(_) => { - forwarder_metrics.num_be_packets_forwarded += num_packets; - } - Err(TrySendError::Closed(_)) => { - panic!( - "error sending packet batch to block engine handler" - ); - } - Err(TrySendError::Full(_)) => { - // block engine most likely not connected - forwarder_metrics.num_be_packets_dropped += num_packets; - forwarder_metrics.num_be_sender_full += 1; + if !disable_mempool { + match block_engine_sender.try_send(BlockEnginePackets { + banking_packet_batch: banking_packet_batch.clone(), + stamp: system_time, + expiration: packet_delay_ms, + }) { + Ok(_) => { + forwarder_metrics.num_be_packets_forwarded += + num_packets; + } + Err(TrySendError::Closed(_)) => { + panic!( + "error sending packet batch to block engine handler" + ); + } + Err(TrySendError::Full(_)) => { + // block engine most likely not connected + forwarder_metrics.num_be_packets_dropped += num_packets; + forwarder_metrics.num_be_sender_full += 1; + } } } buffered_packet_batches.push_back(RelayerPacketBatches { diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index 5038a485..70169c7a 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -17,7 +17,7 @@ use clap::Parser; use crossbeam_channel::tick; use dashmap::DashMap; use env_logger::Env; -use jito_block_engine::block_engine::BlockEngineRelayerHandler; +use jito_block_engine::block_engine::{BlockEngineConfig, BlockEngineRelayerHandler}; use jito_core::{ graceful_panic, tpu::{Tpu, TpuSockets}, @@ -50,6 +50,8 @@ use tikv_jemallocator::Jemalloc; use tokio::{runtime::Builder, signal, sync::mpsc::channel}; use tonic::transport::Server; +// no-op change to test ci + #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; @@ -117,7 +119,7 @@ struct Args { /// Address for Jito Block Engine. /// See https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details #[arg(long, env)] - block_engine_url: String, + block_engine_url: Option, /// Manual override for authentication service address of the block-engine. /// Defaults to `--block-engine-url` @@ -194,6 +196,10 @@ struct Args { /// Number of packets to send in each packet batch to the validator #[arg(long, env, default_value_t = 4)] validator_packet_batch_size: usize, + + /// Disable Mempool forwarding + #[arg(long, env, default_value_t = false)] + disable_mempool: bool, } #[derive(Debug)] @@ -356,14 +362,25 @@ fn main() { args.packet_delay_ms, block_engine_sender, 1, + args.disable_mempool, &exit, ); let is_connected_to_block_engine = Arc::new(AtomicBool::new(false)); + let block_engine_config = if !args.disable_mempool && args.block_engine_url.is_some() { + let block_engine_url = args.block_engine_url.unwrap(); + let auth_service_url = args + .block_engine_auth_service_url + .unwrap_or(block_engine_url.clone()); + Some(BlockEngineConfig { + block_engine_url, + auth_service_url, + }) + } else { + None + }; let block_engine_forwarder = BlockEngineRelayerHandler::new( - args.block_engine_url.clone(), - args.block_engine_auth_service_url - .unwrap_or(args.block_engine_url), + block_engine_config, block_engine_receiver, keypair, exit.clone(), @@ -479,7 +496,7 @@ fn main() { t.join().unwrap(); } lookup_table_refresher.join().unwrap(); - block_engine_forwarder.join().unwrap(); + block_engine_forwarder.join(); } pub async fn shutdown_signal(exit: Arc) {