Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade relayer to v1.17 solana dependencies + refactor a few pieces #87

Merged
merged 22 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,660 changes: 2,153 additions & 1,507 deletions Cargo.lock

Large diffs are not rendered by default.

61 changes: 60 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,71 @@ members = [
"block_engine",
"core",
"jito-protos",
"packet_blaster",
# "packet_blaster", // TODO (LB): fix
"relayer",
"rpc",
"transaction-relayer",
"web"
]
resolver = "1"

[workspace.dependencies]
axum = "0.5.17"
bincode = "1.3.3"
bytes = "1.4.0"
cached = "0.42.0"
chrono = "0.4.24"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.8"
dashmap = "5.4.0"
ed25519-dalek = "1.0.1"
env_logger = "0.9"
futures-util = "0.3"
histogram = "0.6.9"
hostname = "0.3"
itertools = "0.10.5"
jito-block-engine = { path = "block_engine" }
jito-core = { path = "core" }
jito-protos = { path = "jito-protos" }
jito-relayer = { path = "relayer" }
jito-relayer-web = { path = "web" }
jito-rpc = { path = "rpc" }
jwt = { version = "0.16.0", features = ["openssl"] }
keyed_priority_queue = "0.4.1"
lazy_static = "1.4.0"
log = "0.4.17"
once_cell = "1"
openssl = "0.10.51"
prost = "0.12.1"
prost-types = "0.12.1"
quinn = "0.9"
rand = "0.8.5"
rayon = "1.7.0"
reqwest = "0.11.16"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
sha2 = "0.10.6"
solana-address-lookup-table-program = "1.17.5"
solana-client = "1.17.5"
solana-core = "1.17.5"
solana-gossip = "1.17.5"
solana-measure = "1.17.5"
solana-metrics = "1.17.5"
solana-net-utils = "1.17.5"
solana-perf = "1.17.5"
solana-program = "1.17.5"
solana-rayon-threadlimit = "1.17.5"
solana-runtime = "1.17.5"
solana-sdk = "1.17.5"
solana-streamer = "1.17.5"
thiserror = "1.0.40"
tikv-jemallocator = { version = "0.5", features = ["profiling"] }
tokio = { version = "1.29.1", features = ["full"] }
tokio-stream = "0.1.12"
tonic = { version = "0.10.2", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic-build = "0.10.2"
tower = { version = "0.4.1", features = ["limit"] }

[profile.release]
# thin has minimal overhead vs none (default): https://blog.llvm.org/2016/06/thinlto-scalable-and-incremental-lto.html
Expand Down
26 changes: 14 additions & 12 deletions block_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ edition = "2021"
publish = false

[dependencies]
cached = "0.42.0"
dashmap = "5.4.0"
jito-protos = { path = "../jito-protos" }
log = "0.4.17"
prost-types = "0.11.9"
solana-metrics = "=1.14.18"
solana-perf = "=1.14.18"
solana-sdk = "=1.14.18"
thiserror = "1.0.40"
tokio = { version = "~1.14.1", features = ["rt-multi-thread"] }
tokio-stream = "0.1.12"
tonic = { version = "0.9.2", features = ["tls", "tls-roots", "tls-webpki-roots"] }
cached = { workspace = true }
dashmap = { workspace = true }
jito-core = { workspace = true }
jito-protos = { workspace = true }
log = { workspace = true }
prost-types = { workspace = true }
solana-core = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
84 changes: 53 additions & 31 deletions block_engine/src/block_engine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashSet,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -11,6 +12,7 @@ use std::{

use cached::{Cached, TimedCache};
use dashmap::DashMap;
use jito_core::ofac::is_tx_ofac_related;
use jito_protos::{
auth::{
auth_service_client::AuthServiceClient, GenerateAuthChallengeRequest,
Expand All @@ -23,15 +25,15 @@ use jito_protos::{
PacketBatchUpdate, ProgramsOfInterestRequest, ProgramsOfInterestUpdate,
},
convert::packet_to_proto_packet,
packet::{Packet as ProtoPacket, PacketBatch as ProtoPacketBatch},
packet::PacketBatch as ProtoPacketBatch,
shared::{Header, Heartbeat},
};
use log::{error, *};
use prost_types::Timestamp;
use solana_core::banking_trace::BankingPacketBatch;
use solana_metrics::{datapoint_error, datapoint_info};
use solana_perf::packet::PacketBatch;
use solana_sdk::{
address_lookup_table_account::AddressLookupTableAccount, pubkey::Pubkey, signature::Signer,
address_lookup_table::AddressLookupTableAccount, pubkey::Pubkey, signature::Signer,
signer::keypair::Keypair, transaction::VersionedTransaction,
};
use thiserror::Error;
Expand Down Expand Up @@ -75,7 +77,7 @@ impl Interceptor for AuthInterceptor {
}

pub struct BlockEnginePackets {
pub packet_batches: Vec<PacketBatch>,
pub banking_packet_batch: BankingPacketBatch,
pub stamp: SystemTime,
pub expiration: u32,
}
Expand Down Expand Up @@ -109,6 +111,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s: u64,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
is_connected_to_block_engine: &Arc<AtomicBool>,
ofac_addresses: HashSet<Pubkey>,
) -> BlockEngineRelayerHandler {
let is_connected_to_block_engine = is_connected_to_block_engine.clone();
let block_engine_forwarder = Builder::new()
Expand All @@ -126,6 +129,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s,
&address_lookup_table_cache,
&is_connected_to_block_engine,
&ofac_addresses,
)
.await;
is_connected_to_block_engine.store(false, Ordering::Relaxed);
Expand Down Expand Up @@ -215,6 +219,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s: u64,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
is_connected_to_block_engine: &Arc<AtomicBool>,
ofac_addresses: &HashSet<Pubkey>,
) -> BlockEngineResult<()> {
let mut auth_endpoint = Endpoint::from_str(auth_service_url).expect("valid auth url");
if auth_service_url.contains("https") {
Expand Down Expand Up @@ -279,6 +284,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s,
address_lookup_table_cache,
is_connected_to_block_engine,
ofac_addresses,
)
.await
}
Expand All @@ -300,6 +306,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s: u64,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
is_connected_to_block_engine: &Arc<AtomicBool>,
ofac_addresses: &HashSet<Pubkey>,
) -> BlockEngineResult<()> {
let subscribe_aoi_stream = client
.subscribe_accounts_of_interest(AccountsOfInterestRequest {})
Expand Down Expand Up @@ -331,6 +338,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s,
address_lookup_table_cache,
is_connected_to_block_engine,
ofac_addresses,
)
.await
}
Expand All @@ -349,6 +357,7 @@ impl BlockEngineRelayerHandler {
aoi_cache_ttl_s: u64,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
is_connected_to_block_engine: &Arc<AtomicBool>,
ofac_addresses: &HashSet<Pubkey>,
) -> BlockEngineResult<()> {
let mut aoi_stream = subscribe_aoi_stream.into_inner();
let mut poi_stream = subscribe_poi_stream.into_inner();
Expand Down Expand Up @@ -413,8 +422,12 @@ impl BlockEngineRelayerHandler {
.ok_or_else(|| BlockEngineError::BlockEngineFailure("block engine packet receiver disconnected".to_string()))?;

let now = Instant::now();
block_engine_stats.increment_num_packets_received(block_engine_batches.packet_batches.iter().map(|b|b.len() as u64).sum::<u64>());
let filtered_packets = Self::filter_packets(block_engine_batches, &mut accounts_of_interest, &mut programs_of_interest, address_lookup_table_cache);

// note: this contains discarded packets too
let num_packets: u64 = block_engine_batches.banking_packet_batch.0.iter().map(|b|b.len() as u64).sum::<u64>();
block_engine_stats.increment_num_packets_received(num_packets);

let filtered_packets = Self::filter_packets(block_engine_batches, &mut accounts_of_interest, &mut programs_of_interest, address_lookup_table_cache, ofac_addresses);
block_engine_stats.increment_packet_filter_elapsed_us(now.elapsed().as_micros() as u64);

if let Some(filtered_packets) = filtered_packets {
Expand Down Expand Up @@ -616,33 +629,42 @@ impl BlockEngineRelayerHandler {
accounts_of_interest: &mut TimedCache<Pubkey, u8>,
programs_of_interest: &mut TimedCache<Pubkey, u8>,
address_lookup_table_cache: &DashMap<Pubkey, AddressLookupTableAccount>,
ofac_addresses: &HashSet<Pubkey>,
) -> Option<ExpiringPacketBatch> {
let filtered_packets: Vec<ProtoPacket> = block_engine_batches
.packet_batches
.into_iter()
.flat_map(|b| {
b.iter()
.filter_map(|p| {
let tx: VersionedTransaction = p.deserialize_slice(..).ok()?;

let is_forwardable =
is_aoi_in_static_keys(&tx, accounts_of_interest, programs_of_interest)
|| is_aoi_in_lookup_table(
&tx,
accounts_of_interest,
programs_of_interest,
address_lookup_table_cache,
);

if is_forwardable {
Some(packet_to_proto_packet(p)?)
} else {
None
let mut filtered_packets = Vec::new();
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

for batch in &block_engine_batches.banking_packet_batch.0 {
for packet in batch {
if packet.meta().discard() {
continue;
}

if !ofac_addresses.is_empty() {
if let Ok(tx) = packet.deserialize_slice::<VersionedTransaction, _>(..) {
if !is_tx_ofac_related(&tx, ofac_addresses, address_lookup_table_cache)
&& (is_aoi_in_static_keys(
&tx,
accounts_of_interest,
programs_of_interest,
) || is_aoi_in_lookup_table(
&tx,
accounts_of_interest,
programs_of_interest,
address_lookup_table_cache,
))
{
if let Some(packet) = packet_to_proto_packet(packet) {
filtered_packets.push(packet)
}
}
})
.collect::<Vec<ProtoPacket>>()
})
.collect::<Vec<ProtoPacket>>();
}
} else {
if let Some(packet) = packet_to_proto_packet(packet) {
buffalu marked this conversation as resolved.
Show resolved Hide resolved
filtered_packets.push(packet)
}
}
}
}

if !filtered_packets.is_empty() {
Some(ExpiringPacketBatch {
Expand Down
40 changes: 20 additions & 20 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ edition = "2021"
publish = false

[dependencies]
bincode = "1.3.3"
crossbeam-channel = "0.5.8"
dashmap = "5.4.0"
jito-rpc = { path = "../rpc" }
lazy_static = "1.4.0"
log = "0.4.17"
rayon = "1.7.0"
solana-client = "=1.14.18"
solana-core = "=1.14.18"
solana-gossip = "=1.14.18"
solana-measure = "=1.14.18"
solana-metrics = "=1.14.18"
solana-perf = "=1.14.18"
solana-rayon-threadlimit = "=1.14.18"
solana-runtime = "=1.14.18"
solana-sdk = "=1.14.18"
solana-streamer = "=1.14.18"
thiserror = "1.0.40"
tokio = { version = "~1.14.1", features = ["rt-multi-thread"] }
tokio-stream = "0.1.12"
bincode = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
jito-rpc = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-gossip = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
2 changes: 1 addition & 1 deletion core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl FetchStage {
tpu_sender: &PacketBatchSender,
) -> FetchStageResult<()> {
let mark_forwarded = |packet: &mut Packet| {
packet.meta.flags |= PacketFlags::FORWARDED;
packet.meta_mut().flags |= PacketFlags::FORWARDED;
};

let mut packet_batch = tpu_forwards_receiver.recv()?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use log::*;

mod fetch_stage;
mod ofac_stage;
pub mod ofac;
mod staked_nodes_updater_service;
pub mod tpu;

Expand Down
Loading
Loading