Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Disable Mempool Flag #108

Merged
merged 10 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/actions/setup-rust/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ runs:

- name: Install cargo dependencies
run: |
rustup install nightly-2023-10-05
rustup component add rustfmt --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu
rustup component add clippy --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu
rustup install nightly-2024-02-01
rustup component add rustfmt --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu
rustup component add clippy --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu
cargo install cargo-udeps --locked
cargo install cargo-sort
shell: bash
Expand All @@ -63,6 +63,6 @@ runs:
run: |
rustc --version;
cargo --version;
cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --version;
cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu fmt --version;
cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --version;
cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu fmt --version;
shell: bash
4 changes: 2 additions & 2 deletions .github/workflows/clean_code.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
caller-workflow-name: clippy_and_udeps_check

- name: cargo clippy
run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --all-targets
run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --all-targets

- name: cargo udeps
run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu udeps --locked
run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu udeps --locked
83 changes: 45 additions & 38 deletions block_engine/src/block_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
thread::{Builder, JoinHandle},
time::{Duration, Instant, SystemTime},
};
Expand Down Expand Up @@ -53,6 +52,11 @@ use tonic::{

use crate::block_engine_stats::BlockEngineStats;

pub struct BlockEngineConfig {
pub block_engine_url: String,
pub auth_service_url: String,
}

#[derive(Clone)]
struct AuthInterceptor {
access_token: Arc<Mutex<Token>>,
Expand Down Expand Up @@ -95,16 +99,15 @@ pub type BlockEngineResult<T> = Result<T, BlockEngineError>;

/// Attempts to maintain a connection to a Block Engine and forward packets to it
pub struct BlockEngineRelayerHandler {
block_engine_forwarder: JoinHandle<()>,
block_engine_forwarder: Option<JoinHandle<()>>,
}

impl BlockEngineRelayerHandler {
const BLOCK_ENGINE_PACKET_QUEUE_CAPACITY: usize = 1_000;

#[allow(clippy::too_many_arguments)]
pub fn new(
block_engine_url: String,
auth_service_url: String,
block_engine_config: Option<BlockEngineConfig>,
mut block_engine_receiver: Receiver<BlockEnginePackets>,
keypair: Arc<Keypair>,
exit: Arc<AtomicBool>,
Expand All @@ -114,46 +117,50 @@ impl BlockEngineRelayerHandler {
ofac_addresses: HashSet<Pubkey>,
) -> BlockEngineRelayerHandler {
let is_connected_to_block_engine = is_connected_to_block_engine.clone();
let block_engine_forwarder = Builder::new()
.name("block_engine_relayer_handler_thread".into())
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
while !exit.load(Ordering::Relaxed) {
let result = Self::auth_and_connect(
&block_engine_url,
&auth_service_url,
&mut block_engine_receiver,
&keypair,
&exit,
aoi_cache_ttl_s,
&address_lookup_table_cache,
&is_connected_to_block_engine,
&ofac_addresses,
)
.await;
is_connected_to_block_engine.store(false, Ordering::Relaxed);

if let Err(e) = result {
error!("error authenticating and connecting: {:?}", e);
datapoint_error!("block_engine_relayer-error",
"block_engine_url" => block_engine_url,
"auth_service_url" => auth_service_url,
("error", e.to_string(), String)
);
sleep(Duration::from_secs(2)).await;
let block_engine_forwarder = block_engine_config.map(|config| {
Builder::new()
.name("block_engine_relayer_handler_thread".into())
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
while !exit.load(Ordering::Relaxed) {
let result = Self::auth_and_connect(
&config.block_engine_url,
&config.auth_service_url,
&mut block_engine_receiver,
&keypair,
&exit,
aoi_cache_ttl_s,
&address_lookup_table_cache,
&is_connected_to_block_engine,
&ofac_addresses,
)
.await;
is_connected_to_block_engine.store(false, Ordering::Relaxed);

if let Err(e) = result {
error!("error authenticating and connecting: {:?}", e);
datapoint_error!("block_engine_relayer-error",
"block_engine_url" => &config.block_engine_url,
"auth_service_url" => &config.auth_service_url,
("error", e.to_string(), String)
);
sleep(Duration::from_secs(2)).await;
}
}
}
});
})
.unwrap();
});
})
.unwrap()
});
BlockEngineRelayerHandler {
block_engine_forwarder,
}
}

pub fn join(self) -> thread::Result<()> {
self.block_engine_forwarder.join()
pub fn join(self) {
if let Some(forwarder) = self.block_engine_forwarder {
forwarder.join().unwrap()
}
}

/// Relayers are whitelisted in the block engine. In order to auth, a challenge-response handshake
Expand Down
10 changes: 4 additions & 6 deletions core/src/ofac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ fn is_ofac_address_in_lookup_table(

#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::collections::HashSet;

use crossbeam_channel::unbounded;
use dashmap::DashMap;
use solana_perf::packet::PacketBatch;
use solana_sdk::{
address_lookup_table_account::AddressLookupTableAccount,
hash::Hash,
Expand Down Expand Up @@ -167,7 +165,7 @@ mod tests {

let lookup_table_pubkey = Pubkey::new_unique();
let lookup_table = AddressLookupTableAccount {
key: lookup_table_pubkey.clone(),
key: lookup_table_pubkey,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy suggestion

addresses: vec![ofac_pubkey, Pubkey::new_unique()],
};

Expand Down Expand Up @@ -281,7 +279,7 @@ mod tests {
Hash::default(),
);
let random_tx = VersionedTransaction::from(random_tx);
let random_packet = Packet::from_data(None, &random_tx).expect("can create packet");
let random_packet = Packet::from_data(None, random_tx).expect("can create packet");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy suggestion


let ofac_tx = Transaction::new_signed_with_payer(
&[Instruction::new_with_bytes(
Expand All @@ -298,7 +296,7 @@ mod tests {
Hash::default(),
);
let ofac_tx = VersionedTransaction::from(ofac_tx);
let ofac_packet = Packet::from_data(None, &ofac_tx).expect("can create packet");
let ofac_packet = Packet::from_data(None, ofac_tx).expect("can create packet");

assert!(!is_tx_ofac_related(
&random_packet.deserialize_slice(..).unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.73.0"
channel = "1.76.0"
38 changes: 21 additions & 17 deletions transaction-relayer/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn start_forward_and_delay_thread(
packet_delay_ms: u32,
block_engine_sender: tokio::sync::mpsc::Sender<BlockEnginePackets>,
num_threads: u64,
disable_mempool: bool,
exit: &Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> {
const SLEEP_DURATION: Duration = Duration::from_millis(5);
Expand Down Expand Up @@ -77,23 +78,26 @@ pub fn start_forward_and_delay_thread(

// try_send because the block engine receiver only drains when it's connected
// and we don't want to OOM on packet_receiver
match block_engine_sender.try_send(BlockEnginePackets {
banking_packet_batch: banking_packet_batch.clone(),
stamp: system_time,
expiration: packet_delay_ms,
}) {
Ok(_) => {
forwarder_metrics.num_be_packets_forwarded += num_packets;
}
Err(TrySendError::Closed(_)) => {
panic!(
"error sending packet batch to block engine handler"
);
}
Err(TrySendError::Full(_)) => {
// block engine most likely not connected
forwarder_metrics.num_be_packets_dropped += num_packets;
forwarder_metrics.num_be_sender_full += 1;
if !disable_mempool {
match block_engine_sender.try_send(BlockEnginePackets {
banking_packet_batch: banking_packet_batch.clone(),
stamp: system_time,
expiration: packet_delay_ms,
}) {
Ok(_) => {
forwarder_metrics.num_be_packets_forwarded +=
num_packets;
}
Err(TrySendError::Closed(_)) => {
panic!(
"error sending packet batch to block engine handler"
);
}
Err(TrySendError::Full(_)) => {
// block engine most likely not connected
forwarder_metrics.num_be_packets_dropped += num_packets;
forwarder_metrics.num_be_sender_full += 1;
}
}
}
buffered_packet_batches.push_back(RelayerPacketBatches {
Expand Down
29 changes: 23 additions & 6 deletions transaction-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use clap::Parser;
use crossbeam_channel::tick;
use dashmap::DashMap;
use env_logger::Env;
use jito_block_engine::block_engine::BlockEngineRelayerHandler;
use jito_block_engine::block_engine::{BlockEngineConfig, BlockEngineRelayerHandler};
use jito_core::{
graceful_panic,
tpu::{Tpu, TpuSockets},
Expand Down Expand Up @@ -50,6 +50,8 @@ use tikv_jemallocator::Jemalloc;
use tokio::{runtime::Builder, signal, sync::mpsc::channel};
use tonic::transport::Server;

// no-op change to test ci

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

Expand Down Expand Up @@ -117,7 +119,7 @@ struct Args {
/// Address for Jito Block Engine.
/// See https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details
#[arg(long, env)]
block_engine_url: String,
block_engine_url: Option<String>,

/// Manual override for authentication service address of the block-engine.
/// Defaults to `--block-engine-url`
Expand Down Expand Up @@ -194,6 +196,10 @@ struct Args {
/// Number of packets to send in each packet batch to the validator
#[arg(long, env, default_value_t = 4)]
validator_packet_batch_size: usize,

/// Disable Mempool forwarding
#[arg(long, env, default_value_t = false)]
disable_mempool: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -356,14 +362,25 @@ fn main() {
args.packet_delay_ms,
block_engine_sender,
1,
args.disable_mempool,
&exit,
);

let is_connected_to_block_engine = Arc::new(AtomicBool::new(false));
let block_engine_config = if !args.disable_mempool && args.block_engine_url.is_some() {
let block_engine_url = args.block_engine_url.unwrap();
let auth_service_url = args
.block_engine_auth_service_url
.unwrap_or(block_engine_url.clone());
Some(BlockEngineConfig {
block_engine_url,
auth_service_url,
})
} else {
None
};
let block_engine_forwarder = BlockEngineRelayerHandler::new(
args.block_engine_url.clone(),
args.block_engine_auth_service_url
.unwrap_or(args.block_engine_url),
block_engine_config,
block_engine_receiver,
keypair,
exit.clone(),
Expand Down Expand Up @@ -479,7 +496,7 @@ fn main() {
t.join().unwrap();
}
lookup_table_refresher.join().unwrap();
block_engine_forwarder.join().unwrap();
block_engine_forwarder.join();
}

pub async fn shutdown_signal(exit: Arc<AtomicBool>) {
Expand Down
Loading