Skip to content

Commit

Permalink
Index txes immediately after execution (MystenLabs#5490)
Browse files Browse the repository at this point in the history
* Fix WAL bench

* Do not rely on in-order insertion of events for deduplication

* Index events immediately, rather than in the post-processing step

* PR feedback
  • Loading branch information
mystenmark authored Oct 26, 2022
1 parent e1145b2 commit 7e98f5c
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 174 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 51 additions & 27 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,31 +847,41 @@ impl AuthorityState {
// will be persisted in the log for later recovery.
let notifier_ticket = self.batch_notifier.ticket(bypass_validator_halt)?;
let seq = notifier_ticket.seq();
if let Err(err) = self
let res = self
.commit_certificate(
inner_temporary_store,
certificate,
&signed_effects,
notifier_ticket,
)
.await
{
if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
debug!(
?digest,
"validator halted and this cert will never be committed"
);
tx_guard.release();
} else {
error!(?digest, "commit_certificate failed: {}", err);
.await;

let seq = match res {
Err(err) => {
if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
debug!(
?digest,
"validator halted and this cert will never be committed"
);
tx_guard.release();
} else {
error!(?digest, "commit_certificate failed: {}", err);
}
debug!("Failed to notify ticket with sequence number: {}", seq);
return Err(err);
}
debug!("Failed to notify ticket with sequence number: {}", seq);
return Err(err);
}
Ok(seq) => seq,
};

// commit_certificate finished, the tx is fully committed to the store.
tx_guard.commit_tx();

// index certificate
let _ = self
.post_process_one_tx(seq, &digest)
.await
.tap_err(|e| error!(tx_digest = ?digest, "tx post processing failed: {e}"));

// Update metrics.
self.metrics.total_effects.inc();
self.metrics.total_certs.inc();
Expand Down Expand Up @@ -1039,6 +1049,13 @@ impl AuthorityState {
seq: TxSequenceNumber,
digest: &TransactionDigest,
) -> SuiResult {
if self.indexes.is_none()
&& self.transaction_streamer.is_none()
&& self.event_handler.is_none()
{
return Ok(());
}

// Load cert and effects.
let info = self.make_transaction_info(digest).await?;
let (cert, effects) = match info {
Expand Down Expand Up @@ -1090,30 +1107,36 @@ impl AuthorityState {

// TODO: This should persist the last successfully-processed sequence to disk, and upon
// starting up, look for any sequences in the store since then and process them.
#[instrument(level = "debug", skip_all)]
pub async fn run_tx_post_processing_process(&self) -> SuiResult {
let mut subscriber = self.subscribe_batch();
let _guard = scoped_counter!(self.metrics, num_post_processing_tasks);
debug!("subscribed to batch service");

loop {
match subscriber.recv().await {
Ok(item) => {
if let UpdateItem::Transaction((
seq,
ExecutionDigests {
transaction: digest,
..
},
)) = item
{
Ok(item) => match item {
UpdateItem::Batch(batch) => {
debug!(
batch_seq = ?batch.data().next_sequence_number,
"post process received batch"
);
}
UpdateItem::Transaction((seq, ExecutionDigests { .. })) => {
self.metrics.post_processing_latest_seq_seen.set(seq as i64);
self.metrics
.post_processing_total_tx_sent_to_post_processing
.inc();
/*
* TODO: we are temporarily not processing txes here because the batch
* system is flaky somehow. The metrics above are left alone so that we can
* continue debugging.
if let Err(e) = self.post_process_one_tx(seq, &digest).await {
warn!(?digest, "Couldn't process tx: {e}");
}
*/
}
}
},
Err(RecvError::Closed) => {
// This shall not happen because the sender of batch notifier should not be closed.
error!("run_tx_post_processing_process receiver channel closed. If this happens there is a bug");
Expand Down Expand Up @@ -2091,14 +2114,15 @@ impl AuthorityState {
certificate: &CertifiedTransaction,
signed_effects: &SignedTransactionEffects,
notifier_ticket: TransactionNotifierTicket,
) -> SuiResult {
) -> SuiResult<TxSequenceNumber> {
let _metrics_guard = start_timer(self.metrics.commit_certificate_latency.clone());

let seq = notifier_ticket.seq();

let digest = certificate.digest();
let effects_digest = &signed_effects.digest();
self.database
let seq = self
.database
.update_state(
inner_temporary_store,
certificate,
Expand All @@ -2112,7 +2136,7 @@ impl AuthorityState {
})?;
// We only notify i.e. update low watermark once database changes are committed
notifier_ticket.notify();
Ok(())
Ok(seq)
}

/// Returns true if certificate is a shared-object cert but has not been sequenced.
Expand Down
32 changes: 18 additions & 14 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
proposed_seq: TxSequenceNumber,
effects: &TransactionEffectsEnvelope<S>,
effects_digest: &TransactionEffectsDigest,
) -> SuiResult {
) -> SuiResult<TxSequenceNumber> {
// Extract the new state from the execution
// TODO: events are already stored in the TxDigest -> TransactionEffects store. Is that enough?
let mut write_batch = self.perpetual_tables.certificates.batch();
Expand All @@ -655,20 +655,23 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
iter::once((transaction_digest, certificate)),
)?;

self.sequence_tx(
write_batch,
inner_temporary_store,
transaction_digest,
proposed_seq,
effects,
effects_digest,
)
.await?;
let seq = self
.sequence_tx(
write_batch,
inner_temporary_store,
transaction_digest,
proposed_seq,
effects,
effects_digest,
)
.await?;

// Cleanup the lock of the shared objects. This must be done after we write effects, as
// effects_exists is used as the guard to avoid re-locking objects for a previously
// executed tx. remove_shared_objects_locks.
self.remove_shared_objects_locks(transaction_digest, certificate)
self.remove_shared_objects_locks(transaction_digest, certificate)?;

Ok(seq)
}

/// Persist temporary storage to DB for genesis modules
Expand Down Expand Up @@ -729,7 +732,8 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
&effects,
effects_digest,
)
.await
.await?;
Ok(())
}

async fn sequence_tx(
Expand All @@ -740,7 +744,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
proposed_seq: TxSequenceNumber,
effects: &TransactionEffectsEnvelope<S>,
effects_digest: &TransactionEffectsDigest,
) -> SuiResult {
) -> SuiResult<TxSequenceNumber> {
// Safe to unwrap since UpdateType::Transaction ensures we get a sequence number back.
let assigned_seq = self
.batch_update_objects(
Expand Down Expand Up @@ -790,7 +794,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

batch.write()?;

Ok(())
Ok(assigned_seq)
}

/// Helper function for updating the objects in the state
Expand Down
13 changes: 12 additions & 1 deletion crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ impl EventHandler {
let res: Result<Vec<_>, _> = effects
.events
.iter()
.map(|e| self.create_envelope(e, effects.transaction_digest, seq_num, timestamp_ms))
.enumerate()
.map(|(event_num, e)| {
self.create_envelope(
e,
effects.transaction_digest,
event_num.try_into().unwrap(),
seq_num,
timestamp_ms,
)
})
.collect();
let envelopes = res?;

Expand All @@ -78,6 +87,7 @@ impl EventHandler {
&self,
event: &Event,
digest: TransactionDigest,
event_num: u64,
seq_num: u64,
timestamp_ms: u64,
) -> Result<EventEnvelope, SuiError> {
Expand All @@ -103,6 +113,7 @@ impl EventHandler {
timestamp_ms,
Some(digest),
seq_num,
event_num,
event.clone(),
json_value,
))
Expand Down
1 change: 1 addition & 0 deletions crates/sui-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tempfile = "3.3.0"
num_cpus = "1.13.1"
pretty_assertions = "1.2.0"
telemetry-subscribers.workspace = true
once_cell = "1"

[[bench]]
name = "write_ahead_log"
Expand Down
30 changes: 25 additions & 5 deletions crates/sui-storage/benches/event_store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,38 @@
#[macro_use]
extern crate criterion;

use std::sync::Mutex;

use criterion::{BenchmarkId, Criterion, Throughput};
use once_cell::sync::OnceCell;
use tempfile::NamedTempFile;
use tokio::runtime::Builder;

use sui_storage::event_store::{sql::SqlEventStore, test_utils, EventStore};
use sui_types::{
base_types::SuiAddress,
base_types::{SuiAddress, TransactionDigest},
event::{EventEnvelope, TransferType},
};

async fn repeat_batch_insert(db: &SqlEventStore, events: &[EventEnvelope], batch_size: usize) {
// Reset sequence number so we can insert events with old sequence numbers
db.testing_only_reset_seq_num();
for chunk in events.chunks(batch_size) {
static NEXT_SEQ: OnceCell<Mutex<u64>> = OnceCell::new();

let mut events: Vec<EventEnvelope> = events.to_vec();

let mut seq: u64 = {
let mutex = NEXT_SEQ.get_or_init(Default::default);
let mut inner_seq = mutex.lock().unwrap();
let ret: u64 = *inner_seq;
let len: u64 = events.len().try_into().unwrap();
*inner_seq += len;
ret
};

for chunk in events.chunks_mut(batch_size) {
for e in chunk.iter_mut() {
e.seq_num = seq;
seq += 1;
}
db.add_events(chunk).await.expect("Inserts should not fail");
}
}
Expand Down Expand Up @@ -48,7 +66,9 @@ fn bench_sqlite_ingestion_varying_batch_size(c: &mut Criterion) {
for n in 0..100 {
let transfer_obj = test_utils::new_test_transfer_event(
1_666_003 + n * 100,
4 + n,
TransactionDigest::random(),
0,
0,
n,
TransferType::ToAddress,
None,
Expand Down
20 changes: 10 additions & 10 deletions crates/sui-storage/benches/write_ahead_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ fn main() {
.build()
.unwrap();

let working_dir = tempfile::tempdir().unwrap();
let wal = Arc::new(DBWriteAheadLog::<usize>::new(
working_dir.path().to_path_buf(),
));

let num_tasks = 20000;
let num_txes_per_task = 10;

let start = std::time::Instant::now();

// TODO: this is not a very good benchmark but perhaps it can at least find regressions
runtime.block_on(async move {
let duration = runtime.block_on(async move {
let working_dir = tempfile::tempdir().unwrap();
let wal = Arc::new(DBWriteAheadLog::<usize>::new(
working_dir.path().to_path_buf(),
));

let start = std::time::Instant::now();

let mut futures = Vec::new();
for _ in 0..num_tasks {
let wal = wal.clone();
Expand All @@ -44,9 +44,9 @@ fn main() {
while let Some(f) = futures.pop() {
f.await.unwrap();
}
});

let duration = start.elapsed();
start.elapsed()
});

println!(
"WriteAheadLog throughput: {} txes/s",
Expand Down
11 changes: 7 additions & 4 deletions crates/sui-storage/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,13 @@ pub enum EventValue {
#[async_trait]
#[enum_dispatch]
pub trait EventStore {
    /// Adds a batch of transaction-related events to the EventStore.
    /// Semantics:
    /// - The batch is appended to the store.
    /// - The batch may contain events from multiple transactions.
    /// - Each event must have a unique (seq_num, event_num) tuple - events that duplicate this key
    ///   will be ignored.
    ///
    /// Returns Ok(rows_affected).
async fn add_events(&self, events: &[EventEnvelope]) -> Result<u64, SuiError>;

Expand Down
Loading

0 comments on commit 7e98f5c

Please sign in to comment.