From 7e98f5c7791520289230e9ec54ef082805c7e86b Mon Sep 17 00:00:00 2001
From: Mark Logan <103447440+mystenmark@users.noreply.github.com>
Date: Tue, 25 Oct 2022 17:32:34 -0700
Subject: [PATCH] Index txes immediately after execution (#5490)

* Fix WAL bench

* Do not rely on in-order insertion of events for deduplication

* Index events immediately, rather than in the post-processing step

* PR feedback
---
 Cargo.lock                                            |   1 +
 crates/sui-core/src/authority.rs                      |  78 +++--
 .../sui-core/src/authority/authority_store.rs         |  32 +-
 crates/sui-core/src/event_handler.rs                  |  13 +-
 crates/sui-storage/Cargo.toml                         |   1 +
 .../sui-storage/benches/event_store_bench.rs          |  30 +-
 crates/sui-storage/benches/write_ahead_log.rs         |  20 +-
 crates/sui-storage/src/event_store/mod.rs             |  11 +-
 crates/sui-storage/src/event_store/sql.rs             | 321 ++++++++++++------
 .../sui-storage/src/event_store/test_utils.rs         |  25 +-
 .../tests/event_store_integration_test.rs             |  33 +-
 crates/sui-types/src/event.rs                         |   4 +
 .../src/unit_tests/event_filter_tests.rs              |   7 +
 13 files changed, 402 insertions(+), 174 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3d2645d884888..564f5e681be78 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8603,6 +8603,7 @@ dependencies = [
  "futures",
  "move-core-types",
  "num_cpus",
+ "once_cell",
  "pretty_assertions",
  "rocksdb",
  "serde 1.0.147",
diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index 5192dffe2d2d9..d20e13ec5a63c 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -847,31 +847,41 @@ impl AuthorityState {
         // will be persisted in the log for later recovery.
         let notifier_ticket = self.batch_notifier.ticket(bypass_validator_halt)?;
         let seq = notifier_ticket.seq();
-        if let Err(err) = self
+        let res = self
             .commit_certificate(
                 inner_temporary_store,
                 certificate,
                 &signed_effects,
                 notifier_ticket,
             )
-            .await
-        {
-            if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
-                debug!(
-                    ?digest,
-                    "validator halted and this cert will never be committed"
-                );
-                tx_guard.release();
-            } else {
-                error!(?digest, "commit_certificate failed: {}", err);
+            .await;
+
+        let seq = match res {
+            Err(err) => {
+                if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
+                    debug!(
+                        ?digest,
+                        "validator halted and this cert will never be committed"
+                    );
+                    tx_guard.release();
+                } else {
+                    error!(?digest, "commit_certificate failed: {}", err);
+                }
+                debug!("Failed to notify ticket with sequence number: {}", seq);
+                return Err(err);
             }
-            debug!("Failed to notify ticket with sequence number: {}", seq);
-            return Err(err);
-        }
+            Ok(seq) => seq,
+        };
 
         // commit_certificate finished, the tx is fully committed to the store.
         tx_guard.commit_tx();
 
+        // index certificate
+        let _ = self
+            .post_process_one_tx(seq, &digest)
+            .await
+            .tap_err(|e| error!(tx_digest = ?digest, "tx post processing failed: {e}"));
+
         // Update metrics.
         self.metrics.total_effects.inc();
         self.metrics.total_certs.inc();
@@ -1039,6 +1049,13 @@ impl AuthorityState {
         seq: TxSequenceNumber,
         digest: &TransactionDigest,
     ) -> SuiResult {
+        if self.indexes.is_none()
+            && self.transaction_streamer.is_none()
+            && self.event_handler.is_none()
+        {
+            return Ok(());
+        }
+
         // Load cert and effects.
         let info = self.make_transaction_info(digest).await?;
         let (cert, effects) = match info {
@@ -1090,30 +1107,36 @@ impl AuthorityState {
     // TODO: This should persist the last successfully-processed sequence to disk, and upon
     // starting up, look for any sequences in the store since then and process them.
+    #[instrument(level = "debug", skip_all)]
     pub async fn run_tx_post_processing_process(&self) -> SuiResult {
         let mut subscriber = self.subscribe_batch();
         let _guard = scoped_counter!(self.metrics, num_post_processing_tasks);
+        debug!("subscribed to batch service");
 
         loop {
             match subscriber.recv().await {
-                Ok(item) => {
-                    if let UpdateItem::Transaction((
-                        seq,
-                        ExecutionDigests {
-                            transaction: digest,
-                            ..
-                        },
-                    )) = item
-                    {
+                Ok(item) => match item {
+                    UpdateItem::Batch(batch) => {
+                        debug!(
+                            batch_seq = ?batch.data().next_sequence_number,
+                            "post process received batch"
+                        );
+                    }
+                    UpdateItem::Transaction((seq, ExecutionDigests { .. })) => {
                         self.metrics.post_processing_latest_seq_seen.set(seq as i64);
                         self.metrics
                             .post_processing_total_tx_sent_to_post_processing
                             .inc();
 
+                        /*
+                         * TODO: we are temporarily not processing txes here because the batch
+                         * system is flaky somehow. The metrics above are left alone so that we can
+                         * continue debugging.
                         if let Err(e) = self.post_process_one_tx(seq, &digest).await {
                             warn!(?digest, "Couldn't process tx: {e}");
                         }
+                        */
                     }
-                }
+                },
                 Err(RecvError::Closed) => {
                     // This shall not happen because the sender of batch notifier should not be closed.
                     error!("run_tx_post_processing_process receiver channel closed. If this happens there is a bug");
@@ -2091,14 +2114,15 @@ impl AuthorityState {
         certificate: &CertifiedTransaction,
         signed_effects: &SignedTransactionEffects,
         notifier_ticket: TransactionNotifierTicket,
-    ) -> SuiResult {
+    ) -> SuiResult<TxSequenceNumber> {
         let _metrics_guard = start_timer(self.metrics.commit_certificate_latency.clone());
 
         let seq = notifier_ticket.seq();
 
         let digest = certificate.digest();
         let effects_digest = &signed_effects.digest();
-        self.database
+        let seq = self
+            .database
             .update_state(
                 inner_temporary_store,
                 certificate,
@@ -2112,7 +2136,7 @@ impl AuthorityState {
             })?;
         // We only notify i.e. update low watermark once database changes are committed
         notifier_ticket.notify();
-        Ok(())
+        Ok(seq)
     }
 
     /// Returns true if certificate is a shared-object cert but has not been sequenced.
diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs
index 1438535f24430..2de5160f9f599 100644
--- a/crates/sui-core/src/authority/authority_store.rs
+++ b/crates/sui-core/src/authority/authority_store.rs
@@ -643,7 +643,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
         proposed_seq: TxSequenceNumber,
         effects: &TransactionEffectsEnvelope<S>,
         effects_digest: &TransactionEffectsDigest,
-    ) -> SuiResult {
+    ) -> SuiResult<TxSequenceNumber> {
         // Extract the new state from the execution
         // TODO: events are already stored in the TxDigest -> TransactionEffects store. Is that enough?
         let mut write_batch = self.perpetual_tables.certificates.batch();
@@ -655,20 +655,23 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
             iter::once((transaction_digest, certificate)),
         )?;
 
-        self.sequence_tx(
-            write_batch,
-            inner_temporary_store,
-            transaction_digest,
-            proposed_seq,
-            effects,
-            effects_digest,
-        )
-        .await?;
+        let seq = self
+            .sequence_tx(
+                write_batch,
+                inner_temporary_store,
+                transaction_digest,
+                proposed_seq,
+                effects,
+                effects_digest,
+            )
+            .await?;
 
         // Cleanup the lock of the shared objects. This must be done after we write effects, as
         // effects_exists is used as the guard to avoid re-locking objects for a previously
         // executed tx.
-        self.remove_shared_objects_locks(transaction_digest, certificate)
+        self.remove_shared_objects_locks(transaction_digest, certificate)?;
+
+        Ok(seq)
     }
 
     /// Persist temporary storage to DB for genesis modules
@@ -729,7 +732,8 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
             &effects,
             effects_digest,
         )
-        .await
+        .await?;
+        Ok(())
     }
 
     async fn sequence_tx(
@@ -740,7 +744,7 @@
         proposed_seq: TxSequenceNumber,
         effects: &TransactionEffectsEnvelope<S>,
         effects_digest: &TransactionEffectsDigest,
-    ) -> SuiResult {
+    ) -> SuiResult<TxSequenceNumber> {
         // Safe to unwrap since UpdateType::Transaction ensures we get a sequence number back.
         let assigned_seq = self
             .batch_update_objects(
@@ -790,7 +794,7 @@
 
         batch.write()?;
 
-        Ok(())
+        Ok(assigned_seq)
     }
 
     /// Helper function for updating the objects in the state
diff --git a/crates/sui-core/src/event_handler.rs b/crates/sui-core/src/event_handler.rs
index 8fe023c00d411..43aa7c420b59f 100644
--- a/crates/sui-core/src/event_handler.rs
+++ b/crates/sui-core/src/event_handler.rs
@@ -52,7 +52,16 @@ impl EventHandler {
         let res: Result<Vec<EventEnvelope>, _> = effects
             .events
             .iter()
-            .map(|e| self.create_envelope(e, effects.transaction_digest, seq_num, timestamp_ms))
+            .enumerate()
+            .map(|(event_num, e)| {
+                self.create_envelope(
+                    e,
+                    effects.transaction_digest,
+                    event_num.try_into().unwrap(),
+                    seq_num,
+                    timestamp_ms,
+                )
+            })
             .collect();
         let envelopes = res?;
 
@@ -78,6 +87,7 @@ impl EventHandler {
         &self,
         event: &Event,
         digest: TransactionDigest,
+        event_num: u64,
         seq_num: u64,
         timestamp_ms: u64,
     ) -> Result<EventEnvelope, SuiError> {
@@ -103,6 +113,7 @@ impl EventHandler {
             timestamp_ms,
             Some(digest),
             seq_num,
+            event_num,
             event.clone(),
             json_value,
         ))
diff --git a/crates/sui-storage/Cargo.toml b/crates/sui-storage/Cargo.toml
index a992e93507d0f..7952cffc21ccd 100644
--- a/crates/sui-storage/Cargo.toml
+++ b/crates/sui-storage/Cargo.toml
@@ -41,6 +41,7 @@ tempfile = "3.3.0"
 num_cpus = "1.13.1"
 pretty_assertions = "1.2.0"
 telemetry-subscribers.workspace = true
+once_cell = "1"
 
 [[bench]]
 name = "write_ahead_log"
diff --git a/crates/sui-storage/benches/event_store_bench.rs b/crates/sui-storage/benches/event_store_bench.rs
index dad7cb2571643..1982f004c8acc 100644
--- a/crates/sui-storage/benches/event_store_bench.rs
+++ b/crates/sui-storage/benches/event_store_bench.rs
@@ -4,20 +4,38 @@
 #[macro_use]
 extern crate criterion;
 
+use std::sync::Mutex;
+
 use criterion::{BenchmarkId, Criterion, Throughput};
+use once_cell::sync::OnceCell;
 use tempfile::NamedTempFile;
 use tokio::runtime::Builder;
 
 use sui_storage::event_store::{sql::SqlEventStore, test_utils, EventStore};
 use sui_types::{
-    base_types::SuiAddress,
+    base_types::{SuiAddress, TransactionDigest},
     event::{EventEnvelope, TransferType},
 };
 
 async fn repeat_batch_insert(db: &SqlEventStore, events: &[EventEnvelope], batch_size: usize) {
-    // Reset sequence number so we can insert events with old sequence numbers
-    db.testing_only_reset_seq_num();
-    for chunk in events.chunks(batch_size) {
+    static NEXT_SEQ: OnceCell<Mutex<u64>> = OnceCell::new();
+
+    let mut events: Vec<EventEnvelope> = events.to_vec();
+
+    let mut seq: u64 = {
+        let mutex = NEXT_SEQ.get_or_init(Default::default);
+        let mut inner_seq = mutex.lock().unwrap();
+        let ret: u64 = *inner_seq;
+        let len: u64 = events.len().try_into().unwrap();
+        *inner_seq += len;
+        ret
+    };
+
+    for chunk in events.chunks_mut(batch_size) {
+        for e in chunk.iter_mut() {
+            e.seq_num = seq;
+            seq += 1;
+        }
         db.add_events(chunk).await.expect("Inserts should not fail");
     }
 }
@@ -48,7 +66,9 @@ fn bench_sqlite_ingestion_varying_batch_size(c: &mut Criterion) {
     for n in 0..100 {
         let transfer_obj = test_utils::new_test_transfer_event(
             1_666_003 + n * 100,
-            4 + n,
+            TransactionDigest::random(),
+            0,
+            0,
             n,
             TransferType::ToAddress,
             None,
diff --git a/crates/sui-storage/benches/write_ahead_log.rs b/crates/sui-storage/benches/write_ahead_log.rs
index 2ce6b343ef573..7aa3b7f9b04be 100644
--- a/crates/sui-storage/benches/write_ahead_log.rs
+++ b/crates/sui-storage/benches/write_ahead_log.rs
@@ -15,18 +15,18 @@ fn main() {
         .build()
         .unwrap();
 
-    let working_dir = tempfile::tempdir().unwrap();
-    let wal = Arc::new(DBWriteAheadLog::<_>::new(
-        working_dir.path().to_path_buf(),
-    ));
-
     let num_tasks = 20000;
     let num_txes_per_task = 10;
 
-    let start = std::time::Instant::now();
-
     // TODO: this is not a very good benchmark but perhaps it can at least find regressions
-    runtime.block_on(async move {
+    let duration = runtime.block_on(async move {
+        let working_dir = tempfile::tempdir().unwrap();
+        let wal = Arc::new(DBWriteAheadLog::<_>::new(
+            working_dir.path().to_path_buf(),
+        ));
+
+        let start = std::time::Instant::now();
+
         let mut futures = Vec::new();
         for _ in 0..num_tasks {
             let wal = wal.clone();
@@ -44,9 +44,9 @@ fn main() {
         while let Some(f) = futures.pop() {
             f.await.unwrap();
         }
-    });
 
-    let duration = start.elapsed();
+        start.elapsed()
+    });
 
     println!(
         "WriteAheadLog throughput: {} txes/s",
diff --git a/crates/sui-storage/src/event_store/mod.rs b/crates/sui-storage/src/event_store/mod.rs
index be5708562a038..89d8b084f55c0 100644
--- a/crates/sui-storage/src/event_store/mod.rs
+++ b/crates/sui-storage/src/event_store/mod.rs
@@ -320,10 +320,13 @@ pub enum EventValue {
 #[async_trait]
 #[enum_dispatch]
 pub trait EventStore {
-    /// Adds events to the EventStore.
-    /// Semantics: events are appended. The sequence number must be nondecreasing - EventEnvelopes
-    /// which have sequence numbers below the current one will be skipped. This feature
-    /// is intended for deduplication.
+    /// Adds a batch of transaction-related events to the EventStore.
+    /// Semantics:
+    /// - The batch is appended to the store.
+    /// - The batch may contain events from multiple transactions.
+    /// - Each event must have a unique (seq_num, event_num) tuple - events that duplicate this key
+    ///   will be ignored.
+    ///
     /// Returns Ok(rows_affected).
     async fn add_events(&self, events: &[EventEnvelope]) -> Result<u64, SuiError>;
diff --git a/crates/sui-storage/src/event_store/sql.rs b/crates/sui-storage/src/event_store/sql.rs
index 5c5322785007e..2588dff891c46 100644
--- a/crates/sui-storage/src/event_store/sql.rs
+++ b/crates/sui-storage/src/event_store/sql.rs
@@ -10,7 +10,6 @@ use serde_json::{json, Value};
 use sqlx::ConnectOptions;
 use std::collections::BTreeMap;
 use std::path::Path;
-use std::sync::atomic::{AtomicU64, Ordering};
 use strum::{EnumMessage, IntoEnumIterator};
 use sui_types::base_types::SuiAddress;
 use sui_types::object::Owner;
@@ -32,9 +31,6 @@ use tracing::{info, instrument, log, warn};
 /// - fields is JSON for now (for easy JSON filtering) and contains all fields not in main columns
 pub struct SqlEventStore {
     pool: SqlitePool,
-    // Sequence number is used to prevent previously ingested events from being ingested again
-    // It acts as a cache, as the seq_num field is also written to the DB.
-    seq_num: AtomicU64,
 }
 
 /// Important for updating Columns:
@@ -50,6 +46,8 @@ enum EventsTableColumns {
     /// timestamp INTEGER
     Timestamp = 0,
     /// seq_num INTEGER
     SeqNum,
+    /// event_num INTEGER
+    EventNum,
     /// tx_digest BLOB
     TxDigest,
     /// event_type INTEGER
@@ -74,7 +72,8 @@ enum EventsTableColumns {
     Recipient,
 }
 
-const SQL_INSERT_TX: &str = "INSERT INTO events (timestamp, seq_num, tx_digest, event_type, \
+const SQL_INSERT_TX: &str =
+    "INSERT OR IGNORE INTO events (timestamp, seq_num, event_num, tx_digest, event_type, \
      package_id, module_name, object_id, fields, move_event_name, contents, sender, \
      recipient) ";
@@ -97,10 +96,7 @@ impl SqlEventStore {
             .await
             .map_err(convert_sqlx_err)?;
         info!("Created new in-memory SQLite EventStore for testing");
-        Ok(Self {
-            pool,
-            seq_num: AtomicU64::new(0),
-        })
+        Ok(Self { pool })
     }
 
     /// Creates or opens a new SQLite database at a specific path
@@ -118,10 +114,7 @@ impl SqlEventStore {
             .await
             .map_err(convert_sqlx_err)?;
         info!(?db_path, "Created/opened SQLite EventStore on disk");
-        Ok(Self {
-            pool,
-            seq_num: AtomicU64::new(0),
-        })
+        Ok(Self { pool })
     }
 
     /// Initializes the database, creating tables and indexes as needed
@@ -159,13 +152,12 @@ impl SqlEventStore {
             info!(column, "Index is ready");
         }
 
-        // Setting last sequence number
-        let last_seq_num = self.last_seq_num().await?;
-        self.seq_num.store(last_seq_num, Ordering::Relaxed);
-        info!(
-            last_seq_num,
-            "Recovered last sequence number from event store"
-        );
+        self.pool
+            .execute(
+                "CREATE UNIQUE INDEX IF NOT EXISTS event_unique_id_idx on events (seq_num, event_num)",
+            )
+            .await
+            .map_err(convert_sqlx_err)?;
 
         Ok(())
     }
@@ -181,22 +173,6 @@ impl SqlEventStore {
         Ok(num_rows as usize)
     }
 
-    async fn last_seq_num(&self) -> Result<u64, SuiError> {
-        let result = sqlx::query("SELECT MAX(seq_num) FROM events")
-            .fetch_one(&self.pool)
-            .await
-            .map_err(convert_sqlx_err)?;
-        let num_rows: i64 = result.get(0);
-        Ok(num_rows as u64)
-    }
-
-    /// Only use for testing or benchmarking. Resets the last seq number to 0
-    /// so we can insert new events without increasing TX sequence number.
-    #[allow(unused)]
-    pub fn testing_only_reset_seq_num(&self) {
-        self.seq_num.store(0, Ordering::Relaxed);
-    }
-
     fn try_extract_object_id(row: &SqliteRow, col: usize) -> Result<Option<ObjectID>, SuiError> {
         let raw_bytes: Option<Vec<u8>> = row.get(col);
         match raw_bytes {
@@ -371,40 +347,21 @@ const MAX_INSERT_BATCH: usize = 1000;
 
 impl EventStore for SqlEventStore {
     #[instrument(level = "debug", skip_all, err)]
     async fn add_events(&self, events: &[EventEnvelope]) -> Result<u64, SuiError> {
-        let mut cur_seq = self.seq_num.load(Ordering::Acquire);
-        let initial_seq = cur_seq;
         let mut rows_affected = 0;
 
         if events.is_empty() {
             return Ok(0);
         }
 
-        let mut start_index = 0;
-        let mut end_index = 0;
-
-        // Insert event in batches, skipping over events that have lower or decreasing sequence numbers
-        while end_index < events.len() {
-            // Skip events that have a lower sequence number. They've been seen before.
-            if events[start_index].seq_num < cur_seq {
-                start_index += 1;
-                end_index += 1;
-                continue;
-            }
-
-            let final_index = (start_index + MAX_INSERT_BATCH).min(events.len());
-            // Keep going while the sequence number is not decreasing
-            while end_index < final_index && events[end_index].seq_num >= cur_seq {
-                cur_seq = events[end_index].seq_num;
-                end_index += 1;
-            }
-
+        for chunk in events.chunks(MAX_INSERT_BATCH) {
             let mut query_builder = QueryBuilder::new(SQL_INSERT_TX);
-            query_builder.push_values(&events[start_index..end_index], |mut b, event| {
+            query_builder.push_values(chunk, |mut b, event| {
                 let event_type = EventType::from(&event.event);
                 let sender = event.event.sender().map(|sender| sender.to_vec());
                 let move_event_name = event.event.move_event_name();
                 b.push_bind(event.timestamp as i64)
                     .push_bind(event.seq_num as i64)
+                    .push_bind(event.event_num as i64)
                     .push_bind(event.tx_digest.map(|txd| txd.to_bytes()))
                     .push_bind(event_type as u16)
                     .push_bind(event.event.package_id().map(|pid| pid.to_vec()))
@@ -431,15 +388,6 @@ impl EventStore for SqlEventStore {
             rows_affected += res.rows_affected();
         }
 
-        // CAS is used to detect any concurrency glitches. Note that we assume a single writer
-        // append model, which is currently true. In single writer the CAS should never fail.
-        // We also do this after writing all events, for efficiency.
-        if cur_seq > initial_seq {
-            self.seq_num
-                .compare_exchange(initial_seq, cur_seq, Ordering::Acquire, Ordering::Relaxed)
-                .expect("CAS Failure - event writes are not single threaded");
-        }
-
         Ok(rows_affected)
     }
@@ -674,22 +622,48 @@ mod tests {
 
         // Insert some records
         info!("Inserting records!");
+        let txfr_digest = TransactionDigest::random();
         let to_insert = vec![
-            test_utils::new_test_newobj_event(1_000_000, 1, None, None, None),
-            test_utils::new_test_publish_event(1_001_000, 2, None),
+            test_utils::new_test_newobj_event(
+                1_000_000,
+                TransactionDigest::random(),
+                1,
+                0, // event_num
+                None,
+                None,
+                None,
+            ),
+            test_utils::new_test_publish_event(
+                1_001_000,
+                TransactionDigest::random(),
+                2,
+                0, // event_num
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_002_000,
+                txfr_digest,
                 3,
+                0, // event_num
                 1,
                 TransferType::Coin,
                 None,
                 None,
                 None,
             ),
-            test_utils::new_test_deleteobj_event(1_003_000, 3, None, None),
+            test_utils::new_test_deleteobj_event(
+                1_003_000,
+                txfr_digest,
+                3,
+                1, // event_num
+                None,
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_004_000,
+                TransactionDigest::random(),
                 4,
+                0, // event_num
                 1,
                 TransferType::ToAddress,
                 None,
@@ -698,7 +672,9 @@ mod tests {
             ),
             test_utils::new_test_move_event(
                 1_005_000,
+                TransactionDigest::random(),
                 5,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
@@ -731,21 +707,46 @@ mod tests {
         // Insert some records
         info!("Inserting records!");
         let to_insert = vec![
-            test_utils::new_test_newobj_event(1_000_000, 1, None, None, None),
-            test_utils::new_test_publish_event(1_001_000, 2, None),
+            test_utils::new_test_newobj_event(
+                1_000_000,
+                TransactionDigest::random(),
+                1,
+                0, // event_num
+                None,
+                None,
+                None,
+            ),
+            test_utils::new_test_publish_event(
+                1_001_000,
+                TransactionDigest::random(),
+                2,
+                0, // event_num
+                None,
+            ),
             test_utils::new_test_transfer_event(
-                1_002_000,
+                1_003_000,
+                TransactionDigest::random(),
                 3,
+                0, // event_num
                 1,
                 TransferType::Coin,
                 None,
                 None,
                 None,
             ),
-            test_utils::new_test_deleteobj_event(1_003_000, 3, None, None),
+            test_utils::new_test_deleteobj_event(
+                1_003_000,
+                TransactionDigest::random(),
+                4,
+                0, // event_num
+                None,
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_004_000,
-                4,
+                TransactionDigest::random(),
+                5,
+                0, // event_num
                 1,
                 TransferType::ToAddress,
                 None,
@@ -754,7 +755,9 @@ mod tests {
             ),
             test_utils::new_test_move_event(
                 1_005_000,
-                5,
+                TransactionDigest::random(),
+                6,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
@@ -789,22 +792,48 @@ mod tests {
 
         // Insert some records
         info!("Inserting records!");
+        let txfr_digest = TransactionDigest::random();
        let to_insert = vec![
-            test_utils::new_test_newobj_event(1_000_000, 1, None, None, None),
-            test_utils::new_test_publish_event(1_001_000, 2, None),
+            test_utils::new_test_newobj_event(
+                1_000_000,
+                TransactionDigest::random(),
+                1,
+                0, // event_num
+                None,
+                None,
+                None,
+            ),
+            test_utils::new_test_publish_event(
+                1_001_000,
+                TransactionDigest::random(),
+                2,
+                0, // event_num
+                None,
+            ),
             test_utils::new_test_transfer_event(
-                1_002_000,
+                1_003_000,
+                txfr_digest,
                 3,
+                0, // event_num
                 1,
                 TransferType::Coin,
                 None,
                 None,
                 None,
             ),
-            test_utils::new_test_deleteobj_event(1_003_000, 3, None, None),
+            test_utils::new_test_deleteobj_event(
+                1_003_000,
+                txfr_digest,
+                3,
+                1, // event_num
+                None,
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_004_000,
+                TransactionDigest::random(),
                 4,
+                0, // event_num
                 1,
                 TransferType::ToAddress,
                 None,
@@ -813,7 +842,9 @@ mod tests {
             ),
             test_utils::new_test_move_event(
                 1_005_000,
+                TransactionDigest::random(),
                 5,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
@@ -892,21 +923,46 @@ mod tests {
         // Insert some records
         info!("Inserting records!");
         let to_insert = vec![
-            test_utils::new_test_newobj_event(1_000_000, 1, None, None, None),
-            test_utils::new_test_publish_event(1_001_000, 2, None),
+            test_utils::new_test_newobj_event(
+                1_000_000,
+                TransactionDigest::random(),
+                1,
+                0, // event_num
+                None,
+                None,
+                None,
+            ),
+            test_utils::new_test_publish_event(
+                1_001_000,
+                TransactionDigest::random(),
+                2,
+                0, // event_num
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_002_000,
+                TransactionDigest::random(),
                 3,
+                0, // event_num
                 1,
                 TransferType::Coin,
                 None,
                 None,
                 None,
             ),
-            test_utils::new_test_deleteobj_event(1_003_000, 3, None, None),
+            test_utils::new_test_deleteobj_event(
+                1_003_000,
+                TransactionDigest::random(),
+                3,
+                0, // event_num
+                None,
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_004_000,
+                TransactionDigest::random(),
                 4,
+                0, // event_num
                 1,
                 TransferType::ToAddress,
                 None,
@@ -915,14 +971,18 @@ mod tests {
             ),
             test_utils::new_test_move_event(
                 1_005_000,
+                TransactionDigest::random(),
                 5,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
             ),
             test_utils::new_test_move_event(
                 1_006_000,
+                TransactionDigest::random(),
                 6,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
@@ -973,21 +1033,27 @@ mod tests {
         let to_insert = vec![
             test_utils::new_test_move_event(
                 1_000_000,
+                TransactionDigest::random(),
                 1,
+                0, // event_num
                 ObjectID::from_hex_literal("0x42").unwrap(),
                 "query_by_move_event_struct_name",
                 "test_foo",
             ),
             test_utils::new_test_move_event(
                 1_001_000,
+                TransactionDigest::random(),
                 2,
+                0, // event_num
                 ObjectID::from_hex_literal("0x42").unwrap(),
                 "query_by_move_event_struct_name",
                 "test_foo",
             ),
             test_utils::new_test_move_event(
                 1_002_000,
+                TransactionDigest::random(),
                 3,
+                0, // event_num
                 ObjectID::from_hex_literal("0x42").unwrap(),
                 "query_by_move_event_struct_name",
                 "test_bar",
@@ -1032,7 +1098,9 @@ mod tests {
         test_utils::new_test_transfer_event(
             // 0, object, sender, recipient
             1_000_000,
+            TransactionDigest::random(),
             1,
+            0, // event_num
             1,
             TransferType::Coin,
             Some(object_id),
@@ -1042,7 +1110,9 @@ mod tests {
         test_utils::new_test_newobj_event(
             // 1, object, sender
             1_001_000,
+            TransactionDigest::random(),
             2,
+            0, // event_num
             Some(object_id),
             Some(sender),
             None,
@@ -1050,7 +1120,9 @@ mod tests {
         test_utils::new_test_transfer_event(
             // 2, recipient
             1_002_000,
+            TransactionDigest::random(),
             3,
+            0, // event_num
             1,
             TransferType::Coin,
             None,
@@ -1060,7 +1132,9 @@ mod tests {
         test_utils::new_test_newobj_event(
             // 3, object, recipient
             1_003_000,
+            TransactionDigest::random(),
             4,
+            0, // event_num
             Some(object_id),
             None,
             Some(recipient),
@@ -1068,25 +1142,35 @@ mod tests {
         test_utils::new_test_deleteobj_event(
             // 4, object, sender
             1_004_000,
+            TransactionDigest::random(),
             5,
+            0, // event_num
             Some(object_id),
             Some(sender),
         ),
         test_utils::new_test_deleteobj_event(
             // 5, sender
             1_005_000,
+            TransactionDigest::random(),
             6,
+            0, // event_num
             None,
             Some(sender),
         ),
         test_utils::new_test_publish_event(
             // 6, None
-            1_006_000, 7, None,
+            1_006_000,
+            TransactionDigest::random(),
+            7,
+            0, // event_num
+            None,
         ),
         test_utils::new_test_publish_event(
             // 7, sender
             1_007_000,
+            TransactionDigest::random(),
             8,
+            0, // event_num
             Some(sender),
         ),
     ];
@@ -1141,7 +1225,9 @@ mod tests {
 
         let to_insert = vec![test_utils::new_test_transfer_event(
             1_000_000,
+            TransactionDigest::random(),
             1,
+            0, // event_num
             u64::MAX,
             TransferType::Coin,
             None,
@@ -1170,27 +1256,53 @@ mod tests {
         // Initialize store
         let dir = tempfile::TempDir::new().unwrap(); // NOTE this must be its own line so dir isn't dropped
         let db_file = dir.path().join("events.db");
-        let db = SqlEventStore::new_from_file(&db_file).await?;
-        db.initialize().await?;
+        let db = SqlEventStore::new_from_file(&db_file).await.unwrap();
+        db.initialize().await.unwrap();
 
+        let txfr_digest = TransactionDigest::random();
+        // TODO: these 30 lines are quite duplicated in this file (4 times).
         // Write in some events, all should succeed
         let to_insert = vec![
-            test_utils::new_test_newobj_event(1_000_000, 1, None, None, None),
-            test_utils::new_test_publish_event(1_001_000, 2, None),
+            test_utils::new_test_newobj_event(
+                1_000_000,
+                TransactionDigest::random(),
+                1,
+                0, // event_num
+                None,
+                None,
+                None,
+            ),
+            test_utils::new_test_publish_event(
+                1_001_000,
+                TransactionDigest::random(),
+                2,
+                0, // event_num
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_002_000,
+                txfr_digest,
                 3,
+                0, // event_num
                 1,
                 TransferType::Coin,
                 None,
                 None,
                 None,
             ),
-            test_utils::new_test_deleteobj_event(1_003_000, 3, None, None),
+            test_utils::new_test_deleteobj_event(
+                1_003_000,
+                txfr_digest,
+                3,
+                1, // event_num
+                None,
+                None,
+            ),
             test_utils::new_test_transfer_event(
                 1_004_000,
+                TransactionDigest::random(),
                 4,
+                0, // event_num
                 1,
                 TransferType::ToAddress,
                 None,
@@ -1199,33 +1311,34 @@ mod tests {
             ),
             test_utils::new_test_move_event(
                 1_005_000,
+                TransactionDigest::random(),
                 5,
+                0, // event_num
                 ObjectID::from_hex_literal("0x3").unwrap(),
                 "test_module",
                 "test_foo",
             ),
         ];
 
-        assert_eq!(db.add_events(&to_insert[..4]).await?, 4);
-        assert_eq!(db.total_event_count().await?, 4);
+        assert_eq!(db.add_events(&to_insert[..4]).await.unwrap(), 4);
+        assert_eq!(db.total_event_count().await.unwrap(), 4);
 
-        // Write in an older event with older sequence number, should be skipped
-        assert_eq!(db.add_events(&to_insert[1..2]).await?, 0);
-        assert_eq!(db.total_event_count().await?, 4);
+        // Previously inserted event is ignored
+        assert_eq!(db.add_events(&to_insert[1..2]).await.unwrap(), 0);
+        assert_eq!(db.total_event_count().await.unwrap(), 4);
 
-        // Drop and reload DB from the same file, test that sequence number was recovered
+        // Drop and reload DB from the same file.
         drop(db);
-        let db = SqlEventStore::new_from_file(&db_file).await?;
-        db.initialize().await?;
-        assert_eq!(db.last_seq_num().await?, 3);
-        assert_eq!(db.total_event_count().await?, 4);
+        let db = SqlEventStore::new_from_file(&db_file).await.unwrap();
+        db.initialize().await.unwrap();
+        assert_eq!(db.total_event_count().await.unwrap(), 4);
 
-        // Try ingesting older event, check still skipped
-        assert_eq!(db.add_events(&to_insert[1..2]).await?, 0);
-        assert_eq!(db.total_event_count().await?, 4);
+        // Try inserting previously ingested events, should be skipped
+        assert_eq!(db.add_events(&to_insert[1..2]).await.unwrap(), 0);
+        assert_eq!(db.total_event_count().await.unwrap(), 4);
 
         // Check writing new events still succeeds
-        assert_eq!(db.add_events(&to_insert[4..]).await?, 2);
-        assert_eq!(db.total_event_count().await?, 6);
+        assert_eq!(db.add_events(&to_insert[4..]).await.unwrap(), 2);
+        assert_eq!(db.total_event_count().await.unwrap(), 6);
 
         Ok(())
     }
diff --git a/crates/sui-storage/src/event_store/test_utils.rs b/crates/sui-storage/src/event_store/test_utils.rs
index 509ba8b1143d4..7fc68fcd10b90 100644
--- a/crates/sui-storage/src/event_store/test_utils.rs
+++ b/crates/sui-storage/src/event_store/test_utils.rs
@@ -47,13 +47,16 @@ impl TestEvent {
 
 pub fn new_test_publish_event(
     timestamp: u64,
+    digest: TransactionDigest,
     seq_num: u64,
+    event_num: u64,
     sender: Option<SuiAddress>,
 ) -> EventEnvelope {
     EventEnvelope::new(
         timestamp,
-        None,
+        Some(digest),
         seq_num,
+        event_num,
         Event::Publish {
             sender: sender.unwrap_or_else(SuiAddress::random_for_testing_only),
             package_id: ObjectID::random(),
@@ -64,15 +67,18 @@ pub fn new_test_publish_event(
 
 pub fn new_test_newobj_event(
     timestamp: u64,
+    digest: TransactionDigest,
     seq_num: u64,
+    event_num: u64,
     object_id: Option<ObjectID>,
     sender: Option<SuiAddress>,
     recipient: Option<Owner>,
 ) -> EventEnvelope {
     EventEnvelope::new(
         timestamp,
-        Some(TransactionDigest::random()),
+        Some(digest),
         seq_num,
+        event_num,
         Event::NewObject {
             package_id: ObjectID::random(),
             transaction_module: Identifier::new("module").unwrap(),
@@ -87,14 +93,17 @@ pub fn new_test_newobj_event(
 
 pub fn new_test_deleteobj_event(
     timestamp: u64,
+    digest: TransactionDigest,
     seq_num: u64,
+    event_num: u64,
     object_id: Option<ObjectID>,
     sender: Option<SuiAddress>,
 ) -> EventEnvelope {
     EventEnvelope::new(
         timestamp,
-        Some(TransactionDigest::random()),
+        Some(digest),
         seq_num,
+        event_num,
         Event::DeleteObject {
             package_id: ObjectID::random(),
             transaction_module: Identifier::new("module").unwrap(),
@@ -107,7 +116,9 @@ pub fn new_test_deleteobj_event(
 
 pub fn new_test_transfer_event(
     timestamp: u64,
+    digest: TransactionDigest,
     seq_num: u64,
+    event_num: u64,
     object_version: u64,
     type_: TransferType,
     object_id: Option<ObjectID>,
@@ -116,8 +127,9 @@ pub fn new_test_transfer_event(
 ) -> EventEnvelope {
     EventEnvelope::new(
         timestamp,
-        Some(TransactionDigest::random()),
+        Some(digest),
         seq_num,
+        event_num,
         Event::TransferObject {
             package_id: ObjectID::random(),
             transaction_module: Identifier::new("module").unwrap(),
@@ -135,7 +147,9 @@ pub fn new_test_transfer_event(
 
 pub fn new_test_move_event(
     timestamp: u64,
+    digest: TransactionDigest,
     seq_num: u64,
+    event_num: u64,
     package_id: ObjectID,
     module_name: &str,
     event_struct_name: &'static str,
@@ -159,8 +173,9 @@ pub fn new_test_move_event(
     let json = serde_json::to_value(&move_struct).expect("Cannot serialize move struct to JSON");
     EventEnvelope::new(
         timestamp,
-        Some(TransactionDigest::random()),
+        Some(digest),
         seq_num,
+        event_num,
         move_event,
         Some(json),
     )
diff --git a/crates/sui-storage/tests/event_store_integration_test.rs b/crates/sui-storage/tests/event_store_integration_test.rs
index 439fbb85179ee..f53d27fb17ded 100644
--- a/crates/sui-storage/tests/event_store_integration_test.rs
+++ b/crates/sui-storage/tests/event_store_integration_test.rs
@@ -4,7 +4,7 @@
 use sui_json_rpc_types::SuiEventEnvelope;
 use sui_storage::event_store::{sql::SqlEventStore, test_utils, EventStore};
 use sui_types::{
-    base_types::ObjectID,
+    base_types::{ObjectID, TransactionDigest},
     event::{EventEnvelope, EventType, TransferType},
 };
 
 #[tokio::test]
@@ -14,24 +14,43 @@ async fn test_stored_event_to_sui_event() -> Result<(), anyhow::Error> {
     let db = SqlEventStore::new_memory_only_not_prod().await?;
     db.initialize().await.map_err(anyhow::Error::from)?;
 
-    let new_obj = test_utils::new_test_newobj_event(1_666_000, 1, None, None, None);
+    let new_obj = test_utils::new_test_newobj_event(
+        1_666_000,
+        TransactionDigest::random(),
+        1,
+        0, // event_num
+        None,
+        None,
+        None,
+    );
     insert_and_fetch_by_tx_digest_then_compare(new_obj, &db).await?;
 
     let move_ = test_utils::new_test_move_event(
         1_666_001,
+        TransactionDigest::random(),
         2,
+        0, // event_num
         ObjectID::from_hex_literal("0x3").unwrap(),
         "a_module",
         "whatever",
     );
     insert_and_fetch_by_tx_digest_then_compare(move_, &db).await?;
 
-    let delete_obj = test_utils::new_test_deleteobj_event(1_666_002, 3, None, None);
+    let delete_obj = test_utils::new_test_deleteobj_event(
+        1_666_002,
+        TransactionDigest::random(),
+        3,
+        0, // event_num
+        None,
+        None,
+    );
     insert_and_fetch_by_tx_digest_then_compare(delete_obj, &db).await?;
 
     let transfer_obj = test_utils::new_test_transfer_event(
         1_666_003,
+        TransactionDigest::random(),
         4,
+        0, // event_num
         1,
         TransferType::ToAddress,
         None,
@@ -40,7 +59,13 @@ async fn test_stored_event_to_sui_event() -> Result<(), anyhow::Error> {
     );
     insert_and_fetch_by_tx_digest_then_compare(transfer_obj, &db).await?;
 
-    let publish = test_utils::new_test_publish_event(1_001_000, 5, None);
+    let publish = test_utils::new_test_publish_event(
+        1_001_000,
+        TransactionDigest::random(),
+        5,
+        0, // event_num
+        None,
+    );
     assert_eq!(db.add_events(&vec![publish.clone()]).await?, 1);
     let mut queried_events = db
         .events_by_type(1_001_000, 1_002_000, EventType::Publish, 1)
diff --git a/crates/sui-types/src/event.rs b/crates/sui-types/src/event.rs
index f265785f1e52c..5c04c3d48b366 100644
--- a/crates/sui-types/src/event.rs
+++ b/crates/sui-types/src/event.rs
@@ -37,6 +37,8 @@ pub struct EventEnvelope {
     pub tx_digest: Option<TransactionDigest>,
     /// Sequence number, must be nondecreasing for event ingestion idempotency
     pub seq_num: u64,
+    /// Consecutive per-tx counter assigned to this event.
+    pub event_num: u64,
     /// Specific event type
     pub event: Event,
     /// json value for MoveStruct (for MoveEvent only)
@@ -48,6 +50,7 @@ impl EventEnvelope {
         timestamp: u64,
         tx_digest: Option<TransactionDigest>,
         seq_num: u64,
+        event_num: u64,
         event: Event,
         move_struct_json_value: Option<Value>,
     ) -> Self {
@@ -55,6 +58,7 @@ impl EventEnvelope {
             timestamp,
             tx_digest,
             seq_num,
+            event_num,
             event,
             move_struct_json_value,
         }
diff --git a/crates/sui-types/src/unit_tests/event_filter_tests.rs b/crates/sui-types/src/unit_tests/event_filter_tests.rs
index 41790b445fe62..bada54a53dbf0 100644
--- a/crates/sui-types/src/unit_tests/event_filter_tests.rs
+++ b/crates/sui-types/src/unit_tests/event_filter_tests.rs
@@ -32,6 +32,7 @@ fn test_move_event_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 0,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: Some(json!(BTreeMap::from([("balance", 10000)]))),
     };
@@ -88,6 +89,7 @@ fn test_transfer_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 1,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: None,
     };
@@ -122,6 +124,7 @@ fn test_publish_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 0,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: None,
     };
@@ -158,6 +161,7 @@ fn test_delete_object_filter() {
     let envelope = EventEnvelope {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
+        event_num: 0,
         seq_num: 0,
         event: move_event,
         move_struct_json_value: None,
@@ -200,6 +204,7 @@ fn test_new_object_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 0,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: None,
     };
@@ -232,6 +237,7 @@ fn test_epoch_change_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 1,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: None,
     };
@@ -247,6 +253,7 @@ fn test_checkpoint_filter() {
         timestamp: 0,
         tx_digest: Some(TransactionDigest::random()),
         seq_num: 1,
+        event_num: 0,
         event: move_event,
         move_struct_json_value: None,
    };
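
The core of the change is visible in sql.rs above: deduplication no longer depends on events arriving in seq_num order. Instead, initialize() creates a unique index over (seq_num, event_num) and SQL_INSERT_TX uses INSERT OR IGNORE, so replayed events fall out at the database layer and add_events simply reports how many rows actually landed. Below is a minimal, self-contained sketch of that scheme; it uses rusqlite and a cut-down table purely for brevity (the crate itself goes through sqlx, and the real events table has many more columns):

    use rusqlite::{params, Connection};

    fn main() -> rusqlite::Result<()> {
        let conn = Connection::open_in_memory()?;
        // The unique index is what makes ingestion idempotent, mirroring initialize().
        conn.execute_batch(
            "CREATE TABLE events (seq_num INTEGER, event_num INTEGER, contents TEXT);
             CREATE UNIQUE INDEX event_unique_id_idx ON events (seq_num, event_num);",
        )?;

        let insert =
            "INSERT OR IGNORE INTO events (seq_num, event_num, contents) VALUES (?1, ?2, ?3)";

        // First write of (3, 0): one row affected.
        assert_eq!(conn.execute(insert, params![3, 0, "transfer"])?, 1);
        // A second event in the same tx carries a new event_num, so it is a new row.
        assert_eq!(conn.execute(insert, params![3, 1, "delete"])?, 1);
        // Replaying (3, 0) is silently skipped; rows affected tells the caller so.
        assert_eq!(conn.execute(insert, params![3, 0, "transfer"])?, 0);

        let count: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
        assert_eq!(count, 2);
        Ok(())
    }

The rows-affected counts here are the same behavior the reworked sql.rs test asserts: 4 on the first batch, 0 when an already-ingested event is re-added, and the count is unchanged after the store is dropped and reopened, with no in-memory sequence number to recover.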
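The (seq_num, event_num) key only works because event_handler.rs now numbers each event by its position within its transaction via enumerate(). A sketch of just that numbering follows; MiniEnvelope is a hypothetical stand-in for the real EventEnvelope, not a type from the patch:

    #[derive(Debug, Clone)]
    struct MiniEnvelope {
        seq_num: u64,
        event_num: u64,
        payload: String,
    }

    // Assigns event_num from the event's position in its transaction, as the
    // enumerate() in event_handler.rs does.
    fn envelope_events(seq_num: u64, payloads: &[&str]) -> Vec<MiniEnvelope> {
        payloads
            .iter()
            .enumerate()
            .map(|(event_num, p)| MiniEnvelope {
                seq_num,
                event_num: event_num.try_into().unwrap(), // mirrors the patch's try_into
                payload: p.to_string(),
            })
            .collect()
    }

    fn main() {
        // Two otherwise-identical transfer events in tx #3 get distinct keys (3,0) and (3,1),
        // so neither is dropped by the unique index.
        let evs = envelope_events(3, &["transfer", "transfer"]);
        assert_eq!((evs[0].seq_num, evs[0].event_num), (3, 0));
        assert_eq!((evs[1].seq_num, evs[1].event_num), (3, 1));
    }

This is why the tests above give two events sharing txfr_digest the same seq_num but event_num 0 and 1: duplicate event payloads within one transaction no longer collide.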
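The WAL bench fix is a separate, smaller pattern: setup that must live on the runtime moves inside block_on, timing starts only after setup, and the elapsed time is handed back out of the async block. A hedged stand-alone sketch under the same assumptions (tokio available; the yield is a placeholder for the real WAL work, and 10_000 iterations is an arbitrary choice):

    fn main() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        let duration = runtime.block_on(async {
            // ... create runtime-bound resources here, like the bench's tempdir + WAL ...
            let start = std::time::Instant::now();
            for _ in 0..10_000 {
                tokio::task::yield_now().await; // stand-in for the benched work
            }
            // Return the measurement out of the async block, so setup cost is excluded.
            start.elapsed()
        });

        println!("throughput: {} ops/s", 10_000.0 / duration.as_secs_f64());
    }

The original bench started the clock before block_on, so runtime startup and WAL creation inflated the measured duration; returning start.elapsed() from inside the async block times only the work itself.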