Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Introduce SchedulingStateMachine for unified scheduler [NO-MERGE&REVIEW-ONLY-MODE] #35286

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a5503af
Introduce SchedulingStateMachine
ryoqun Feb 22, 2024
324b2d0
Apply all typo fixes from code review
ryoqun Feb 24, 2024
8d8eb3d
Update word wrapping
ryoqun Feb 24, 2024
a6022cc
Clarify Token::assume_exclusive_mutating_thread()
ryoqun Feb 24, 2024
a54bc7f
Use slice instead of &Vec<_>
ryoqun Feb 24, 2024
e0415ea
Improve non-const explanation
ryoqun Feb 25, 2024
446a2ff
Document consecutive readonly rescheduling opt.
ryoqun Feb 25, 2024
7a72de8
Make test_gradual_locking terminate for miri
ryoqun Feb 25, 2024
92a9ba4
Avoid unnecessary Task::clone()
ryoqun Feb 26, 2024
62e5a38
Rename: lock_{status,result} and no attempt_...()
ryoqun Feb 26, 2024
192f0f5
Add safety comment for get_account_locks_unchecked
ryoqun Feb 28, 2024
efee73c
Reduce and comment about Page::blocked_tasks cap.
ryoqun Feb 28, 2024
05d9e40
Document SchedulingStateMachine::schedule_task()
ryoqun Feb 28, 2024
b72fda0
Add justification of closure in create_task
ryoqun Feb 28, 2024
745f4d9
Use the From trait for PageUsage
ryoqun Feb 28, 2024
d99d06b
Replace unneeded if-let with .expect()
ryoqun Feb 29, 2024
8ceb1b5
Add helpful comments for peculiar crossbeam usage
ryoqun Feb 29, 2024
c62e835
Fix typo
ryoqun Feb 29, 2024
4c703a9
Make bug-bounty-exempt statement more clear
ryoqun Feb 29, 2024
1d4b087
Add test_enfoced_get_account_locks_verification
ryoqun Feb 29, 2024
eb87f1d
Fix typos...
ryoqun Feb 29, 2024
03a0bd1
Big rename: Page => UsageQueue
ryoqun Feb 29, 2024
fe2efa2
Document UsageQueueLoader
ryoqun Feb 29, 2024
16ac5b3
Various minor cleanings for beautifier diff
ryoqun Feb 29, 2024
737e473
Ensure reinitialize() is maintained for new fields
ryoqun Mar 1, 2024
4fcb360
Remove uneeded impl Send for TokenCell & doc upd.
ryoqun Mar 7, 2024
a0baa10
Apply typo fixes from code review
ryoqun Mar 7, 2024
2cbf64a
Merge similar tests into one
ryoqun Mar 7, 2024
3b9fb2c
Remove test_debug
ryoqun Mar 7, 2024
d072efd
Remove assertions of task_index()
ryoqun Mar 7, 2024
001b10e
Fix UB in TokenCell
ryoqun Mar 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4248,6 +4248,9 @@ impl Bank {
transaction: &'a SanitizedTransaction,
) -> TransactionBatch<'_, '_> {
let tx_account_lock_limit = self.get_transaction_account_lock_limit();
// Note that switching this to .get_account_locks_unchecked() is unacceptable currently.
// The unified scheduler relies on the checks enforced here.
// See a comment in SchedulingStateMachine::create_task().
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
let lock_result = transaction
.get_account_locks(tx_account_lock_limit)
.map(|_| ());
Expand Down
2 changes: 2 additions & 0 deletions unified-scheduler-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
assert_matches = { workspace = true }
solana-sdk = { workspace = true }
static_assertions = { workspace = true }
1,295 changes: 1,288 additions & 7 deletions unified-scheduler-logic/src/lib.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ edition = { workspace = true }
[dependencies]
assert_matches = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
solana-ledger = { workspace = true }
Expand Down
238 changes: 172 additions & 66 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! NOTE: While the unified scheduler is fully functional and moderately performant even with
//! mainnet-beta, it has known resource-exhaustion related security issues for replaying
//! specially-crafted blocks produced by malicious leaders. Thus, this experimental and
//! nondefault functionality is exempt from the bug bounty program for now.
//!
//! Transaction scheduling code.
//!
//! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and
Expand All @@ -10,7 +15,8 @@

use {
assert_matches::assert_matches,
crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender},
dashmap::DashMap,
derivative::Derivative,
log::*,
solana_ledger::blockstore_processor::{
Expand All @@ -26,8 +32,11 @@ use {
},
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::transaction::{Result, SanitizedTransaction},
solana_unified_scheduler_logic::Task,
solana_sdk::{
pubkey::Pubkey,
transaction::{Result, SanitizedTransaction},
},
solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue},
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
fmt::Debug,
Expand Down Expand Up @@ -90,10 +99,8 @@ where
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(1);
// we're hard-coding the number of handler thread to 1, meaning this impl is currently
// single-threaded still.
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);

Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
Expand Down Expand Up @@ -386,13 +393,35 @@ mod chained_channel {
}
}

/// The primary owner of all [`UsageQueue`]s used for particular [`PooledScheduler`].
///
/// Currently, the simplest implementation. This grows memory usage in unbounded way. Cleaning will
/// be added later. This struct is here to be put outside `solana-unified-scheduler-logic` for the
/// crate's original intent (separation of logics from this crate). Some practical and mundane
/// pruning will be implemented in this type.
#[derive(Default, Debug)]
pub struct UsageQueueLoader {
usage_queues: DashMap<Pubkey, UsageQueue>,
}

impl UsageQueueLoader {
pub fn load(&self, address: Pubkey) -> UsageQueue {
self.usage_queues.entry(address).or_default().clone()
}
}

// (this is slow needing atomic mem reads. However, this can be turned into a lot faster
// optimizer-friendly version as shown in this crossbeam pr:
// https://github.com/crossbeam-rs/crossbeam/pull/1047)
fn disconnected<T>() -> Receiver<T> {
// drop the sender residing at .0, returning an always-disconnected receiver.
unbounded().1
}

fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}

// Currently, simplest possible implementation (i.e. single-threaded)
// this will be replaced with more proper implementation...
// not usable at all, especially for mainnet-beta
#[derive(Debug)]
pub struct PooledScheduler<TH: TaskHandler> {
inner: PooledSchedulerInner<Self, TH>,
Expand All @@ -402,6 +431,7 @@ pub struct PooledScheduler<TH: TaskHandler> {
#[derive(Debug)]
pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
thread_manager: ThreadManager<S, TH>,
usage_queue_loader: UsageQueueLoader,
}

// This type manages the OS threads for scheduling and executing transactions. The term
Expand All @@ -427,6 +457,7 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
Self::from_inner(
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool),
usage_queue_loader: UsageQueueLoader::default(),
},
initial_context,
)
Expand Down Expand Up @@ -518,7 +549,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let new_task_receiver = self.new_task_receiver.clone();

let mut session_ending = false;
let mut active_task_count: usize = 0;

// Now, this is the main loop for the scheduler thread, which is a special beast.
//
Expand Down Expand Up @@ -558,61 +588,97 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// cycles out of the scheduler thread. Thus, any kinds of unessential overhead sources
// like syscalls, VDSO, and even memory (de)allocation should be avoided at all costs
// by design or by means of offloading at the last resort.
move || loop {
let mut is_finished = false;
while !is_finished {
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

active_task_count = active_task_count.checked_sub(1).unwrap();
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
// so, we're NOT scheduling at all here; rather, just execute
// tx straight off. the inter-tx locking deps aren't needed to
// be resolved in the case of single-threaded FIFO like this.
runnable_task_sender
.send_payload(task)
.unwrap();
active_task_count = active_task_count.checked_add(1).unwrap();
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
}
NewTaskPayload::CloseSubchannel => {
session_ending = true;
}
}
},
};
move || {
let (do_now, dont_now) = (&disconnected::<()>(), &never::<()>());
let dummy_receiver = |trigger| {
if trigger {
do_now
} else {
dont_now
}
};

// a really simplistic termination condition, which only works under the
// assumption of single handler thread...
is_finished = session_ending && active_task_count == 0;
}
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
};

if session_ending {
session_result_sender
.send(Some(
result_with_timings
.take()
.unwrap_or_else(initialized_result_with_timings),
))
.unwrap();
session_ending = false;
loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never exit this loop except for panic? I think we may have already talked about this in a previous PR, and you're just planning on error-handling in a follow-up. Please correct me if I'm wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never exit this loop except for panic? I think we may have already talked about this in a previous PR, and you're just planning on error-handling in a follow-up.

This understanding is completely aligns with what i said previously. The proper scheduler management code is complicated, flavored to my own taste (expecting back-and-force review session), (i.e. yet another beast by itself). so, i wanted to focus on the logic with this pr.

let mut is_finished = false;
while !is_finished {
// ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
// which isn't great and is inconsistent with `if`s in the Rust's match
// arm. So, eagerly binding the result to a variable unconditionally here
// makes no perf. difference...
let dummy_unblocked_task_receiver =
dummy_receiver(state_machine.has_unblocked_task());

// (Assume this is biased; i.e. select_biased! in this crossbeam pr:
// https://github.com/rust-lang/futures-rs/pull/1976)
//
// There's something special called dummy_unblocked_task_receiver here.
// This odd pattern was needed to react to newly unblocked tasks from
// _not-crossbeam-channel_ event sources, precisely at the specified
// precedence among other selectors, while delegating the conrol flow to
// select_biased!.
//
// In this way, hot looping is avoided and overall control flow is much
// consistent. Note that unified scheduler will go
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(dummy_unblocked_task_receiver) -> dummy => {
assert_matches!(dummy, Err(RecvError));

let task = state_machine.schedule_unblocked_task().expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
if let Some(task) = state_machine.schedule_task(task) {
runnable_task_sender.send_payload(task).unwrap();
}
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
}
NewTaskPayload::CloseSubchannel => {
session_ending = true;
}
}
},
};

is_finished = session_ending && state_machine.has_no_active_task();
}

if session_ending {
state_machine.reinitialize();
session_result_sender
.send(Some(
result_with_timings
.take()
.unwrap_or_else(initialized_result_with_timings),
))
.unwrap();
session_ending = false;
}
}
}
};
Expand Down Expand Up @@ -741,7 +807,9 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}

fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let task = Task::create_task(transaction.clone(), index);
let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| {
self.inner.usage_queue_loader.load(pubkey)
});
self.inner.thread_manager.send_task(task);
}

Expand Down Expand Up @@ -1023,7 +1091,7 @@ mod tests {
.result,
Ok(_)
);
scheduler.schedule_execution(&(good_tx_after_bad_tx, 0));
scheduler.schedule_execution(&(good_tx_after_bad_tx, 1));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is due to task_index is started to be assert!()-ed....

scheduler.pause_for_recent_blockhash();
// transaction_count should remain same as scheduler should be bailing out.
// That's because we're testing the serialized failing execution case in this test.
Expand Down Expand Up @@ -1247,4 +1315,42 @@ mod tests {
4
);
}

// See comment in SchedulingStateMachine::create_task() for the justification of this test
#[test]
fn test_enfoced_get_account_locks_validation() {
solana_logger::setup();

let GenesisConfigInfo {
genesis_config,
ref mint_keypair,
..
} = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank = &setup_dummy_fork_graph(bank);

let mut tx = system_transaction::transfer(
mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
);
// mangle the transfer tx to try to lock fee_payer (= mint_keypair) address twice!
tx.message.account_keys.push(tx.message.account_keys[0]);
let tx = &SanitizedTransaction::from_transaction_for_tests(tx);

// this internally should call SanitizedTransaction::get_account_locks().
let result = &mut Ok(());
let timings = &mut ExecuteTimings::default();
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let handler_context = &HandlerContext {
log_messages_bytes_limit: None,
transaction_status_sender: None,
replay_vote_sender: None,
prioritization_fee_cache,
};

DefaultTaskHandler::handle(result, timings, bank, tx, 0, handler_context);
assert_matches!(result, Err(TransactionError::AccountLoadedTwice));
}
}
Loading