Commit
only reroute if relayer connected (jito-foundation#123)
feat: add client tls config (jito-foundation#121)
remove extra val (jito-foundation#129)
fix clippy (jito-foundation#130)
copy all binaries to docker-output (jito-foundation#131)
Ledger tool halts at slot passed to create-snapshot (jito-foundation#118)
update program submodule (jito-foundation#133)
quick fix for tips and clearing old bundles (jito-foundation#135)
update submodule to new program (jito-foundation#136)
Improve stake-meta-generator usability (jito-foundation#134)
pinning submodule head (jito-foundation#140)
Use BundleAccountLocker when handling tip txs (jito-foundation#147)
Add metrics for relayer + block engine proxy (jito-foundation#149)
Build claim-mev in docker (jito-foundation#141)
Rework bundle receiving and add metrics (jito-foundation#152) (jito-foundation#154)
update submodule + dev files (jito-foundation#158)
Deterministically find tip amounts, add meta to stake info, and cleanup pubkey/strings in MEV tips (jito-foundation#159)
update jito-programs submodule (jito-foundation#160)
Separate MEV tip related workflow (jito-foundation#161)
Add block builder fee protos (jito-foundation#162)
fix jito programs (jito-foundation#163)
update submodule so autosnapshot exits out of ledger tool early (jito-foundation#164)
Pipe through block builder fee (jito-foundation#167)
pull in new snapshot code (jito-foundation#171)
block builder bug (jito-foundation#172)
Pull in new slack autosnapshot submodule (jito-foundation#174)
sort stake meta json and use int math (jito-foundation#176)
add accountsdb conn submod (jito-foundation#169)
Update tip distribution parameters (jito-foundation#177)
new submodules (jito-foundation#180)
Add buildkite link for jito CI (jito-foundation#183)
Fixed broken links to repositories (jito-foundation#184); changed from ssh to https transfer for clone
Seg/update submods (jito-foundation#187)
fix tests (jito-foundation#190)
rm geyser submod (jito-foundation#192)
rm dangling geyser references (jito-foundation#193)
fix syntax err (jito-foundation#195)
use deterministic req ids in batch calls (jito-foundation#199)
update jito-programs
revert cargo update
Cargo lock update with path fix
fix cargo update
autosnapshot with block lookback (jito-foundation#201)
[JIT-460] When claiming mev tips, skip accounts that won't have min rent exempt amount after claiming (jito-foundation#203)
Add logging for sol balance desired (jito-foundation#205) * add logging * add logging * update msg * tweak vars
update submodule (jito-foundation#204)
use efficient data structures when calling batch_simulate_bundles (jito-foundation#206)
[JIT-504] Add low balance check in uploading merkle roots (jito-foundation#209)
add config to simulate on top of working bank (jito-foundation#211) rm frozen bank check
simulate_bundle rpc bugfixes (jito-foundation#214) rm frozen bank check in simulate_bundle rpc method
[JIT-519] Store ClaimStatus address in merkle-root-json (jito-foundation#210) * add files * switch to include bump
update submodule (jito-foundation#217)
add amount filter (jito-foundation#218)
update autosnapshot (jito-foundation#222)
Print TX error in Bundles (jito-foundation#223)
add new args to support single relayer and block-engine endpoints (jito-foundation#224)
point to new jito-programs submod and invoke updated init tda instruction (jito-foundation#228)
fix clippy errors (jito-foundation#230)
fix validator start scripts (jito-foundation#232)
Point README to gitbook (jito-foundation#237)
use packaged cargo bin to build (jito-foundation#239)
Add validator identity pubkey to StakeMeta (jito-foundation#226) The vote account associated with a validator is not a permanent link, so log the validator identity as well.
bugfix: conditionally compile with debug flags (jito-foundation#240)
Seg/tip distributor master (jito-foundation#242) * validate tree nodes * fix unit tests * pr feedback * bump jito-programs submod
Simplify bootstrapping (jito-foundation#241) * startup without precompile * update spacing * use release mode * spacing fix validation rm validation skip
Account for block builder fee when generating excess tip balance (jito-foundation#247)
Improve docker caching
delay constructing claim mev txs (jito-foundation#253)
fix stake meta tests from bb fee (jito-foundation#254) fix tests
Buffer bundles that exceed cost model (jito-foundation#225) * buffer bundles that exceed cost model clear qos failed bundles
buffer if not leader soon (jito-foundation#260)
update Cargo.lock to correct solana versions in jito-programs submodule (jito-foundation#265)
fix simulate_bundle client and better error handling (jito-foundation#267)
update submod (jito-foundation#272)
Preallocate Bundle Cost (jito-foundation#238)
fix Dockerfile (jito-foundation#278)
Fix Tests (jito-foundation#279)
Fix Tests (jito-foundation#281) * fix tests
update jito-programs submod (jito-foundation#282)
add reclaim rent workflow (jito-foundation#283) update jito-programs submod fix clippy errs
rm wrong assertion and swap out file write fn call (jito-foundation#292)
Remove security.md (jito-foundation#293)
demote frequent relayer_stage-stream_error to warn (jito-foundation#275)
account for case where TDA exists but not allocated (jito-foundation#295)
implement better retries for tip-distributor workflows (jito-foundation#297)
limit number of concurrent rpc calls (jito-foundation#298)
Discard Empty Packet Batches (jito-foundation#299)
Identity Hotswap (jito-foundation#290)
small fixes (jito-foundation#305)
Set backend config from admin rpc (jito-foundation#304)
Admin Shred Receiver Change (jito-foundation#306)
Seg/rm bundle UUID (jito-foundation#309)
Fix github workflow to recursively clone (jito-foundation#327)
Add recursive checkout for downstream-project-spl.yaml (jito-foundation#341)
Use cluster info functions for tpu (jito-foundation#345)
Use git rev-parse for git sha
Remove blacklisted tx from message_hash_to_transaction (jito-foundation#374)
Updates bootstrap and start scripts needed for local dev. (jito-foundation#384)
Remove Deprecated Cli Args (jito-foundation#387)
Master Rebase
improve simulate_bundle errors and response (jito-foundation#404)
derive Clone on accountoverrides (jito-foundation#416)
Add upsert to AccountOverrides (jito-foundation#419)
update jito-programs (jito-foundation#430)
[JIT-1661] Faster Autosnapshot (jito-foundation#436)
Reverts simulate_transaction result calls to upstream (jito-foundation#446)
Don't unlock accounts in TransactionBatches used during simulation (jito-foundation#449)
first pass at wiring up jito-plugin (jito-foundation#428)
[JIT-1713] Fix bundle's blockspace preallocation (jito-foundation#489)
[JIT-1708] Fix TOC TOU condition for relayer and block engine config (jito-foundation#491)
[JIT-1710] - Optimize Bundle Consumer Checks (jito-foundation#490)
Add Blockhash Metrics to Bundle Committer (jito-foundation#500)
add priority fee ix to mev-claim (jito-foundation#520)
Update Autosnapshot (jito-foundation#548)
Run MEV claims + reclaiming rent-exempt amounts in parallel. (jito-foundation#582)
Update CI (jito-foundation#584) - Add recursive submodule checkouts. - Re-add solana-secondary step
Add more release fixes (jito-foundation#585)
Fix more release urls (jito-foundation#588)
[JIT-1812] Fix blocking mutexs (jito-foundation#495)
[JIT-1711] Compare the unprocessed transaction storage BundleStorage against a constant instead of VecDeque::capacity() (jito-foundation#587)
Automatically rebase Jito-Solana on a periodic basis. Send message on slack during any failures or success.
Fix periodic rebase jito-foundation#594. Fixes the following bugs in the periodic rebase: sends multiple messages on failure instead of one; cancels entire job if one branch fails
Ignore buildkite curl errors for rebasing and try to keep curling until job times out (jito-foundation#597)
Sleep longer waiting for buildkite to start (jito-foundation#598)
correctly initialize account overrides (jito-foundation#595)
Fix: Ensure set contact info to UDP port instead of QUIC (jito-foundation#603)
Add fast replay branch to daily rebase (jito-foundation#607)
take a snapshot of all bundle accounts before sim (jito-foundation#13) (jito-foundation#615)
update jito-programs submodule
Export agave binaries during docker build (BP jito-foundation#627) (jito-foundation#628)
Backport jito-foundation#611 (jito-foundation#631)
Publish releases to S3 and GCS (jito-foundation#633) (jito-foundation#634)
Add packet flag for from staked sender (jito-foundation#655) Co-authored-by: Jed <4679729+jedleggett@users.noreply.github.com>
Add bundle storage to new unprocessed transaction storage method
Loosen tip requirement [v2.0] (jito-foundation#685)
Add comments around ignoring the slot returned from ImmutableDeserializedPacket::build_sanitized_transaction
@@ -0,0 +1,9 @@ | ||
.dockerignore | ||
.git/ | ||
.github/ | ||
.gitignore | ||
.idea/ | ||
README.md | ||
Dockerfile | ||
f | ||
target/ |
@@ -0,0 +1,181 @@ | ||
# This workflow runs a periodic rebase process, pulling in updates from an upstream repository | ||
# The workflow for rebasing a jito-solana branch to a solana labs branch locally is typically: | ||
# $ git checkout v1.17 | ||
# $ git pull --rebase # --rebase needed locally | ||
# $ git branch -D lb/v1.17_rebase # deletes branch from last v1.17 rebase | ||
# $ git checkout -b lb/v1.17_rebase | ||
# $ git fetch upstream | ||
# $ git rebase upstream/v1.17 # rebase + fix merge conflicts | ||
# $ git rebase --continue | ||
# $ git push origin +lb/v1.17_rebase # force needed to overwrite remote. wait for CI, fix if any issues | ||
# $ git checkout v1.17 | ||
# $ git reset --hard lb/v1.17_rebase | ||
# $ git push origin +v1.17 | ||
# | ||
# This workflow automates this process, with periodic status updates over slack. | ||
# It will also run CI and wait for it to pass before performing the force push to v1.17. | ||
# In the event there's a failure in the process, it's reported to slack and the job stops. | ||
|
||
name: "Rebase jito-solana from upstream anza-xyz/agave" | ||
|
||
on: | ||
# push: | ||
schedule: | ||
- cron: "30 18 * * 1-5" | ||
|
||
jobs: | ||
rebase: | ||
runs-on: ubuntu-latest | ||
strategy: | ||
matrix: | ||
include: | ||
- branch: master | ||
rebase: upstream/master | ||
- branch: v1.18 | ||
rebase: upstream/v1.18 | ||
- branch: v1.17 | ||
rebase: upstream/v1.17 | ||
# note: this will always be a day behind because we're rebasing from the previous day's rebase | ||
# and NOT upstream | ||
- branch: v1.17-fast-replay | ||
rebase: origin/v1.17 | ||
fail-fast: false | ||
steps: | ||
- uses: actions/checkout@v4 | ||
with: | ||
ref: ${{ matrix.branch }} | ||
submodules: recursive | ||
fetch-depth: 0 | ||
token: ${{ secrets.JITO_SOLANA_RELEASE_TOKEN }} | ||
- name: Add upstream | ||
run: git remote add upstream https://github.com/anza-xyz/agave.git | ||
- name: Fetch upstream | ||
run: git fetch upstream | ||
- name: Fetch origin | ||
run: git fetch origin | ||
- name: Set REBASE_BRANCH | ||
run: echo "REBASE_BRANCH=ci/nightly/${{ matrix.branch }}/$(date +'%Y-%m-%d-%H-%M')" >> $GITHUB_ENV | ||
- name: echo $REBASE_BRANCH | ||
run: echo $REBASE_BRANCH | ||
- name: Create rebase branch | ||
run: git checkout -b $REBASE_BRANCH | ||
- name: Setup email | ||
run: | | ||
git config --global user.email "infra@jito.wtf" | ||
git config --global user.name "Jito Infrastructure" | ||
- name: Rebase | ||
id: rebase | ||
run: git rebase ${{ matrix.rebase }} | ||
- name: Send warning for rebase error | ||
if: failure() && steps.rebase.outcome == 'failure' | ||
uses: slackapi/slack-github-action@v1.25.0 | ||
with: | ||
payload: | | ||
{ | ||
"text": "Nightly rebase on branch ${{ matrix.branch }}\nStatus: Rebase failed to apply cleanly" | ||
} | ||
env: | ||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} | ||
- name: Check if rebase applied | ||
id: check_rebase_applied | ||
run: | | ||
PRE_REBASE_SHA=$(git rev-parse ${{ matrix.branch }}) | ||
POST_REBASE_SHA=$(git rev-parse HEAD) | ||
if [ "$PRE_REBASE_SHA" = "$POST_REBASE_SHA" ]; then | ||
echo "No rebase was applied, exiting..." | ||
exit 1 | ||
else | ||
echo "Rebase applied successfully." | ||
fi | ||
- name: Send warning for rebase error | ||
if: failure() && steps.check_rebase_applied.outcome == 'failure' | ||
uses: slackapi/slack-github-action@v1.25.0 | ||
with: | ||
payload: | | ||
{ | ||
"text": "Nightly rebase on branch ${{ matrix.branch }}\nStatus: Rebase not needed" | ||
} | ||
env: | ||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} | ||
- name: Set REBASE_SHA | ||
run: echo "REBASE_SHA=$(git rev-parse HEAD)" >> $GITHUB_ENV | ||
- name: Push changes | ||
uses: ad-m/github-push-action@master | ||
with: | ||
github_token: ${{ secrets.GITHUB_TOKEN }} | ||
branch: ${{ env.REBASE_BRANCH }} | ||
- name: Wait for buildkite to start build | ||
run: sleep 300 | ||
- name: Wait for buildkite to finish | ||
id: wait_for_buildkite | ||
timeout-minutes: 300 | ||
run: | | ||
while true; do | ||
response=$(curl -s -f -H "Authorization: Bearer ${{ secrets.BUILDKITE_TOKEN }}" "https://api.buildkite.com/v2/organizations/jito/pipelines/jito-solana/builds?commit=${{ env.REBASE_SHA }}") | ||
if [ $? -ne 0 ]; then | ||
echo "Curl request failed." | ||
exit 1 | ||
fi | ||
state=$(echo $response | jq --exit-status -r '.[0].state') | ||
echo "Current build state: $state" | ||
# Check if the state is one of the finished states | ||
case $state in | ||
"passed"|"finished") | ||
echo "Build finished successfully." | ||
exit 0 | ||
;; | ||
"canceled"|"canceling"|"not_run") | ||
# ignoring "failing"|"failed" because flaky CI, can restart and hope it finishes or times out | ||
echo "Build failed or was cancelled." | ||
exit 2 | ||
;; | ||
esac | ||
sleep 30 | ||
done | ||
- name: Send failure update | ||
uses: slackapi/slack-github-action@v1.25.0 | ||
if: failure() && steps.wait_for_buildkite.outcome == 'failure' | ||
with: | ||
payload: | | ||
{ | ||
"text": "Nightly rebase on branch ${{ matrix.branch }}\nStatus: CI failed\nBranch: ${{ env.REBASE_BRANCH}}\nBuild: https://buildkite.com/jito/jito-solana/builds?commit=${{ env.REBASE_SHA }}" | ||
} | ||
env: | ||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} | ||
# check to see if the branch has changed, since the CI build can take a while and these steps are not atomic | ||
- name: Fetch the latest remote changes | ||
run: git fetch origin ${{ matrix.branch }} | ||
- name: Check if origin HEAD has changed from the beginning of the workflow | ||
run: | | ||
LOCAL_SHA=$(git rev-parse ${{ matrix.branch }}) | ||
ORIGIN_SHA=$(git rev-parse origin/${{ matrix.branch }}) | ||
if [ "$ORIGIN_SHA" != "$LOCAL_SHA" ]; then | ||
echo "The remote HEAD of ${{ matrix.branch }} does not match the local HEAD of ${{ matrix.branch }} at the beginning of CI." | ||
echo "origin sha: $ORIGIN_SHA" | ||
echo "local sha: $LOCAL_SHA" | ||
exit 1 | ||
else | ||
echo "The remote HEAD matches the local REBASE_SHA at the beginning of CI. Proceeding." | ||
fi | ||
- name: Reset ${{ matrix.branch }} to ${{ env.REBASE_BRANCH }} | ||
run: | | ||
git checkout ${{ matrix.branch }} | ||
git reset --hard ${{ env.REBASE_BRANCH }} | ||
- name: Push rebased ${{ matrix.branch }} | ||
uses: ad-m/github-push-action@master | ||
with: | ||
github_token: ${{ secrets.JITO_SOLANA_RELEASE_TOKEN }} | ||
branch: ${{ matrix.branch }} | ||
force: true | ||
- name: Send success update | ||
uses: slackapi/slack-github-action@v1.25.0 | ||
with: | ||
payload: | | ||
{ | ||
"text": "Nightly rebase on branch ${{ matrix.branch }}\nStatus: CI success, rebased, and pushed\nBranch: ${{ env.REBASE_BRANCH}}\nBuild: https://buildkite.com/jito/jito-solana/builds?commit=${{ env.REBASE_SHA }}" | ||
} | ||
env: | ||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} |
@@ -22,6 +22,7 @@ jobs: | |
with: | ||
ref: master | ||
fetch-depth: 0 | ||
submodules: 'recursive' | ||
|
||
- name: Setup Rust | ||
shell: bash | ||
|
@@ -0,0 +1,9 @@ | ||
[submodule "anchor"] | ||
path = anchor | ||
url = https://github.com/jito-foundation/anchor.git | ||
[submodule "jito-programs"] | ||
path = jito-programs | ||
url = https://github.com/jito-foundation/jito-programs.git | ||
[submodule "jito-protos/protos"] | ||
path = jito-protos/protos | ||
url = https://github.com/jito-labs/mev-protos.git |
@@ -0,0 +1,26 @@ | ||
#!/usr/bin/env bash | ||
set -eu | ||
|
||
BANK_HASH=$(cargo run --release --bin solana-ledger-tool -- -l config/bootstrap-validator bank-hash) | ||
|
||
# increase max file handle limit | ||
ulimit -Hn 1000000 | ||
|
||
# if above fails, run: | ||
# sudo bash -c 'echo "* hard nofile 1000000" >> /etc/security/limits.conf' | ||
|
||
# NOTE: make sure tip-payment and tip-distribution program are deployed using the correct pubkeys | ||
RUST_LOG=INFO,solana_core::bundle_stage=DEBUG \ | ||
NDEBUG=1 ./multinode-demo/bootstrap-validator.sh \ | ||
--wait-for-supermajority 0 \ | ||
--expected-bank-hash "$BANK_HASH" \ | ||
--block-engine-url http://127.0.0.1 \ | ||
--relayer-url http://127.0.0.1:11226 \ | ||
--rpc-pubsub-enable-block-subscription \ | ||
--enable-rpc-transaction-history \ | ||
--tip-payment-program-pubkey T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt \ | ||
--tip-distribution-program-pubkey 4R3gSG8BpU4t19KYj8CfnbtRpnT8gtk4dvTHxVRwc2r7 \ | ||
--commission-bps 0 \ | ||
--shred-receiver-address 127.0.0.1:1002 \ | ||
--trust-relayer-packets \ | ||
--trust-block-engine-packets |
@@ -0,0 +1,37 @@ | ||
[package] | ||
name = "solana-bundle" | ||
description = "Library related to handling bundles" | ||
documentation = "https://docs.rs/solana-bundle" | ||
readme = "../README.md" | ||
version = { workspace = true } | ||
authors = { workspace = true } | ||
repository = { workspace = true } | ||
homepage = { workspace = true } | ||
license = { workspace = true } | ||
edition = { workspace = true } | ||
|
||
[dependencies] | ||
anchor-lang = { workspace = true } | ||
itertools = { workspace = true } | ||
log = { workspace = true } | ||
serde = { workspace = true } | ||
solana-accounts-db = { workspace = true } | ||
solana-ledger = { workspace = true } | ||
solana-logger = { workspace = true } | ||
solana-measure = { workspace = true } | ||
solana-poh = { workspace = true } | ||
solana-program-runtime = { workspace = true } | ||
solana-runtime = { workspace = true } | ||
solana-sdk = { workspace = true } | ||
solana-svm = { workspace = true } | ||
solana-transaction-status = { workspace = true } | ||
thiserror = { workspace = true } | ||
|
||
[dev-dependencies] | ||
assert_matches = { workspace = true } | ||
solana-logger = { workspace = true } | ||
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } | ||
|
||
[lib] | ||
crate-type = ["lib"] | ||
name = "solana_bundle" |
@@ -0,0 +1,60 @@ | ||
use { | ||
crate::bundle_execution::LoadAndExecuteBundleError, | ||
anchor_lang::error::Error, | ||
serde::{Deserialize, Serialize}, | ||
solana_poh::poh_recorder::PohRecorderError, | ||
solana_sdk::pubkey::Pubkey, | ||
thiserror::Error, | ||
}; | ||
|
||
pub mod bundle_execution; | ||
|
||
#[derive(Error, Debug, Clone, Serialize, Deserialize, PartialEq)] | ||
pub enum TipError { | ||
#[error("account is missing from bank: {0}")] | ||
AccountMissing(Pubkey), | ||
|
||
#[error("Anchor error: {0}")] | ||
AnchorError(String), | ||
|
||
#[error("Lock error")] | ||
LockError, | ||
|
||
#[error("Error executing initialize programs")] | ||
InitializeProgramsError, | ||
|
||
#[error("Error cranking tip programs")] | ||
CrankTipError, | ||
} | ||
|
||
impl From<anchor_lang::error::Error> for TipError { | ||
fn from(anchor_err: Error) -> Self { | ||
match anchor_err { | ||
Error::AnchorError(e) => Self::AnchorError(e.error_msg), | ||
Error::ProgramError(e) => Self::AnchorError(e.to_string()), | ||
} | ||
} | ||
} | ||
|
||
pub type BundleExecutionResult<T> = Result<T, BundleExecutionError>; | ||
|
||
#[derive(Error, Debug, Clone)] | ||
pub enum BundleExecutionError { | ||
#[error("The bank has hit the max allotted time for processing transactions")] | ||
BankProcessingTimeLimitReached, | ||
|
||
#[error("The bundle exceeds the cost model")] | ||
ExceedsCostModel, | ||
|
||
#[error("Runtime error while executing the bundle: {0}")] | ||
TransactionFailure(#[from] LoadAndExecuteBundleError), | ||
|
||
#[error("Error locking bundle because a transaction is malformed")] | ||
LockError, | ||
|
||
#[error("PoH record error: {0}")] | ||
PohRecordError(#[from] PohRecorderError), | ||
|
||
#[error("Tip payment error {0}")] | ||
TipError(#[from] TipError), | ||
} |
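The `#[from]` attributes above are what let callers bubble these errors up with `?`. A minimal sketch of that flow (hypothetical caller; only the types come from this solana-bundle crate, and `crank_tips` is an illustrative stand-in):

use solana_bundle::{BundleExecutionError, BundleExecutionResult, TipError};

// Illustrative stand-in for a tip-cranking step that can fail.
fn crank_tips(succeed: bool) -> Result<(), TipError> {
    if succeed {
        Ok(())
    } else {
        Err(TipError::CrankTipError)
    }
}

fn execute_bundle(succeed: bool) -> BundleExecutionResult<()> {
    // `?` converts TipError into BundleExecutionError::TipError via #[from].
    crank_tips(succeed)?;
    Ok(())
}

fn main() {
    assert!(matches!(
        execute_bundle(false),
        Err(BundleExecutionError::TipError(TipError::CrankTipError))
    ));
}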
@@ -0,0 +1,57 @@ | ||
#![feature(test)] | ||
|
||
extern crate test; | ||
|
||
use { | ||
jito_protos::proto::packet::{ | ||
Meta as PbMeta, Packet as PbPacket, PacketBatch, PacketFlags as PbFlags, | ||
}, | ||
solana_core::proto_packet_to_packet, | ||
solana_sdk::packet::{Packet, PACKET_DATA_SIZE}, | ||
std::iter::repeat, | ||
test::{black_box, Bencher}, | ||
}; | ||
|
||
fn get_proto_packet(i: u8) -> PbPacket { | ||
PbPacket { | ||
data: repeat(i).take(PACKET_DATA_SIZE).collect(), | ||
meta: Some(PbMeta { | ||
size: PACKET_DATA_SIZE as u64, | ||
addr: "255.255.255.255:65535".to_string(), | ||
port: 65535, | ||
flags: Some(PbFlags { | ||
discard: false, | ||
forwarded: false, | ||
repair: false, | ||
simple_vote_tx: false, | ||
tracer_packet: false, | ||
from_staked_node: false, | ||
}), | ||
sender_stake: 0, | ||
}), | ||
} | ||
} | ||
|
||
#[bench] | ||
fn bench_proto_to_packet(bencher: &mut Bencher) { | ||
bencher.iter(|| { | ||
black_box(proto_packet_to_packet(get_proto_packet(1))); | ||
}); | ||
} | ||
|
||
#[bench] | ||
fn bench_batch_list_to_packets(bencher: &mut Bencher) { | ||
let packet_batch = PacketBatch { | ||
packets: (0..128).map(get_proto_packet).collect(), | ||
}; | ||
|
||
bencher.iter(|| { | ||
black_box( | ||
packet_batch | ||
.packets | ||
.iter() | ||
.map(|p| proto_packet_to_packet(p.clone())) | ||
.collect::<Vec<Packet>>(), | ||
); | ||
}); | ||
} |
@@ -0,0 +1,326 @@ | ||
//! Handles pre-locking bundle accounts so that accounts that bundles touch can be reserved ahead | ||
// of time for execution. Also, ensures that ALL accounts mentioned across a bundle are locked | ||
// to avoid race conditions between BundleStage and BankingStage. | ||
// | ||
// For instance, imagine a bundle with three transactions and the set of accounts for each transaction | ||
// is: {{A, B}, {B, C}, {C, D}}. We need to lock A, B, and C even though only one is executed at a time. | ||
// Imagine BundleStage is in the middle of processing {C, D} and we didn't have a lock on accounts {A, B, C}. | ||
// In this situation, there's a chance that BankingStage can process a transaction containing A or B | ||
// and commit the results before the bundle completes. By the time the bundle commits the new account | ||
// state for {A, B, C}, A and B would be incorrect and the entries containing the bundle would be | ||
// replayed improperly and that leader would have produced an invalid block. | ||
use { | ||
solana_runtime::bank::Bank, | ||
solana_sdk::{bundle::SanitizedBundle, pubkey::Pubkey, transaction::TransactionAccountLocks}, | ||
std::{ | ||
collections::{hash_map::Entry, HashMap, HashSet}, | ||
sync::{Arc, Mutex, MutexGuard}, | ||
}, | ||
thiserror::Error, | ||
}; | ||
|
||
#[derive(Clone, Error, Debug)] | ||
pub enum BundleAccountLockerError { | ||
#[error("locking error")] | ||
LockingError, | ||
} | ||
|
||
pub type BundleAccountLockerResult<T> = Result<T, BundleAccountLockerError>; | ||
|
||
pub struct LockedBundle<'a, 'b> { | ||
bundle_account_locker: &'a BundleAccountLocker, | ||
sanitized_bundle: &'b SanitizedBundle, | ||
bank: Arc<Bank>, | ||
} | ||
|
||
impl<'a, 'b> LockedBundle<'a, 'b> { | ||
pub fn new( | ||
bundle_account_locker: &'a BundleAccountLocker, | ||
sanitized_bundle: &'b SanitizedBundle, | ||
bank: &Arc<Bank>, | ||
) -> Self { | ||
Self { | ||
bundle_account_locker, | ||
sanitized_bundle, | ||
bank: bank.clone(), | ||
} | ||
} | ||
|
||
pub fn sanitized_bundle(&self) -> &SanitizedBundle { | ||
self.sanitized_bundle | ||
} | ||
} | ||
|
||
// Automatically unlock bundle accounts when the LockedBundle is dropped | ||
impl<'a, 'b> Drop for LockedBundle<'a, 'b> { | ||
fn drop(&mut self) { | ||
let _ = self | ||
.bundle_account_locker | ||
.unlock_bundle_accounts(self.sanitized_bundle, &self.bank); | ||
} | ||
} | ||
|
||
#[derive(Default, Clone)] | ||
pub struct BundleAccountLocks { | ||
read_locks: HashMap<Pubkey, u64>, | ||
write_locks: HashMap<Pubkey, u64>, | ||
} | ||
|
||
impl BundleAccountLocks { | ||
pub fn read_locks(&self) -> HashSet<Pubkey> { | ||
self.read_locks.keys().cloned().collect() | ||
} | ||
|
||
pub fn write_locks(&self) -> HashSet<Pubkey> { | ||
self.write_locks.keys().cloned().collect() | ||
} | ||
|
||
pub fn lock_accounts( | ||
&mut self, | ||
read_locks: HashMap<Pubkey, u64>, | ||
write_locks: HashMap<Pubkey, u64>, | ||
) { | ||
for (acc, count) in read_locks { | ||
*self.read_locks.entry(acc).or_insert(0) += count; | ||
} | ||
for (acc, count) in write_locks { | ||
*self.write_locks.entry(acc).or_insert(0) += count; | ||
} | ||
} | ||
|
||
pub fn unlock_accounts( | ||
&mut self, | ||
read_locks: HashMap<Pubkey, u64>, | ||
write_locks: HashMap<Pubkey, u64>, | ||
) { | ||
for (acc, count) in read_locks { | ||
if let Entry::Occupied(mut entry) = self.read_locks.entry(acc) { | ||
let val = entry.get_mut(); | ||
*val = val.saturating_sub(count); | ||
if entry.get() == &0 { | ||
let _ = entry.remove(); | ||
} | ||
} else { | ||
warn!("error unlocking read-locked account, account: {:?}", acc); | ||
} | ||
} | ||
for (acc, count) in write_locks { | ||
if let Entry::Occupied(mut entry) = self.write_locks.entry(acc) { | ||
let val = entry.get_mut(); | ||
*val = val.saturating_sub(count); | ||
if entry.get() == &0 { | ||
let _ = entry.remove(); | ||
} | ||
} else { | ||
warn!("error unlocking write-locked account, account: {:?}", acc); | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[derive(Clone, Default)] | ||
pub struct BundleAccountLocker { | ||
account_locks: Arc<Mutex<BundleAccountLocks>>, | ||
} | ||
|
||
impl BundleAccountLocker { | ||
/// used in BankingStage during TransactionBatch construction to ensure that BankingStage | ||
/// doesn't lock anything currently locked in the BundleAccountLocker | ||
pub fn read_locks(&self) -> HashSet<Pubkey> { | ||
self.account_locks.lock().unwrap().read_locks() | ||
} | ||
|
||
/// used in BankingStage during TransactionBatch construction to ensure that BankingStage | ||
/// doesn't lock anything currently locked in the BundleAccountLocker | ||
pub fn write_locks(&self) -> HashSet<Pubkey> { | ||
self.account_locks.lock().unwrap().write_locks() | ||
} | ||
|
||
/// used in BankingStage during TransactionBatch construction to ensure that BankingStage | ||
/// doesn't lock anything currently locked in the BundleAccountLocker | ||
pub fn account_locks(&self) -> MutexGuard<BundleAccountLocks> { | ||
self.account_locks.lock().unwrap() | ||
} | ||
|
||
/// Prepares a locked bundle and returns a LockedBundle containing locked accounts. | ||
/// When a LockedBundle is dropped, the accounts are automatically unlocked | ||
pub fn prepare_locked_bundle<'a, 'b>( | ||
&'a self, | ||
sanitized_bundle: &'b SanitizedBundle, | ||
bank: &Arc<Bank>, | ||
) -> BundleAccountLockerResult<LockedBundle<'a, 'b>> { | ||
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?; | ||
|
||
self.account_locks | ||
.lock() | ||
.unwrap() | ||
.lock_accounts(read_locks, write_locks); | ||
Ok(LockedBundle::new(self, sanitized_bundle, bank)) | ||
} | ||
|
||
/// Unlocks bundle accounts. Note that LockedBundle::drop will auto-drop the bundle account locks | ||
fn unlock_bundle_accounts( | ||
&self, | ||
sanitized_bundle: &SanitizedBundle, | ||
bank: &Bank, | ||
) -> BundleAccountLockerResult<()> { | ||
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?; | ||
|
||
self.account_locks | ||
.lock() | ||
.unwrap() | ||
.unlock_accounts(read_locks, write_locks); | ||
Ok(()) | ||
} | ||
|
||
/// Returns the read and write locks for this bundle | ||
/// Each lock type contains a HashMap which maps Pubkey to number of locks held | ||
fn get_read_write_locks( | ||
bundle: &SanitizedBundle, | ||
bank: &Bank, | ||
) -> BundleAccountLockerResult<(HashMap<Pubkey, u64>, HashMap<Pubkey, u64>)> { | ||
let transaction_locks: Vec<TransactionAccountLocks> = bundle | ||
.transactions | ||
.iter() | ||
.filter_map(|tx| { | ||
tx.get_account_locks(bank.get_transaction_account_lock_limit()) | ||
.ok() | ||
}) | ||
.collect(); | ||
|
||
if transaction_locks.len() != bundle.transactions.len() { | ||
return Err(BundleAccountLockerError::LockingError); | ||
} | ||
|
||
let bundle_read_locks = transaction_locks | ||
.iter() | ||
.flat_map(|tx| tx.readonly.iter().map(|a| **a)); | ||
let bundle_read_locks = | ||
bundle_read_locks | ||
.into_iter() | ||
.fold(HashMap::new(), |mut map, acc| { | ||
*map.entry(acc).or_insert(0) += 1; | ||
map | ||
}); | ||
|
||
let bundle_write_locks = transaction_locks | ||
.iter() | ||
.flat_map(|tx| tx.writable.iter().map(|a| **a)); | ||
let bundle_write_locks = | ||
bundle_write_locks | ||
.into_iter() | ||
.fold(HashMap::new(), |mut map, acc| { | ||
*map.entry(acc).or_insert(0) += 1; | ||
map | ||
}); | ||
|
||
Ok((bundle_read_locks, bundle_write_locks)) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use { | ||
crate::{ | ||
bundle_stage::bundle_account_locker::BundleAccountLocker, | ||
immutable_deserialized_bundle::ImmutableDeserializedBundle, | ||
packet_bundle::PacketBundle, | ||
}, | ||
solana_ledger::genesis_utils::create_genesis_config, | ||
solana_perf::packet::PacketBatch, | ||
solana_runtime::{bank::Bank, genesis_utils::GenesisConfigInfo}, | ||
solana_sdk::{ | ||
packet::Packet, signature::Signer, signer::keypair::Keypair, system_program, | ||
system_transaction::transfer, transaction::VersionedTransaction, | ||
}, | ||
solana_svm::transaction_error_metrics::TransactionErrorMetrics, | ||
std::collections::HashSet, | ||
}; | ||
|
||
#[test] | ||
fn test_simple_lock_bundles() { | ||
let GenesisConfigInfo { | ||
genesis_config, | ||
mint_keypair, | ||
.. | ||
} = create_genesis_config(2); | ||
let (bank, _) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); | ||
|
||
let bundle_account_locker = BundleAccountLocker::default(); | ||
|
||
let kp0 = Keypair::new(); | ||
let kp1 = Keypair::new(); | ||
|
||
let tx0 = VersionedTransaction::from(transfer( | ||
&mint_keypair, | ||
&kp0.pubkey(), | ||
1, | ||
genesis_config.hash(), | ||
)); | ||
let tx1 = VersionedTransaction::from(transfer( | ||
&mint_keypair, | ||
&kp1.pubkey(), | ||
1, | ||
genesis_config.hash(), | ||
)); | ||
|
||
let mut packet_bundle0 = PacketBundle { | ||
batch: PacketBatch::new(vec![Packet::from_data(None, &tx0).unwrap()]), | ||
bundle_id: tx0.signatures[0].to_string(), | ||
}; | ||
let mut packet_bundle1 = PacketBundle { | ||
batch: PacketBatch::new(vec![Packet::from_data(None, &tx1).unwrap()]), | ||
bundle_id: tx1.signatures[0].to_string(), | ||
}; | ||
|
||
let mut transaction_errors = TransactionErrorMetrics::default(); | ||
|
||
let sanitized_bundle0 = ImmutableDeserializedBundle::new(&mut packet_bundle0, None) | ||
.unwrap() | ||
.build_sanitized_bundle(&bank, &HashSet::default(), &mut transaction_errors) | ||
.expect("sanitize bundle 0"); | ||
let sanitized_bundle1 = ImmutableDeserializedBundle::new(&mut packet_bundle1, None) | ||
.unwrap() | ||
.build_sanitized_bundle(&bank, &HashSet::default(), &mut transaction_errors) | ||
.expect("sanitize bundle 1"); | ||
|
||
let locked_bundle0 = bundle_account_locker | ||
.prepare_locked_bundle(&sanitized_bundle0, &bank) | ||
.unwrap(); | ||
|
||
assert_eq!( | ||
bundle_account_locker.write_locks(), | ||
HashSet::from_iter([mint_keypair.pubkey(), kp0.pubkey()]) | ||
); | ||
assert_eq!( | ||
bundle_account_locker.read_locks(), | ||
HashSet::from_iter([system_program::id()]) | ||
); | ||
|
||
let locked_bundle1 = bundle_account_locker | ||
.prepare_locked_bundle(&sanitized_bundle1, &bank) | ||
.unwrap(); | ||
assert_eq!( | ||
bundle_account_locker.write_locks(), | ||
HashSet::from_iter([mint_keypair.pubkey(), kp0.pubkey(), kp1.pubkey()]) | ||
); | ||
assert_eq!( | ||
bundle_account_locker.read_locks(), | ||
HashSet::from_iter([system_program::id()]) | ||
); | ||
|
||
drop(locked_bundle0); | ||
assert_eq!( | ||
bundle_account_locker.write_locks(), | ||
HashSet::from_iter([mint_keypair.pubkey(), kp1.pubkey()]) | ||
); | ||
assert_eq!( | ||
bundle_account_locker.read_locks(), | ||
HashSet::from_iter([system_program::id()]) | ||
); | ||
|
||
drop(locked_bundle1); | ||
assert!(bundle_account_locker.write_locks().is_empty()); | ||
assert!(bundle_account_locker.read_locks().is_empty()); | ||
} | ||
} |
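The accessor doc comments above say BankingStage consults these lock sets when constructing a TransactionBatch. A hedged sketch of what that consumer-side check could look like (hypothetical helper, not code from this diff; assumes `BundleAccountLocker` is in scope from this module):

use {solana_sdk::pubkey::Pubkey, std::collections::HashSet};

// Hypothetical pre-batching check: a write conflicts with any bundle lock,
// while a read conflicts only with bundle write locks.
fn conflicts_with_bundles(
    locker: &BundleAccountLocker,
    tx_writes: &[Pubkey],
    tx_reads: &[Pubkey],
) -> bool {
    let bundle_reads: HashSet<Pubkey> = locker.read_locks();
    let bundle_writes: HashSet<Pubkey> = locker.write_locks();
    tx_writes
        .iter()
        .any(|a| bundle_reads.contains(a) || bundle_writes.contains(a))
        || tx_reads.iter().any(|a| bundle_writes.contains(a))
}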
@@ -0,0 +1,271 @@ | ||
//! Deserializes PacketBundles | ||
use { | ||
crate::{ | ||
immutable_deserialized_bundle::{DeserializedBundleError, ImmutableDeserializedBundle}, | ||
packet_bundle::PacketBundle, | ||
}, | ||
crossbeam_channel::{Receiver, RecvTimeoutError}, | ||
solana_runtime::bank_forks::BankForks, | ||
solana_sdk::saturating_add_assign, | ||
std::{ | ||
sync::{Arc, RwLock}, | ||
time::{Duration, Instant}, | ||
}, | ||
}; | ||
|
||
/// Results from deserializing packet batches. | ||
#[derive(Debug)] | ||
pub struct ReceiveBundleResults { | ||
/// Deserialized bundles from all received bundle packets | ||
pub deserialized_bundles: Vec<ImmutableDeserializedBundle>, | ||
/// Number of dropped bundles | ||
pub num_dropped_bundles: usize, | ||
} | ||
|
||
pub struct BundlePacketDeserializer { | ||
/// Receiver for bundle packets | ||
bundle_packet_receiver: Receiver<Vec<PacketBundle>>, | ||
/// Provides working bank for deserializer to check feature activation | ||
bank_forks: Arc<RwLock<BankForks>>, | ||
/// Max packets per bundle | ||
max_packets_per_bundle: Option<usize>, | ||
} | ||
|
||
impl BundlePacketDeserializer { | ||
pub fn new( | ||
bundle_packet_receiver: Receiver<Vec<PacketBundle>>, | ||
bank_forks: Arc<RwLock<BankForks>>, | ||
max_packets_per_bundle: Option<usize>, | ||
) -> Self { | ||
Self { | ||
bundle_packet_receiver, | ||
bank_forks, | ||
max_packets_per_bundle, | ||
} | ||
} | ||
|
||
/// Handles receiving bundles and deserializing them | ||
pub fn receive_bundles( | ||
&self, | ||
recv_timeout: Duration, | ||
capacity: usize, | ||
) -> Result<ReceiveBundleResults, RecvTimeoutError> { | ||
let (bundle_count, _packet_count, mut bundles) = | ||
self.receive_until(recv_timeout, capacity)?; | ||
|
||
// Note: this can be removed after feature `round_compute_unit_price` is activated in | ||
// mainnet-beta | ||
let _working_bank = self.bank_forks.read().unwrap().working_bank(); | ||
let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set | ||
|
||
Ok(Self::deserialize_and_collect_bundles( | ||
bundle_count, | ||
&mut bundles, | ||
round_compute_unit_price_enabled, | ||
self.max_packets_per_bundle, | ||
)) | ||
} | ||
|
||
/// Deserializes packet batches and collects | ||
/// them into ReceiveBundleResults | ||
fn deserialize_and_collect_bundles( | ||
bundle_count: usize, | ||
bundles: &mut [PacketBundle], | ||
round_compute_unit_price_enabled: bool, | ||
max_packets_per_bundle: Option<usize>, | ||
) -> ReceiveBundleResults { | ||
let mut deserialized_bundles = Vec::with_capacity(bundle_count); | ||
let mut num_dropped_bundles: usize = 0; | ||
|
||
for bundle in bundles.iter_mut() { | ||
match Self::deserialize_bundle( | ||
bundle, | ||
round_compute_unit_price_enabled, | ||
max_packets_per_bundle, | ||
) { | ||
Ok(deserialized_bundle) => { | ||
deserialized_bundles.push(deserialized_bundle); | ||
} | ||
Err(_) => { | ||
saturating_add_assign!(num_dropped_bundles, 1); | ||
} | ||
} | ||
} | ||
|
||
ReceiveBundleResults { | ||
deserialized_bundles, | ||
num_dropped_bundles, | ||
} | ||
} | ||
|
||
/// Receives bundle packets | ||
fn receive_until( | ||
&self, | ||
recv_timeout: Duration, | ||
bundle_count_upperbound: usize, | ||
) -> Result<(usize, usize, Vec<PacketBundle>), RecvTimeoutError> { | ||
let start = Instant::now(); | ||
|
||
let mut bundles = self.bundle_packet_receiver.recv_timeout(recv_timeout)?; | ||
let mut num_packets_received: usize = bundles.iter().map(|pb| pb.batch.len()).sum(); | ||
let mut num_bundles_received: usize = bundles.len(); | ||
|
||
if num_bundles_received <= bundle_count_upperbound { | ||
while let Ok(bundle_packets) = self.bundle_packet_receiver.try_recv() { | ||
trace!("got more packet batches in bundle packet deserializer"); | ||
|
||
saturating_add_assign!( | ||
num_packets_received, | ||
bundle_packets | ||
.iter() | ||
.map(|pb| pb.batch.len()) | ||
.sum::<usize>() | ||
); | ||
saturating_add_assign!(num_bundles_received, bundle_packets.len()); | ||
|
||
bundles.extend(bundle_packets); | ||
|
||
if start.elapsed() >= recv_timeout | ||
|| num_bundles_received >= bundle_count_upperbound | ||
{ | ||
break; | ||
} | ||
} | ||
} | ||
|
||
Ok((num_bundles_received, num_packets_received, bundles)) | ||
} | ||
|
||
/// Deserializes the PacketBundle into an ImmutableDeserializedBundle, returning an error if any | ||
/// packet in the bundle fails to deserialize | ||
pub fn deserialize_bundle( | ||
bundle: &mut PacketBundle, | ||
round_compute_unit_price_enabled: bool, | ||
max_packets_per_bundle: Option<usize>, | ||
) -> Result<ImmutableDeserializedBundle, DeserializedBundleError> { | ||
bundle.batch.iter_mut().for_each(|p| { | ||
p.meta_mut() | ||
.set_round_compute_unit_price(round_compute_unit_price_enabled); | ||
}); | ||
|
||
ImmutableDeserializedBundle::new(bundle, max_packets_per_bundle) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use { | ||
super::*, | ||
crossbeam_channel::unbounded, | ||
solana_ledger::genesis_utils::create_genesis_config, | ||
solana_perf::packet::PacketBatch, | ||
solana_runtime::{bank::Bank, genesis_utils::GenesisConfigInfo}, | ||
solana_sdk::{packet::Packet, signature::Signer, system_transaction::transfer}, | ||
}; | ||
|
||
#[test] | ||
fn test_deserialize_and_collect_bundles_empty() { | ||
let results = | ||
BundlePacketDeserializer::deserialize_and_collect_bundles(0, &mut [], false, Some(5)); | ||
assert_eq!(results.deserialized_bundles.len(), 0); | ||
assert_eq!(results.num_dropped_bundles, 0); | ||
} | ||
|
||
#[test] | ||
fn test_receive_bundles_capacity() { | ||
solana_logger::setup(); | ||
|
||
let GenesisConfigInfo { | ||
genesis_config, | ||
mint_keypair, | ||
.. | ||
} = create_genesis_config(10_000); | ||
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); | ||
|
||
let (sender, receiver) = unbounded(); | ||
|
||
let deserializer = BundlePacketDeserializer::new(receiver, bank_forks, Some(10)); | ||
|
||
let packet_bundles: Vec<_> = (0..10) | ||
.map(|_| PacketBundle { | ||
batch: PacketBatch::new(vec![Packet::from_data( | ||
None, | ||
transfer( | ||
&mint_keypair, | ||
&mint_keypair.pubkey(), | ||
100, | ||
genesis_config.hash(), | ||
), | ||
) | ||
.unwrap()]), | ||
bundle_id: String::default(), | ||
}) | ||
.collect(); | ||
|
||
sender.send(packet_bundles.clone()).unwrap(); | ||
|
||
let bundles = deserializer | ||
.receive_bundles(Duration::from_millis(100), 5) | ||
.unwrap(); | ||
// all 10 bundles arrive in one channel message, so the capacity of 5 is exceeded by the single batch | ||
assert_eq!(bundles.deserialized_bundles.len(), 10); | ||
assert_eq!(bundles.num_dropped_bundles, 0); | ||
|
||
// make sure empty | ||
assert_matches!( | ||
deserializer.receive_bundles(Duration::from_millis(100), 5), | ||
Err(RecvTimeoutError::Timeout) | ||
); | ||
|
||
// send 2x 10 size batches. capacity is 5, but will return 10 since that's the batch size | ||
sender.send(packet_bundles.clone()).unwrap(); | ||
sender.send(packet_bundles).unwrap(); | ||
let bundles = deserializer | ||
.receive_bundles(Duration::from_millis(100), 5) | ||
.unwrap(); | ||
assert_eq!(bundles.deserialized_bundles.len(), 10); | ||
assert_eq!(bundles.num_dropped_bundles, 0); | ||
|
||
let bundles = deserializer | ||
.receive_bundles(Duration::from_millis(100), 5) | ||
.unwrap(); | ||
assert_eq!(bundles.deserialized_bundles.len(), 10); | ||
assert_eq!(bundles.num_dropped_bundles, 0); | ||
|
||
assert_matches!( | ||
deserializer.receive_bundles(Duration::from_millis(100), 5), | ||
Err(RecvTimeoutError::Timeout) | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_receive_bundles_bad_bundles() { | ||
solana_logger::setup(); | ||
|
||
let GenesisConfigInfo { | ||
genesis_config, | ||
mint_keypair: _, | ||
.. | ||
} = create_genesis_config(10_000); | ||
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); | ||
|
||
let (sender, receiver) = unbounded(); | ||
|
||
let deserializer = BundlePacketDeserializer::new(receiver, bank_forks, Some(10)); | ||
|
||
let packet_bundles: Vec<_> = (0..10) | ||
.map(|_| PacketBundle { | ||
batch: PacketBatch::new(vec![]), | ||
bundle_id: String::default(), | ||
}) | ||
.collect(); | ||
sender.send(packet_bundles).unwrap(); | ||
|
||
let bundles = deserializer | ||
.receive_bundles(Duration::from_millis(100), 5) | ||
.unwrap(); | ||
// sent as one channel message; every empty bundle fails deserialization and is counted as dropped | ||
assert_eq!(bundles.deserialized_bundles.len(), 0); | ||
assert_eq!(bundles.num_dropped_bundles, 10); | ||
} | ||
} |
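As the capacity test above shows, `receive_bundles` drains whole channel messages, so the capacity argument is an upper bound checked between messages rather than a hard cap on returned bundles. A hedged sketch of a stage loop driving the deserializer (hypothetical driver; the timeout and capacity values are illustrative):

use {crossbeam_channel::RecvTimeoutError, std::time::Duration};

fn run(deserializer: &BundlePacketDeserializer) {
    loop {
        match deserializer.receive_bundles(Duration::from_millis(100), 64) {
            // hand deserialized_bundles to the bundle stage and feed
            // num_dropped_bundles into metrics
            Ok(_results) => {}
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}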
@@ -0,0 +1,237 @@ | ||
use {solana_runtime::bank::Bank, solana_sdk::clock::Slot, std::sync::Arc}; | ||
|
||
/// Manager responsible for reserving `bundle_reserved_cost` during the first `reserved_ticks` of a bank | ||
/// and resetting the block cost limit to `block_cost_limit` after the reserved tick period is over | ||
pub struct BundleReservedSpaceManager { | ||
// the bank's cost limit | ||
block_cost_limit: u64, | ||
// bundles get this much reserved space for the first reserved_ticks | ||
bundle_reserved_cost: u64, | ||
// a reduced block_compute_limit is reserved for this many ticks, afterwards it goes back to full cost | ||
reserved_ticks: u64, | ||
last_slot_updated: Slot, | ||
} | ||
|
||
impl BundleReservedSpaceManager { | ||
pub fn new(block_cost_limit: u64, bundle_reserved_cost: u64, reserved_ticks: u64) -> Self { | ||
Self { | ||
block_cost_limit, | ||
bundle_reserved_cost, | ||
reserved_ticks, | ||
last_slot_updated: u64::MAX, | ||
} | ||
} | ||
|
||
/// Call this on creation of new bank and periodically while bundle processing | ||
/// to manage the block_cost_limits | ||
pub fn tick(&mut self, bank: &Arc<Bank>) { | ||
if self.last_slot_updated == bank.slot() && !self.is_in_reserved_tick_period(bank) { | ||
// new slot logic already ran, need to revert the block cost limit to original if | ||
// ticks are past the reserved tick mark | ||
debug!( | ||
"slot: {} ticks: {}, resetting block_cost_limit to {}", | ||
bank.slot(), | ||
bank.tick_height(), | ||
self.block_cost_limit | ||
); | ||
bank.write_cost_tracker() | ||
.unwrap() | ||
.set_block_cost_limit(self.block_cost_limit); | ||
} else if self.last_slot_updated != bank.slot() && self.is_in_reserved_tick_period(bank) { | ||
// new slot, if in the first max_tick - tick_height slots reserve space | ||
// otherwise can leave the current block limit as is | ||
let new_block_cost_limit = self.reduced_block_cost_limit(); | ||
debug!( | ||
"slot: {} ticks: {}, reserving block_cost_limit with block_cost_limit of {}", | ||
bank.slot(), | ||
bank.tick_height(), | ||
new_block_cost_limit | ||
); | ||
bank.write_cost_tracker() | ||
.unwrap() | ||
.set_block_cost_limit(new_block_cost_limit); | ||
self.last_slot_updated = bank.slot(); | ||
} | ||
} | ||
|
||
/// return true if the bank is still in the period where block_cost_limits is reduced | ||
pub fn is_in_reserved_tick_period(&self, bank: &Bank) -> bool { | ||
bank.tick_height() % bank.ticks_per_slot() < self.reserved_ticks | ||
} | ||
|
||
/// return the block_cost_limits as determined by the tick height of the bank | ||
pub fn expected_block_cost_limits(&self, bank: &Bank) -> u64 { | ||
if self.is_in_reserved_tick_period(bank) { | ||
self.reduced_block_cost_limit() | ||
} else { | ||
self.block_cost_limit() | ||
} | ||
} | ||
|
||
pub fn reduced_block_cost_limit(&self) -> u64 { | ||
self.block_cost_limit | ||
.saturating_sub(self.bundle_reserved_cost) | ||
} | ||
|
||
pub fn block_cost_limit(&self) -> u64 { | ||
self.block_cost_limit | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use { | ||
crate::bundle_stage::bundle_reserved_space_manager::BundleReservedSpaceManager, | ||
solana_ledger::genesis_utils::create_genesis_config, solana_runtime::bank::Bank, | ||
solana_sdk::pubkey::Pubkey, std::sync::Arc, | ||
}; | ||
|
||
#[test] | ||
fn test_reserve_block_cost_limits_during_reserved_ticks() { | ||
const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; | ||
|
||
let genesis_config_info = create_genesis_config(100); | ||
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); | ||
|
||
let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); | ||
|
||
let mut reserved_space = BundleReservedSpaceManager::new( | ||
block_cost_limits, | ||
BUNDLE_BLOCK_COST_LIMITS_RESERVATION, | ||
5, | ||
); | ||
reserved_space.tick(&bank); | ||
|
||
assert_eq!( | ||
bank.read_cost_tracker().unwrap().block_cost_limit(), | ||
block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_dont_reserve_block_cost_limits_after_reserved_ticks() { | ||
const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; | ||
|
||
let genesis_config_info = create_genesis_config(100); | ||
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); | ||
|
||
let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); | ||
|
||
for _ in 0..5 { | ||
bank.register_default_tick_for_test(); | ||
} | ||
|
||
let mut reserved_space = BundleReservedSpaceManager::new( | ||
block_cost_limits, | ||
BUNDLE_BLOCK_COST_LIMITS_RESERVATION, | ||
5, | ||
); | ||
reserved_space.tick(&bank); | ||
|
||
assert_eq!( | ||
bank.read_cost_tracker().unwrap().block_cost_limit(), | ||
block_cost_limits | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_dont_reset_block_cost_limits_during_reserved_ticks() { | ||
const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; | ||
|
||
let genesis_config_info = create_genesis_config(100); | ||
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); | ||
|
||
let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); | ||
|
||
let mut reserved_space = BundleReservedSpaceManager::new( | ||
block_cost_limits, | ||
BUNDLE_BLOCK_COST_LIMITS_RESERVATION, | ||
5, | ||
); | ||
|
||
reserved_space.tick(&bank); | ||
bank.register_default_tick_for_test(); | ||
reserved_space.tick(&bank); | ||
|
||
assert_eq!( | ||
bank.read_cost_tracker().unwrap().block_cost_limit(), | ||
block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_reset_block_cost_limits_after_reserved_ticks() { | ||
const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; | ||
|
||
let genesis_config_info = create_genesis_config(100); | ||
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); | ||
|
||
let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); | ||
|
||
let mut reserved_space = BundleReservedSpaceManager::new( | ||
block_cost_limits, | ||
BUNDLE_BLOCK_COST_LIMITS_RESERVATION, | ||
5, | ||
); | ||
|
||
reserved_space.tick(&bank); | ||
|
||
for _ in 0..5 { | ||
bank.register_default_tick_for_test(); | ||
} | ||
reserved_space.tick(&bank); | ||
|
||
assert_eq!( | ||
bank.read_cost_tracker().unwrap().block_cost_limit(), | ||
block_cost_limits | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_block_limits_after_first_slot() { | ||
const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; | ||
const RESERVED_TICKS: u64 = 5; | ||
let genesis_config_info = create_genesis_config(100); | ||
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); | ||
|
||
for _ in 0..genesis_config_info.genesis_config.ticks_per_slot { | ||
bank.register_default_tick_for_test(); | ||
} | ||
assert!(bank.is_complete()); | ||
bank.freeze(); | ||
assert_eq!( | ||
bank.read_cost_tracker().unwrap().block_cost_limit(), | ||
solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, | ||
); | ||
|
||
let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &Pubkey::default(), 1)); | ||
assert_eq!(bank1.slot(), 1); | ||
assert_eq!(bank1.tick_height(), 64); | ||
assert_eq!(bank1.max_tick_height(), 128); | ||
|
||
// reserve space | ||
let block_cost_limits = bank1.read_cost_tracker().unwrap().block_cost_limit(); | ||
let mut reserved_space = BundleReservedSpaceManager::new( | ||
block_cost_limits, | ||
BUNDLE_BLOCK_COST_LIMITS_RESERVATION, | ||
RESERVED_TICKS, | ||
); | ||
reserved_space.tick(&bank1); | ||
|
||
// wait for reservation to be over | ||
(0..RESERVED_TICKS).for_each(|_| { | ||
bank1.register_default_tick_for_test(); | ||
assert_eq!( | ||
bank1.read_cost_tracker().unwrap().block_cost_limit(), | ||
block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION | ||
); | ||
}); | ||
reserved_space.tick(&bank1); | ||
|
||
// after reservation, revert back to normal limit | ||
assert_eq!( | ||
bank1.read_cost_tracker().unwrap().block_cost_limit(), | ||
solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, | ||
); | ||
} | ||
} |
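A small worked example of the reservation arithmetic implemented above (illustrative numbers; real limits come from the bank's cost tracker, and this assumes `BundleReservedSpaceManager` is in scope):

fn main() {
    // 48M CU block limit, 3M CUs reserved for bundles over the first 25 ticks:
    // during the reserved ticks the rest of the block sees the reduced limit.
    let manager = BundleReservedSpaceManager::new(48_000_000, 3_000_000, 25);
    assert_eq!(manager.reduced_block_cost_limit(), 45_000_000);
    assert_eq!(manager.block_cost_limit(), 48_000_000);
}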
@@ -0,0 +1,227 @@ | ||
use { | ||
crate::banking_stage::{ | ||
committer::CommitTransactionDetails, | ||
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, | ||
}, | ||
solana_bundle::bundle_execution::LoadAndExecuteBundleOutput, | ||
solana_ledger::blockstore_processor::TransactionStatusSender, | ||
solana_measure::measure_us, | ||
solana_runtime::{ | ||
bank::{Bank, ExecutedTransactionCounts, TransactionBalances, TransactionBalancesSet}, | ||
bank_utils, | ||
prioritization_fee_cache::PrioritizationFeeCache, | ||
vote_sender_types::ReplayVoteSender, | ||
}, | ||
solana_sdk::{hash::Hash, saturating_add_assign, transaction::SanitizedTransaction}, | ||
solana_svm::transaction_results::TransactionResults, | ||
solana_transaction_status::{ | ||
token_balances::{TransactionTokenBalances, TransactionTokenBalancesSet}, | ||
PreBalanceInfo, | ||
}, | ||
std::sync::Arc, | ||
}; | ||
|
||
#[derive(Clone, Debug, Default, PartialEq, Eq)] | ||
pub struct CommitBundleDetails { | ||
pub commit_transaction_details: Vec<Vec<CommitTransactionDetails>>, | ||
} | ||
|
||
pub struct Committer { | ||
transaction_status_sender: Option<TransactionStatusSender>, | ||
replay_vote_sender: ReplayVoteSender, | ||
prioritization_fee_cache: Arc<PrioritizationFeeCache>, | ||
} | ||
|
||
impl Committer { | ||
pub fn new( | ||
transaction_status_sender: Option<TransactionStatusSender>, | ||
replay_vote_sender: ReplayVoteSender, | ||
prioritization_fee_cache: Arc<PrioritizationFeeCache>, | ||
) -> Self { | ||
Self { | ||
transaction_status_sender, | ||
replay_vote_sender, | ||
prioritization_fee_cache, | ||
} | ||
} | ||
|
||
pub(crate) fn transaction_status_sender_enabled(&self) -> bool { | ||
self.transaction_status_sender.is_some() | ||
} | ||
|
||
/// Very similar to Committer::commit_transactions, but works with bundles. | ||
/// The main difference is that there are multiple non-parallelizable transaction vectors to commit, | ||
/// and post-balances are collected after execution instead of read from the bank in Self::collect_balances_and_send_status_batch. | ||
#[allow(clippy::too_many_arguments)] | ||
pub(crate) fn commit_bundle<'a>( | ||
&self, | ||
bundle_execution_output: &'a mut LoadAndExecuteBundleOutput<'a>, | ||
last_blockhash: Hash, | ||
lamports_per_signature: u64, | ||
mut starting_transaction_index: Option<usize>, | ||
bank: &Arc<Bank>, | ||
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, | ||
) -> (u64, CommitBundleDetails) { | ||
let transaction_output = bundle_execution_output.bundle_transaction_results_mut(); | ||
|
||
let (commit_transaction_details, commit_times): (Vec<_>, Vec<_>) = transaction_output | ||
.iter_mut() | ||
.map(|bundle_results| { | ||
let executed_transactions_count = bundle_results | ||
.load_and_execute_transactions_output() | ||
.executed_transactions_count | ||
as u64; | ||
|
||
let executed_non_vote_transactions_count = bundle_results | ||
.load_and_execute_transactions_output() | ||
.executed_non_vote_transactions_count | ||
as u64; | ||
|
||
let executed_with_failure_result_count = bundle_results | ||
.load_and_execute_transactions_output() | ||
.executed_transactions_count | ||
.saturating_sub( | ||
bundle_results | ||
.load_and_execute_transactions_output() | ||
.executed_with_successful_result_count, | ||
) as u64; | ||
|
||
let signature_count = bundle_results | ||
.load_and_execute_transactions_output() | ||
.signature_count; | ||
|
||
let sanitized_transactions = bundle_results.transactions().to_vec(); | ||
let execution_results = bundle_results.execution_results().to_vec(); | ||
|
||
let loaded_transactions = bundle_results.loaded_transactions_mut(); | ||
debug!("loaded_transactions: {:?}", loaded_transactions); | ||
|
||
let (tx_results, commit_time_us) = measure_us!(bank.commit_transactions( | ||
&sanitized_transactions, | ||
loaded_transactions, | ||
execution_results, | ||
last_blockhash, | ||
lamports_per_signature, | ||
ExecutedTransactionCounts { | ||
executed_transactions_count, | ||
executed_non_vote_transactions_count, | ||
executed_with_failure_result_count, | ||
signature_count, | ||
}, | ||
&mut execute_and_commit_timings.execute_timings, | ||
)); | ||
|
||
let commit_transaction_statuses: Vec<_> = tx_results | ||
.execution_results | ||
.iter() | ||
.zip(tx_results.loaded_accounts_stats.iter()) | ||
.map(|(execution_result, loaded_accounts_stats)| { | ||
match execution_result.details() { | ||
// reports actual execution CUs and actual loaded accounts size for each | ||
// transaction committed to the block. qos_service uses this information to adjust | ||
// the reserved block space. | ||
Some(details) => CommitTransactionDetails::Committed { | ||
compute_units: details.executed_units, | ||
loaded_accounts_data_size: loaded_accounts_stats | ||
.as_ref() | ||
.map_or(0, |stats| stats.loaded_accounts_data_size), | ||
}, | ||
None => CommitTransactionDetails::NotCommitted, | ||
} | ||
}) | ||
.collect(); | ||
|
||
let ((), find_and_send_votes_us) = measure_us!({ | ||
bank_utils::find_and_send_votes( | ||
&sanitized_transactions, | ||
&tx_results, | ||
Some(&self.replay_vote_sender), | ||
); | ||
|
||
let post_balance_info = bundle_results.post_balance_info().clone(); | ||
let pre_balance_info = bundle_results.pre_balance_info(); | ||
|
||
let num_committed = tx_results | ||
.execution_results | ||
.iter() | ||
.filter(|r| r.was_executed()) | ||
.count(); | ||
|
||
self.collect_balances_and_send_status_batch( | ||
tx_results, | ||
bank, | ||
sanitized_transactions, | ||
pre_balance_info, | ||
post_balance_info, | ||
starting_transaction_index, | ||
); | ||
|
||
// NOTE: we're doing batched records, so we need to increment the poh starting_transaction_index | ||
// by number committed so the next batch will have the correct starting_transaction_index | ||
starting_transaction_index = | ||
starting_transaction_index.map(|starting_transaction_index| { | ||
starting_transaction_index.saturating_add(num_committed) | ||
}); | ||
|
||
self.prioritization_fee_cache | ||
.update(bank, bundle_results.executed_transactions().into_iter()); | ||
}); | ||
saturating_add_assign!( | ||
execute_and_commit_timings.find_and_send_votes_us, | ||
find_and_send_votes_us | ||
); | ||
|
||
(commit_transaction_statuses, commit_time_us) | ||
}) | ||
.unzip(); | ||
|
||
( | ||
commit_times.iter().sum(), | ||
CommitBundleDetails { | ||
commit_transaction_details, | ||
}, | ||
) | ||
} | ||
|
||
fn collect_balances_and_send_status_batch( | ||
&self, | ||
tx_results: TransactionResults, | ||
bank: &Arc<Bank>, | ||
sanitized_transactions: Vec<SanitizedTransaction>, | ||
pre_balance_info: &mut PreBalanceInfo, | ||
(post_balances, post_token_balances): (TransactionBalances, TransactionTokenBalances), | ||
starting_transaction_index: Option<usize>, | ||
) { | ||
if let Some(transaction_status_sender) = &self.transaction_status_sender { | ||
let mut transaction_index = starting_transaction_index.unwrap_or_default(); | ||
let batch_transaction_indexes: Vec<_> = tx_results | ||
.execution_results | ||
.iter() | ||
.map(|result| { | ||
if result.was_executed() { | ||
let this_transaction_index = transaction_index; | ||
saturating_add_assign!(transaction_index, 1); | ||
this_transaction_index | ||
} else { | ||
0 | ||
} | ||
}) | ||
.collect(); | ||
transaction_status_sender.send_transaction_status_batch( | ||
bank.clone(), | ||
sanitized_transactions, | ||
tx_results.execution_results, | ||
TransactionBalancesSet::new( | ||
std::mem::take(&mut pre_balance_info.native), | ||
post_balances, | ||
), | ||
TransactionTokenBalancesSet::new( | ||
std::mem::take(&mut pre_balance_info.token), | ||
post_token_balances, | ||
), | ||
tx_results.rent_debits, | ||
batch_transaction_indexes, | ||
); | ||
} | ||
} | ||
} |
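A quick worked example of the batched-record bookkeeping in the NOTE above (the numbers are illustrative, not taken from this diff): if a bundle commits three transactions starting at index 10, those transactions occupy block indexes 10, 11, and 12, and the next bundle must start at 13.

    // Sketch of the starting_transaction_index arithmetic; values are hypothetical.
    let starting_transaction_index: Option<usize> = Some(10);
    let num_committed = 3; // transactions in this bundle that were actually executed
    let next = starting_transaction_index.map(|i| i.saturating_add(num_committed));
    assert_eq!(next, Some(13)); // the following bundle records from index 13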
@@ -0,0 +1,41 @@
use {
    crate::{
        bundle_stage::bundle_account_locker::BundleAccountLockerError, tip_manager::TipPaymentError,
    },
    anchor_lang::error::Error,
    solana_bundle::bundle_execution::LoadAndExecuteBundleError,
    solana_poh::poh_recorder::PohRecorderError,
    thiserror::Error,
};

pub type BundleExecutionResult<T> = Result<T, BundleExecutionError>;

#[derive(Error, Debug, Clone)]
pub enum BundleExecutionError {
    #[error("PoH record error: {0}")]
    PohRecordError(#[from] PohRecorderError),

    #[error("Bank is done processing")]
    BankProcessingDone,

    #[error("Execution error: {0}")]
    ExecutionError(#[from] LoadAndExecuteBundleError),

    #[error("The bundle exceeds the cost model")]
    ExceedsCostModel,

    #[error("Tip error {0}")]
    TipError(#[from] TipPaymentError),

    #[error("Error locking bundle")]
    LockError(#[from] BundleAccountLockerError),
}

impl From<anchor_lang::error::Error> for TipPaymentError {
    fn from(anchor_err: Error) -> Self {
        match anchor_err {
            Error::AnchorError(e) => Self::AnchorError(e.error_msg),
            Error::ProgramError(e) => Self::AnchorError(e.to_string()),
        }
    }
}
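For orientation, a caller might consume BundleExecutionResult along these lines; the match arms and logging policy are assumptions for illustration, not code from this commit:

    // Hypothetical call-site; only the error type itself comes from the diff above.
    fn log_bundle_outcome(result: BundleExecutionResult<()>) {
        match result {
            Ok(()) => (),
            // Exceeding the cost model is not an execution failure; the bundle can
            // be retried in a later slot.
            Err(BundleExecutionError::ExceedsCostModel) => warn!("bundle exceeds cost model"),
            Err(e) => warn!("bundle failed: {e}"),
        }
    }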
@@ -0,0 +1,52 @@
use {
    solana_runtime::bank::Bank,
    solana_sdk::{clock::Epoch, pubkey::Pubkey},
    std::collections::HashSet,
};

#[derive(Default)]
pub(crate) struct ConsensusCacheUpdater {
    last_epoch_updated: Epoch,
    consensus_accounts_cache: HashSet<Pubkey>,
}

impl ConsensusCacheUpdater {
    pub(crate) fn consensus_accounts_cache(&self) -> &HashSet<Pubkey> {
        &self.consensus_accounts_cache
    }

    /// Builds a HashSet of all consensus-related accounts for the Bank's epoch
    fn get_consensus_accounts(bank: &Bank) -> HashSet<Pubkey> {
        let mut consensus_accounts: HashSet<Pubkey> = HashSet::new();
        if let Some(epoch_stakes) = bank.epoch_stakes(bank.epoch()) {
            // votes use the following accounts:
            // - vote_account pubkey: writeable
            // - authorized_voter_pubkey: read-only
            // - node_keypair pubkey: payer (writeable)
            let node_id_vote_accounts = epoch_stakes.node_id_to_vote_accounts();

            let vote_accounts = node_id_vote_accounts
                .values()
                .flat_map(|v| v.vote_accounts.clone());

            // vote_account
            consensus_accounts.extend(vote_accounts);
            // authorized_voter_pubkey
            consensus_accounts.extend(epoch_stakes.epoch_authorized_voters().keys());
            // node_keypair
            consensus_accounts.extend(epoch_stakes.node_id_to_vote_accounts().keys());
        }
        consensus_accounts
    }

    /// Updates consensus-related accounts on epoch boundaries
    pub(crate) fn maybe_update(&mut self, bank: &Bank) -> bool {
        if bank.epoch() > self.last_epoch_updated {
            self.consensus_accounts_cache = Self::get_consensus_accounts(bank);
            self.last_epoch_updated = bank.epoch();
            true
        } else {
            false
        }
    }
}
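A minimal call-site sketch for the updater, assuming the caller holds a Bank and a transaction's account keys (bank, account_keys, and the surrounding flow are hypothetical):

    let mut updater = ConsensusCacheUpdater::default();
    // Called once per bank; the cache is rebuilt only when the epoch advances.
    if updater.maybe_update(&bank) {
        info!("consensus accounts cache refreshed for epoch {}", bank.epoch());
    }
    let touches_consensus = account_keys
        .iter()
        .any(|key| updater.consensus_accounts_cache().contains(key));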
Large diffs are not rendered by default.
@@ -0,0 +1,7 @@
use solana_perf::packet::PacketBatch;

#[derive(Clone, Debug)]
pub struct PacketBundle {
    pub batch: PacketBatch,
    pub bundle_id: String,
}
@@ -0,0 +1,185 @@
use {
    crate::proxy::ProxyError,
    chrono::Utc,
    jito_protos::proto::auth::{
        auth_service_client::AuthServiceClient, GenerateAuthChallengeRequest,
        GenerateAuthTokensRequest, RefreshAccessTokenRequest, Role, Token,
    },
    solana_gossip::cluster_info::ClusterInfo,
    solana_sdk::signature::{Keypair, Signer},
    std::{
        sync::{Arc, Mutex},
        time::Duration,
    },
    tokio::time::timeout,
    tonic::{service::Interceptor, transport::Channel, Code, Request, Status},
};

/// Interceptor responsible for adding the access token to request headers.
pub(crate) struct AuthInterceptor {
    /// The token added to each request header.
    access_token: Arc<Mutex<Token>>,
}

impl AuthInterceptor {
    pub(crate) fn new(access_token: Arc<Mutex<Token>>) -> Self {
        Self { access_token }
    }
}

impl Interceptor for AuthInterceptor {
    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
        request.metadata_mut().insert(
            "authorization",
            format!("Bearer {}", self.access_token.lock().unwrap().value)
                .parse()
                .unwrap(),
        );

        Ok(request)
    }
}

/// Generates an auth challenge, then generates and returns validated auth tokens.
pub async fn generate_auth_tokens(
    auth_service_client: &mut AuthServiceClient<Channel>,
    // used to sign challenges
    keypair: &Keypair,
) -> crate::proxy::Result<(
    Token, /* access_token */
    Token, /* refresh_token */
)> {
    debug!("generate_auth_challenge");
    let challenge_response = auth_service_client
        .generate_auth_challenge(GenerateAuthChallengeRequest {
            role: Role::Validator as i32,
            pubkey: keypair.pubkey().as_ref().to_vec(),
        })
        .await
        .map_err(|e: Status| {
            if e.code() == Code::PermissionDenied {
                ProxyError::AuthenticationPermissionDenied
            } else {
                ProxyError::AuthenticationError(e.to_string())
            }
        })?;

    let formatted_challenge = format!(
        "{}-{}",
        keypair.pubkey(),
        challenge_response.into_inner().challenge
    );

    let signed_challenge = keypair
        .sign_message(formatted_challenge.as_bytes())
        .as_ref()
        .to_vec();

    debug!(
        "formatted_challenge: {} signed_challenge: {:?}",
        formatted_challenge, signed_challenge
    );

    debug!("generate_auth_tokens");
    let auth_tokens = auth_service_client
        .generate_auth_tokens(GenerateAuthTokensRequest {
            challenge: formatted_challenge,
            client_pubkey: keypair.pubkey().as_ref().to_vec(),
            signed_challenge,
        })
        .await
        .map_err(|e| ProxyError::AuthenticationError(e.to_string()))?;

    let inner = auth_tokens.into_inner();
    let access_token = get_validated_token(inner.access_token)?;
    let refresh_token = get_validated_token(inner.refresh_token)?;

    Ok((access_token, refresh_token))
}

/// Tries to refresh the access token, or runs a full re-auth if needed.
pub async fn maybe_refresh_auth_tokens(
    auth_service_client: &mut AuthServiceClient<Channel>,
    access_token: &Arc<Mutex<Token>>,
    refresh_token: &Token,
    cluster_info: &Arc<ClusterInfo>,
    connection_timeout: &Duration,
    refresh_within_s: u64,
) -> crate::proxy::Result<(
    Option<Token>, // access token
    Option<Token>, // refresh token
)> {
    let access_token_expiry: u64 = access_token
        .lock()
        .unwrap()
        .expires_at_utc
        .as_ref()
        .map(|ts| ts.seconds as u64)
        .unwrap_or_default();
    let refresh_token_expiry: u64 = refresh_token
        .expires_at_utc
        .as_ref()
        .map(|ts| ts.seconds as u64)
        .unwrap_or_default();

    let now = Utc::now().timestamp() as u64;

    let should_refresh_access =
        access_token_expiry.checked_sub(now).unwrap_or_default() <= refresh_within_s;
    let should_generate_new_tokens =
        refresh_token_expiry.checked_sub(now).unwrap_or_default() <= refresh_within_s;

    if should_generate_new_tokens {
        let kp = cluster_info.keypair().clone();

        let (new_access_token, new_refresh_token) = timeout(
            *connection_timeout,
            generate_auth_tokens(auth_service_client, kp.as_ref()),
        )
        .await
        .map_err(|_| ProxyError::MethodTimeout("generate_auth_tokens".to_string()))?
        .map_err(|e| ProxyError::MethodError(e.to_string()))?;

        return Ok((Some(new_access_token), Some(new_refresh_token)));
    } else if should_refresh_access {
        let new_access_token = timeout(
            *connection_timeout,
            refresh_access_token(auth_service_client, refresh_token),
        )
        .await
        .map_err(|_| ProxyError::MethodTimeout("refresh_access_token".to_string()))?
        .map_err(|e| ProxyError::MethodError(e.to_string()))?;

        return Ok((Some(new_access_token), None));
    }

    Ok((None, None))
}

pub async fn refresh_access_token(
    auth_service_client: &mut AuthServiceClient<Channel>,
    refresh_token: &Token,
) -> crate::proxy::Result<Token> {
    let response = auth_service_client
        .refresh_access_token(RefreshAccessTokenRequest {
            refresh_token: refresh_token.value.clone(),
        })
        .await
        .map_err(|e| ProxyError::AuthenticationError(e.to_string()))?;
    get_validated_token(response.into_inner().access_token)
}

/// Performs the necessary validations on the auth token before returning it.
/// An invalid token is one where the token itself is None or any of its fields are None;
/// a validated token is therefore safe to call .unwrap() on from the call-site.
fn get_validated_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
    let token = maybe_token
        .ok_or_else(|| ProxyError::BadAuthenticationToken("received a null token".to_string()))?;
    if token.expires_at_utc.is_none() {
        Err(ProxyError::BadAuthenticationToken(
            "expires_at_utc field is null".to_string(),
        ))
    } else {
        Ok(token)
    }
}
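Tying the pieces together: a tonic-generated client can be wrapped with AuthInterceptor via the generated with_interceptor constructor. A minimal sketch from inside an async fn in this crate, assuming keypair is the validator identity and the endpoint URL is hypothetical:

    use std::sync::{Arc, Mutex};
    use jito_protos::proto::auth::auth_service_client::AuthServiceClient;

    let channel = tonic::transport::Endpoint::from_static("https://block-engine.example:443")
        .connect()
        .await?;
    // Bootstrap tokens over a bare client first...
    let mut bootstrap_client = AuthServiceClient::new(channel.clone());
    let (access_token, _refresh_token) =
        generate_auth_tokens(&mut bootstrap_client, &keypair).await?;
    let shared_token = Arc::new(Mutex::new(access_token));
    // ...then every request on this client carries "authorization: Bearer <token>".
    let client = AuthServiceClient::with_interceptor(channel, AuthInterceptor::new(shared_token));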
Large diffs are not rendered by default.
@@ -0,0 +1,170 @@
use {
    crate::proxy::{HeartbeatEvent, ProxyError},
    crossbeam_channel::{select, tick, Receiver, Sender},
    solana_client::connection_cache::Protocol,
    solana_gossip::{cluster_info::ClusterInfo, contact_info},
    solana_perf::packet::PacketBatch,
    std::{
        net::SocketAddr,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(1500); // Empirically determined from load testing
const DISCONNECT_DELAY: Duration = Duration::from_secs(60);
const METRICS_CADENCE: Duration = Duration::from_secs(1);

/// Manages switching between the validator's TPU ports and the proxy's.
/// Switch-overs are triggered by late and missed heartbeats.
pub struct FetchStageManager {
    t_hdl: JoinHandle<()>,
}

impl FetchStageManager {
    pub fn new(
        // ClusterInfo is used to switch between advertising the proxy's TPU ports and this validator's.
        cluster_info: Arc<ClusterInfo>,
        // Channel that heartbeats are received from. Entirely responsible for triggering switch-overs.
        heartbeat_rx: Receiver<HeartbeatEvent>,
        // Channel that packets from FetchStage are intercepted from.
        packet_intercept_rx: Receiver<PacketBatch>,
        // Intercepted packets get piped through here.
        packet_tx: Sender<PacketBatch>,
        exit: Arc<AtomicBool>,
    ) -> Self {
        let t_hdl = Self::start(
            cluster_info,
            heartbeat_rx,
            packet_intercept_rx,
            packet_tx,
            exit,
        );

        Self { t_hdl }
    }

    /// Disconnect/fetch behaviour:
    /// Starts connected.
    /// When connected and a packet is received, forwards it.
    /// When disconnected, packets are dropped.
    /// When receiving a heartbeat while connected and not pending disconnect:
    ///     sets pending_disconnect to true and records the time.
    /// When receiving a heartbeat while connected and pending for > DISCONNECT_DELAY:
    ///     sets fetch_connected to false, pending_disconnect to false,
    ///     advertises the TPU ports sent in the heartbeat.
    /// When a tick is received without heartbeat_received:
    ///     sets fetch_connected to true, pending_disconnect to false,
    ///     advertises the saved contact info.
    fn start(
        cluster_info: Arc<ClusterInfo>,
        heartbeat_rx: Receiver<HeartbeatEvent>,
        packet_intercept_rx: Receiver<PacketBatch>,
        packet_tx: Sender<PacketBatch>,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        Builder::new().name("fetch-stage-manager".into()).spawn(move || {
            let my_fallback_contact_info = cluster_info.my_contact_info();

            let mut fetch_connected = true;
            let mut heartbeat_received = false;
            let mut pending_disconnect = false;

            let mut pending_disconnect_ts = Instant::now();

            let heartbeat_tick = tick(HEARTBEAT_TIMEOUT);
            let metrics_tick = tick(METRICS_CADENCE);
            let mut packets_forwarded = 0;
            let mut heartbeats_received = 0;
            loop {
                select! {
                    recv(packet_intercept_rx) -> pkt => {
                        match pkt {
                            Ok(pkt) => {
                                if fetch_connected {
                                    if packet_tx.send(pkt).is_err() {
                                        error!("{:?}", ProxyError::PacketForwardError);
                                        return;
                                    }
                                    packets_forwarded += 1;
                                }
                            }
                            Err(_) => {
                                warn!("packet intercept receiver disconnected, shutting down");
                                return;
                            }
                        }
                    }
                    recv(heartbeat_tick) -> _ => {
                        if exit.load(Ordering::Relaxed) {
                            break;
                        }
                        if !heartbeat_received && (!fetch_connected || pending_disconnect) {
                            warn!("heartbeat late, reconnecting fetch stage");
                            fetch_connected = true;
                            pending_disconnect = false;

                            // yes, using UDP here is extremely confusing for the validator
                            // since the entire network is running QUIC. However, it's correct.
                            if let Err(e) = Self::set_tpu_addresses(&cluster_info, my_fallback_contact_info.tpu(Protocol::UDP).unwrap(), my_fallback_contact_info.tpu_forwards(Protocol::UDP).unwrap()) {
                                error!("error setting tpu or tpu_fwd to ({:?}, {:?}), error: {:?}", my_fallback_contact_info.tpu(Protocol::UDP).unwrap(), my_fallback_contact_info.tpu_forwards(Protocol::UDP).unwrap(), e);
                            }
                            heartbeats_received = 0;
                        }
                        heartbeat_received = false;
                    }
                    recv(heartbeat_rx) -> tpu_info => {
                        if let Ok((tpu_addr, tpu_forward_addr)) = tpu_info {
                            heartbeats_received += 1;
                            heartbeat_received = true;
                            if fetch_connected && !pending_disconnect {
                                info!("received heartbeat while fetch stage connected, pending disconnect after delay");
                                pending_disconnect_ts = Instant::now();
                                pending_disconnect = true;
                            }
                            if fetch_connected && pending_disconnect && pending_disconnect_ts.elapsed() > DISCONNECT_DELAY {
                                info!("disconnecting fetch stage");
                                fetch_connected = false;
                                pending_disconnect = false;
                                if let Err(e) = Self::set_tpu_addresses(&cluster_info, tpu_addr, tpu_forward_addr) {
                                    error!("error setting tpu or tpu_fwd to ({:?}, {:?}), error: {:?}", tpu_addr, tpu_forward_addr, e);
                                }
                            }
                        } else {
                            warn!("relayer heartbeat receiver disconnected, shutting down");
                            return;
                        }
                    }
                    recv(metrics_tick) -> _ => {
                        datapoint_info!(
                            "relayer-heartbeat",
                            ("fetch_stage_packets_forwarded", packets_forwarded, i64),
                            ("heartbeats_received", heartbeats_received, i64),
                        );
                    }
                }
            }
        }).unwrap()
    }

    fn set_tpu_addresses(
        cluster_info: &Arc<ClusterInfo>,
        tpu_address: SocketAddr,
        tpu_forward_address: SocketAddr,
    ) -> Result<(), contact_info::Error> {
        cluster_info.set_tpu(tpu_address)?;
        cluster_info.set_tpu_forwards(tpu_forward_address)?;
        Ok(())
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_hdl.join()
    }
}
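For context, the manager sits between FetchStage's output channel and the rest of the TPU pipeline. A wiring sketch under assumed channel names (all hypothetical; cluster_info and exit would come from the validator's startup path):

    let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::unbounded::<HeartbeatEvent>();
    let (packet_intercept_tx, packet_intercept_rx) = crossbeam_channel::unbounded::<PacketBatch>();
    let (packet_tx, packet_rx) = crossbeam_channel::unbounded::<PacketBatch>();

    let manager = FetchStageManager::new(
        cluster_info.clone(), // advertises either the proxy's TPU ports or our own
        heartbeat_rx,         // fed by the relayer stage
        packet_intercept_rx,  // fed by FetchStage
        packet_tx,            // drained by sigverify/banking
        exit.clone(),
    );
    // On shutdown:
    exit.store(true, std::sync::atomic::Ordering::Relaxed);
    manager.join().expect("fetch stage manager panicked");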
@@ -0,0 +1,100 @@
//! This module contains logic for connecting to an external Relayer and Block Engine.
//! The Relayer acts as an external TPU and TPU Forward socket while the Block Engine
//! is tasked with streaming high value bundles to the validator. The validator can run
//! in one of 3 modes:
//! 1. Connected to Relayer and Block Engine.
//!     - This is the ideal mode as it increases the probability of building the most profitable blocks.
//! 2. Connected only to Relayer.
//!     - A validator may choose to run in this mode if the main concern is to offload ingress traffic deduplication and sig-verification.
//! 3. Connected only to Block Engine.
//!     - Running in this mode means pending transactions are not exposed to external actors. This mode is ideal if the validator wishes
//!       to accept bundles while maintaining some level of privacy for in-flight transactions.
mod auth;
pub mod block_engine_stage;
pub mod fetch_stage_manager;
pub mod relayer_stage;

use {
    std::{
        net::{AddrParseError, SocketAddr},
        result,
    },
    thiserror::Error,
    tonic::Status,
};

type Result<T> = result::Result<T, ProxyError>;
type HeartbeatEvent = (SocketAddr, SocketAddr);

#[derive(Error, Debug)]
pub enum ProxyError {
    #[error("grpc error: {0}")]
    GrpcError(#[from] Status),

    #[error("stream disconnected")]
    GrpcStreamDisconnected,

    #[error("heartbeat error")]
    HeartbeatChannelError,

    #[error("heartbeat expired")]
    HeartbeatExpired,

    #[error("error forwarding packet to banking stage")]
    PacketForwardError,

    #[error("missing tpu config: {0:?}")]
    MissingTpuSocket(String),

    #[error("invalid socket address: {0:?}")]
    InvalidSocketAddress(#[from] AddrParseError),

    #[error("invalid gRPC data: {0:?}")]
    InvalidData(String),

    #[error("timeout: {0:?}")]
    ConnectionError(#[from] tonic::transport::Error),

    #[error("AuthenticationConnectionTimeout")]
    AuthenticationConnectionTimeout,

    #[error("AuthenticationTimeout")]
    AuthenticationTimeout,

    #[error("AuthenticationConnectionError: {0:?}")]
    AuthenticationConnectionError(String),

    #[error("BlockEngineConnectionTimeout")]
    BlockEngineConnectionTimeout,

    #[error("BlockEngineTimeout")]
    BlockEngineTimeout,

    #[error("BlockEngineConnectionError: {0:?}")]
    BlockEngineConnectionError(String),

    #[error("RelayerConnectionTimeout")]
    RelayerConnectionTimeout,

    #[error("RelayerTimeout")]
    RelayerEngineTimeout,

    #[error("RelayerConnectionError: {0:?}")]
    RelayerConnectionError(String),

    #[error("AuthenticationError: {0:?}")]
    AuthenticationError(String),

    #[error("AuthenticationPermissionDenied")]
    AuthenticationPermissionDenied,

    #[error("BadAuthenticationToken: {0:?}")]
    BadAuthenticationToken(String),

    #[error("MethodTimeout: {0:?}")]
    MethodTimeout(String),

    #[error("MethodError: {0:?}")]
    MethodError(String),
}
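As an illustration of how a reconnect loop might classify these variants, here is a sketch; the retry policy shown is an assumption for illustration, not part of this commit:

    fn is_retryable(err: &ProxyError) -> bool {
        // Transient connectivity failures are worth retrying with backoff;
        // everything else (auth denial, bad data) needs operator attention.
        matches!(
            err,
            ProxyError::GrpcStreamDisconnected
                | ProxyError::HeartbeatExpired
                | ProxyError::AuthenticationConnectionTimeout
                | ProxyError::BlockEngineConnectionTimeout
                | ProxyError::RelayerConnectionTimeout
        )
    }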
Large diffs are not rendered by default.
Large diffs are not rendered by default.
@@ -0,0 +1,17 @@
#!/usr/bin/env sh
# Deploys the tip payment and tip distribution programs on a local validator at predetermined addresses
set -eux

WALLET_LOCATION=~/.config/solana/id.json

# build this solana binary to ensure we're using a version compatible with the validator
cargo b --release --bin solana

./target/release/solana airdrop -ul 1000 $WALLET_LOCATION

(cd jito-programs/tip-payment && anchor build)

# NOTE: make sure the declare_id! is set correctly in the programs
# Also, `|| true` ensures that if the first deploy fails, tip_payment can still be deployed
RUST_INFO=trace ./target/release/solana deploy --keypair $WALLET_LOCATION -ul ./jito-programs/tip-payment/target/deploy/tip_distribution.so ./jito-programs/tip-payment/dev/dev_tip_distribution.json || true
RUST_INFO=trace ./target/release/solana deploy --keypair $WALLET_LOCATION -ul ./jito-programs/tip-payment/target/deploy/tip_payment.so ./jito-programs/tip-payment/dev/dev_tip_payment.json
@@ -0,0 +1,48 @@
FROM rust:1.64-slim-bullseye as builder

# Add Google Protocol Buffers for Libra's metrics library.
ENV PROTOC_VERSION 3.8.0
ENV PROTOC_ZIP protoc-$PROTOC_VERSION-linux-x86_64.zip

RUN set -x \
    && apt update \
    && apt install -y \
        clang \
        cmake \
        libudev-dev \
        make \
        unzip \
        libssl-dev \
        pkg-config \
        zlib1g-dev \
        curl \
    && rustup component add rustfmt \
    && rustup component add clippy \
    && rustc --version \
    && cargo --version \
    && curl -OL https://github.com/google/protobuf/releases/download/v$PROTOC_VERSION/$PROTOC_ZIP \
    && unzip -o $PROTOC_ZIP -d /usr/local bin/protoc \
    && unzip -o $PROTOC_ZIP -d /usr/local include/* \
    && rm -f $PROTOC_ZIP

WORKDIR /solana
COPY . .
RUN mkdir -p docker-output

ARG ci_commit
# NOTE: Keep this here before the build since the variable is referenced during the CI build step.
ENV CI_COMMIT=$ci_commit

ARG debug

# Uses docker buildkit to cache the image.
# /usr/local/cargo/git needed for crossbeam patch
RUN --mount=type=cache,mode=0777,target=/solana/target \
    --mount=type=cache,mode=0777,target=/usr/local/cargo/registry \
    --mount=type=cache,mode=0777,target=/usr/local/cargo/git \
    if [ "$debug" = "false" ] ; then \
        ./cargo stable build --release && cp target/release/solana* ./docker-output && cp target/release/agave* ./docker-output; \
    else \
        RUSTFLAGS='-g -C force-frame-pointers=yes' ./cargo stable build --release && cp target/release/solana* ./docker-output && cp target/release/agave* ./docker-output; \
    fi
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Builds jito-solana in a docker container.
# Useful for running on machines that might not have cargo installed but can run docker (Flatcar Linux).
# run `./f true` to compile with debug flags

set -eux

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"

GIT_SHA="$(git rev-parse --short HEAD)"

echo "Git hash: $GIT_SHA"

DEBUG_FLAGS=${1-false}

DOCKER_BUILDKIT=1 docker build \
  --build-arg debug=$DEBUG_FLAGS \
  --build-arg ci_commit=$GIT_SHA \
  -t jitolabs/build-solana \
  -f dev/Dockerfile . \
  --progress=plain

# Creates a temporary container, copies the binaries built inside the container
# to ./docker-output, and removes the temporary container.
docker rm temp || true
docker container create --name temp jitolabs/build-solana
mkdir -p $SCRIPT_DIR/docker-output
# Outputs the solana-validator binary to $SOLANA/docker-output/solana-validator
docker container cp temp:/solana/docker-output $SCRIPT_DIR/
docker rm temp
@@ -0,0 +1,19 @@
[package]
name = "jito-protos"
version = { workspace = true }
edition = { workspace = true }
publish = false

[dependencies]
bytes = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
tonic = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }

# Windows users should install the protobuf compiler manually and set the PROTOC
# env var to point to the installed binary.
[target."cfg(not(windows))".build-dependencies]
protobuf-src = { workspace = true }
@@ -0,0 +1,38 @@
use tonic_build::configure;

fn main() -> Result<(), std::io::Error> {
    const PROTOC_ENVAR: &str = "PROTOC";
    if std::env::var(PROTOC_ENVAR).is_err() {
        #[cfg(not(windows))]
        std::env::set_var(PROTOC_ENVAR, protobuf_src::protoc());
    }

    let proto_base_path = std::path::PathBuf::from("protos");
    let proto_files = [
        "auth.proto",
        "block_engine.proto",
        "bundle.proto",
        "packet.proto",
        "relayer.proto",
        "shared.proto",
    ];
    let mut protos = Vec::new();
    for proto_file in &proto_files {
        let proto = proto_base_path.join(proto_file);
        println!("cargo:rerun-if-changed={}", proto.display());
        protos.push(proto);
    }

    configure()
        .build_client(true)
        .build_server(false)
        .type_attribute(
            "TransactionErrorType",
            "#[cfg_attr(test, derive(enum_iterator::Sequence))]",
        )
        .type_attribute(
            "InstructionErrorType",
            "#[cfg_attr(test, derive(enum_iterator::Sequence))]",
        )
        .compile(&protos, &[proto_base_path])
}
@@ -0,0 +1,25 @@
pub mod proto {
    pub mod auth {
        tonic::include_proto!("auth");
    }

    pub mod block_engine {
        tonic::include_proto!("block_engine");
    }

    pub mod bundle {
        tonic::include_proto!("bundle");
    }

    pub mod packet {
        tonic::include_proto!("packet");
    }

    pub mod relayer {
        tonic::include_proto!("relayer");
    }

    pub mod shared {
        tonic::include_proto!("shared");
    }
}
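The generated modules can then be consumed from other crates in the workspace. A minimal sketch; the endpoint is hypothetical and the auth service is just one of the six generated modules:

    use jito_protos::proto::auth::auth_service_client::AuthServiceClient;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // tonic generates a connect() convenience constructor on each client.
        let mut client = AuthServiceClient::connect("http://127.0.0.1:14250").await?;
        let _ = &mut client; // issue the RPCs defined in auth.proto from here
        Ok(())
    }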