Skip to content

Commit

Permalink
update to prio graph 0.3.0 (anza-xyz#3254)
Browse files Browse the repository at this point in the history
* update to prio graph 0.3.0

* re-use prio-graph
  • Loading branch information
apfitzge authored and ray-kast committed Nov 27, 2024
1 parent 2db1942 commit 11cf164
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ members = [
"zk-token-sdk",
]

exclude = [
"programs/sbf",
"svm/tests/example-programs",
]
exclude = ["programs/sbf", "svm/tests/example-programs"]

resolver = "2"

Expand Down Expand Up @@ -337,7 +334,7 @@ percentage = "0.1.0"
pickledb = { version = "0.5.1", default-features = false }
predicates = "2.1"
pretty-hex = "0.3.0"
prio-graph = "0.2.1"
prio-graph = "0.3.0"
proc-macro2 = "1.0.89"
proptest = "1.5"
prost = "0.11.9"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,34 @@ use {
},
crossbeam_channel::{Receiver, Sender, TryRecvError},
itertools::izip,
prio_graph::{AccessKind, PrioGraph},
prio_graph::{AccessKind, GraphNode, PrioGraph},
solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS,
solana_measure::measure_us,
solana_sdk::{pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction},
};

#[inline(always)]
fn passthrough_priority(
id: &TransactionPriorityId,
_graph_node: &GraphNode<TransactionPriorityId>,
) -> TransactionPriorityId {
*id
}

type SchedulerPrioGraph = PrioGraph<
TransactionPriorityId,
Pubkey,
TransactionPriorityId,
fn(&TransactionPriorityId, &GraphNode<TransactionPriorityId>) -> TransactionPriorityId,
>;

pub(crate) struct PrioGraphScheduler {
in_flight_tracker: InFlightTracker,
account_locks: ThreadAwareAccountLocks,
consume_work_senders: Vec<Sender<ConsumeWork>>,
finished_consume_work_receiver: Receiver<FinishedConsumeWork>,
look_ahead_window_size: usize,
prio_graph: SchedulerPrioGraph,
}

impl PrioGraphScheduler {
Expand All @@ -44,6 +60,7 @@ impl PrioGraphScheduler {
consume_work_senders,
finished_consume_work_receiver,
look_ahead_window_size: 2048,
prio_graph: PrioGraph::new(passthrough_priority),
}
}

Expand Down Expand Up @@ -94,7 +111,6 @@ impl PrioGraphScheduler {
// these transactions to be scheduled before them.
let mut unschedulable_ids = Vec::new();
let mut blocking_locks = ReadWriteAccountSet::default();
let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id);

// Track metrics on filter.
let mut num_filtered_out: usize = 0;
Expand Down Expand Up @@ -150,7 +166,7 @@ impl PrioGraphScheduler {

// Create the initial look-ahead window.
// Check transactions against filter, remove from container if it fails.
chunked_pops(container, &mut prio_graph, &mut window_budget);
chunked_pops(container, &mut self.prio_graph, &mut window_budget);

let mut unblock_this_batch =
Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
Expand All @@ -160,11 +176,11 @@ impl PrioGraphScheduler {
let mut num_unschedulable: usize = 0;
while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if prio_graph.is_empty() {
if self.prio_graph.is_empty() {
break;
}

while let Some(id) = prio_graph.pop() {
while let Some(id) = self.prio_graph.pop() {
unblock_this_batch.push(id);

// Should always be in the container, during initial testing phase panic.
Expand Down Expand Up @@ -242,11 +258,11 @@ impl PrioGraphScheduler {

// Refresh window budget and do chunked pops
saturating_add_assign!(window_budget, unblock_this_batch.len());
chunked_pops(container, &mut prio_graph, &mut window_budget);
chunked_pops(container, &mut self.prio_graph, &mut window_budget);

// Unblock all transactions that were blocked by the transactions that were just sent.
for id in unblock_this_batch.drain(..) {
prio_graph.unblock(&id);
self.prio_graph.unblock(&id);
}
}

Expand All @@ -259,9 +275,13 @@ impl PrioGraphScheduler {
}

// Push remaining transactions back into the container
while let Some((id, _)) = prio_graph.pop_and_unblock() {
while let Some((id, _)) = self.prio_graph.pop_and_unblock() {
container.push_id_into_queue(id);
}
// No more remaining items in the queue.
// Clear here to make sure the next scheduling pass starts fresh
// without detecting any conflicts.
self.prio_graph.clear();

assert_eq!(
num_scheduled, num_sent,
Expand Down
7 changes: 5 additions & 2 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 11cf164

Please sign in to comment.