Skip to content

Commit

Permalink
Migration process
Browse files Browse the repository at this point in the history
  • Loading branch information
acerone85 committed Oct 31, 2024
1 parent 3d88ac5 commit dbc1786
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 22 deletions.
94 changes: 87 additions & 7 deletions crates/fuel-core/src/state/historical_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use std::{
num::NonZeroU64,
path::Path,
sync::Arc,
thread,
};

pub mod description;
Expand Down Expand Up @@ -112,13 +113,14 @@ where
db: RocksDb<Historical<Description>>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let v1_changes_to_migrate_at_once = 1000;
let shared_migration_state = Arc::new(Mutex::new(MigrationState::new()));
let inner = Arc::new(InnerHistoricalRocksDB::new(
db,
state_rewind_policy,
shared_migration_state,
)?);
Self::migrate_modifications_history(inner.clone());
Self::migrate_modifications_history(inner.clone(), v1_changes_to_migrate_at_once);
Ok(Self { inner })
}

Expand All @@ -131,6 +133,7 @@ where
state_rewind_policy: StateRewindPolicy,
max_fds: i32,
) -> DatabaseResult<Self> {
let v1_changes_to_migrate_at_once = 1000;
let shared_migration_state = Arc::new(Mutex::new(MigrationState::new()));
let inner = Arc::new(InnerHistoricalRocksDB::default_open(
path,
Expand All @@ -139,13 +142,83 @@ where
max_fds,
shared_migration_state,
)?);
Self::migrate_modifications_history(inner.clone());
Self::migrate_modifications_history(inner.clone(), v1_changes_to_migrate_at_once);
Ok(Self { inner })
}

fn migrate_modifications_history(
_historical_rocksdb: Arc<InnerHistoricalRocksDB<Description>>,
historical_rocksdb: Arc<InnerHistoricalRocksDB<Description>>,
num_heights_to_migrate_at_once: usize,
) {
let last_migratable_height = historical_rocksdb
.db
.iter_all_keys::<ModificationsHistoryV1<Description>>(None)
.filter_map(Result::ok) //TODO: How to handle errors here? Should we stop the migration?
.max();
let Some(last_migratable_height) = last_migratable_height else {
historical_rocksdb
.shared_migration_state
.lock()
.signal_migration_complete();
return
};

thread::spawn(move || {
historical_rocksdb
.shared_migration_state
.lock()
.update_last_height_to_be_migrated(last_migratable_height);

let mut v1_modifications_iterator = historical_rocksdb
.db
.iter_all_keys::<ModificationsHistoryV1<Description>>(None)
.chunks(num_heights_to_migrate_at_once);

loop {
let mut should_continue = false;
for v1_modifications in &v1_modifications_iterator {
// We processed at least a V1 modification, so we should continue with the migration
should_continue = true;
let mut storage_transaction =
historical_rocksdb.db.read_transaction();
for height in v1_modifications {
let Ok(height) = height else {
// We can continue with migrating other keys
continue;
};

let Ok(Some(v1_changes)) = storage_transaction
.storage_as_mut::<ModificationsHistoryV1<Description>>()
.take(&height)
else {
continue
};

storage_transaction
.storage_as_mut::<ModificationsHistoryV2<Description>>()
.insert(&height, &v1_changes)
.expect("Insertion on a in-memory transaction cannot fail");
}
historical_rocksdb
.shared_migration_state
.lock()
.add_migration_changes(storage_transaction.into_changes());
}
if should_continue {
v1_modifications_iterator = historical_rocksdb
.db
.iter_all_keys::<ModificationsHistoryV1<Description>>(None)
.chunks(num_heights_to_migrate_at_once);
} else {
break
}
}

historical_rocksdb
.shared_migration_state
.lock()
.signal_migration_complete();
});
}

/// Returns the latest view of the database.
Expand Down Expand Up @@ -527,7 +600,10 @@ where
// TODO: This method doesn't work properly because of
// https://github.com/FuelLabs/fuel-core/issues/2095
fn rollback_last_block(&self) -> StorageResult<u64> {
let modifications_history_migration_in_progress = self.is_migration_in_progress();
let modifications_history_migration_in_progress = self
.shared_migration_state
.lock()
.is_migration_in_progress();

let (v2_latest_height, v1_latest_height) = self.multiversion_changes_heights(
IterDirection::Reverse,
Expand Down Expand Up @@ -575,12 +651,16 @@ where
)
.commit()?;

// Reduce the latest migratable height before the rollback. This is necessary to avoid
// concurrency race condition where stale changes for the level being rollbacked are
// committed before the latest migratable height is reduced.
self.shared_migration_state
.lock()
.update_last_height_to_be_migrated(height_to_rollback);

self.db
.commit_changes(&storage_transaction.into_changes())?;

self.shared_migration_state
.lock()
.set_last_height_to_be_migrated(height_to_rollback);
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use fuel_core_storage::codec::Decode;
pub struct MigrationState<Description> {
changes: Changes,
// The height up to which the migration can be performed, included.
last_height_to_be_migrated: Option<u64>,
last_height_to_be_migrated: u64,
migration_in_progress: bool,
_description: PhantomData<Description>,
}
Expand All @@ -51,17 +51,15 @@ impl<Description> MigrationState<Description> {
pub fn new() -> Self {
Self {
changes: Changes::default(),
// When set to None, the migration process won't do anything as all
// changes will be considered stale.
last_height_to_be_migrated: None,
// Initially assume that the migration is in progress.
migration_in_progress: true,
last_height_to_be_migrated: u64::MAX,
migration_in_progress: false,

_description: PhantomData,
}
}

pub fn is_migration_in_progress(&self) -> bool {
self.migration_in_progress
self.migration_in_progress;
}

pub fn signal_migration_complete(&mut self) {
Expand Down Expand Up @@ -112,13 +110,6 @@ where
self.changes = consistent_changes;
}

pub fn set_last_height_to_be_migrated(&mut self, last_height_to_be_migrated: u64) {
self.last_height_to_be_migrated = Some(last_height_to_be_migrated);
let changes = std::mem::take(&mut self.changes);
let consistent_changes = self.remove_stale_migration_changes(changes).unwrap();
self.changes = consistent_changes;
}

// Remove the changes above the last migration height.
fn remove_stale_migration_changes(&self, changes: Changes) -> StorageResult<Changes> {
let mut revert_changes = Changes::default();
Expand Down Expand Up @@ -147,7 +138,7 @@ where
>
>
>::KeyCodec::decode(&serialized_height)?;
if height > self.last_height_to_be_migrated.unwrap() {
if height > self.last_height_to_be_migrated {
revert_changes
.get_mut(&Column::<Description>::HistoryV2Column.id())
.expect("Changes for HistoryV2Column were inserted in this function")
Expand All @@ -173,4 +164,17 @@ where

Ok(transaction_without_stale_changes.into_changes())
}

pub fn update_last_height_to_be_migrated(&mut self, last_height_to_be_migrated: u64) {
if self.is_migration_in_progress() {
let changes = std::mem::take(&mut self.changes);
self.last_height_to_be_migrated
.min(last_height_to_be_migrated);
if let Ok(consistent_changes) = self.remove_stale_migration_changes(changes) {
// If removing the stale migration changes is not successful, we adopt a pessimistic approach and throw
// away all the changes. This is because we cannot guarantee that the changes are consistent anymore.
self.changes = consistent_changes;
}
}
}
}

0 comments on commit dbc1786

Please sign in to comment.