From dbc17866ff9bdbdab5cb85ad64a6f76f6b888a9d Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 31 Oct 2024 15:21:14 +0000 Subject: [PATCH] Migration process --- .../fuel-core/src/state/historical_rocksdb.rs | 94 +++++++++++++++++-- .../modifications_history_migration.rs | 34 ++++--- 2 files changed, 106 insertions(+), 22 deletions(-) diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 5f1cfa1190..f5040f74fd 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -66,6 +66,7 @@ use std::{ num::NonZeroU64, path::Path, sync::Arc, + thread, }; pub mod description; @@ -112,13 +113,14 @@ where db: RocksDb>, state_rewind_policy: StateRewindPolicy, ) -> DatabaseResult { + let v1_changes_to_migrate_at_once = 1000; let shared_migration_state = Arc::new(Mutex::new(MigrationState::new())); let inner = Arc::new(InnerHistoricalRocksDB::new( db, state_rewind_policy, shared_migration_state, )?); - Self::migrate_modifications_history(inner.clone()); + Self::migrate_modifications_history(inner.clone(), v1_changes_to_migrate_at_once); Ok(Self { inner }) } @@ -131,6 +133,7 @@ where state_rewind_policy: StateRewindPolicy, max_fds: i32, ) -> DatabaseResult { + let v1_changes_to_migrate_at_once = 1000; let shared_migration_state = Arc::new(Mutex::new(MigrationState::new())); let inner = Arc::new(InnerHistoricalRocksDB::default_open( path, @@ -139,13 +142,83 @@ where max_fds, shared_migration_state, )?); - Self::migrate_modifications_history(inner.clone()); + Self::migrate_modifications_history(inner.clone(), v1_changes_to_migrate_at_once); Ok(Self { inner }) } fn migrate_modifications_history( - _historical_rocksdb: Arc>, + historical_rocksdb: Arc>, + num_heights_to_migrate_at_once: usize, ) { + let last_migratable_height = historical_rocksdb + .db + .iter_all_keys::>(None) + .filter_map(Result::ok) //TODO: How to handle errors here? Should we stop the migration? + .max(); + let Some(last_migratable_height) = last_migratable_height else { + historical_rocksdb + .shared_migration_state + .lock() + .signal_migration_complete(); + return + }; + + thread::spawn(move || { + historical_rocksdb + .shared_migration_state + .lock() + .update_last_height_to_be_migrated(last_migratable_height); + + let mut v1_modifications_iterator = historical_rocksdb + .db + .iter_all_keys::>(None) + .chunks(num_heights_to_migrate_at_once); + + loop { + let mut should_continue = false; + for v1_modifications in &v1_modifications_iterator { + // We processed at least a V1 modification, so we should continue with the migration + should_continue = true; + let mut storage_transaction = + historical_rocksdb.db.read_transaction(); + for height in v1_modifications { + let Ok(height) = height else { + // We can continue with migrating other keys + continue; + }; + + let Ok(Some(v1_changes)) = storage_transaction + .storage_as_mut::>() + .take(&height) + else { + continue + }; + + storage_transaction + .storage_as_mut::>() + .insert(&height, &v1_changes) + .expect("Insertion on a in-memory transaction cannot fail"); + } + historical_rocksdb + .shared_migration_state + .lock() + .add_migration_changes(storage_transaction.into_changes()); + } + if should_continue { + v1_modifications_iterator = historical_rocksdb + .db + .iter_all_keys::>(None) + .chunks(num_heights_to_migrate_at_once); + } else { + break + } + } + + historical_rocksdb + .shared_migration_state + .lock() + .signal_migration_complete(); + }); } /// Returns the latest view of the database. @@ -527,7 +600,10 @@ where // TODO: This method doesn't work properly because of // https://github.com/FuelLabs/fuel-core/issues/2095 fn rollback_last_block(&self) -> StorageResult { - let modifications_history_migration_in_progress = self.is_migration_in_progress(); + let modifications_history_migration_in_progress = self + .shared_migration_state + .lock() + .is_migration_in_progress(); let (v2_latest_height, v1_latest_height) = self.multiversion_changes_heights( IterDirection::Reverse, @@ -575,12 +651,16 @@ where ) .commit()?; + // Reduce the latest migratable height before the rollback. This is necessary to avoid + // concurrency race condition where stale changes for the level being rollbacked are + // committed before the latest migratable height is reduced. + self.shared_migration_state + .lock() + .update_last_height_to_be_migrated(height_to_rollback); + self.db .commit_changes(&storage_transaction.into_changes())?; - self.shared_migration_state - .lock() - .set_last_height_to_be_migrated(height_to_rollback); Ok(()) } } diff --git a/crates/fuel-core/src/state/historical_rocksdb/modifications_history_migration.rs b/crates/fuel-core/src/state/historical_rocksdb/modifications_history_migration.rs index 1ead57c763..8d7e2564c2 100644 --- a/crates/fuel-core/src/state/historical_rocksdb/modifications_history_migration.rs +++ b/crates/fuel-core/src/state/historical_rocksdb/modifications_history_migration.rs @@ -42,7 +42,7 @@ use fuel_core_storage::codec::Decode; pub struct MigrationState { changes: Changes, // The height up to which the migration can be performed, included. - last_height_to_be_migrated: Option, + last_height_to_be_migrated: u64, migration_in_progress: bool, _description: PhantomData, } @@ -51,17 +51,15 @@ impl MigrationState { pub fn new() -> Self { Self { changes: Changes::default(), - // When set to None, the migration process won't do anything as all - // changes will be considered stale. - last_height_to_be_migrated: None, - // Initially assume that the migration is in progress. - migration_in_progress: true, + last_height_to_be_migrated: u64::MAX, + migration_in_progress: false, + _description: PhantomData, } } pub fn is_migration_in_progress(&self) -> bool { - self.migration_in_progress + self.migration_in_progress; } pub fn signal_migration_complete(&mut self) { @@ -112,13 +110,6 @@ where self.changes = consistent_changes; } - pub fn set_last_height_to_be_migrated(&mut self, last_height_to_be_migrated: u64) { - self.last_height_to_be_migrated = Some(last_height_to_be_migrated); - let changes = std::mem::take(&mut self.changes); - let consistent_changes = self.remove_stale_migration_changes(changes).unwrap(); - self.changes = consistent_changes; - } - // Remove the changes above the last migration height. fn remove_stale_migration_changes(&self, changes: Changes) -> StorageResult { let mut revert_changes = Changes::default(); @@ -147,7 +138,7 @@ where > > >::KeyCodec::decode(&serialized_height)?; - if height > self.last_height_to_be_migrated.unwrap() { + if height > self.last_height_to_be_migrated { revert_changes .get_mut(&Column::::HistoryV2Column.id()) .expect("Changes for HistoryV2Column were inserted in this function") @@ -173,4 +164,17 @@ where Ok(transaction_without_stale_changes.into_changes()) } + + pub fn update_last_height_to_be_migrated(&mut self, last_height_to_be_migrated: u64) { + if self.is_migration_in_progress() { + let changes = std::mem::take(&mut self.changes); + self.last_height_to_be_migrated + .min(last_height_to_be_migrated); + if let Ok(consistent_changes) = self.remove_stale_migration_changes(changes) { + // If removing the stale migration changes is not successful, we adopt a pessimistic approach and throw + // away all the changes. This is because we cannot guarantee that the changes are consistent anymore. + self.changes = consistent_changes; + } + } + } }