Skip to content

Commit

Permalink
fix(node): remove deprecated lock details in AuthorityStore (#3838)
Browse files Browse the repository at this point in the history
* fix(node): remove deprecated lock details in `AuthorityStore`

* fix(node): address review comment

* fix(node): renames for better readability

* fix(node): more renames and comments

* fix(node): rustfmt
  • Loading branch information
muXxer authored Nov 4, 2024
1 parent 4ccd89d commit fd86e35
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 194 deletions.
2 changes: 1 addition & 1 deletion crates/iota-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4118,7 +4118,7 @@ impl AuthorityState {
ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
};

epoch_store.get_signed_transaction(&lock_info.tx_digest)
epoch_store.get_signed_transaction(&lock_info)
}

pub async fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
Expand Down
4 changes: 2 additions & 2 deletions crates/iota-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ pub struct AuthorityEpochTables {
DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, AuthoritySignInfo>>,

/// Map from ObjectRef to transaction locking that object
#[default_options_override_fn = "owned_object_transaction_locks_table_default_config"]
#[default_options_override_fn = "owned_object_locked_transactions_table_default_config"]
owned_object_locked_transactions: DBMap<ObjectRef, LockDetailsWrapper>,

/// Signatures over transaction effects that we have signed and returned to
Expand Down Expand Up @@ -607,7 +607,7 @@ fn signed_transactions_table_default_config() -> DBOptions {
.optimize_for_large_values_no_scan(1 << 10)
}

fn owned_object_transaction_locks_table_default_config() -> DBOptions {
fn owned_object_locked_transactions_table_default_config() -> DBOptions {
DBOptions {
options: default_db_options()
.optimize_for_write_throughput()
Expand Down
173 changes: 34 additions & 139 deletions crates/iota-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use iota_types::{
};
use itertools::izip;
use move_core_types::resolver::ModuleResolver;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{RwLockReadGuard, RwLockWriteGuard},
time::Instant,
Expand All @@ -45,7 +44,7 @@ use super::{
};
use crate::{
authority::{
authority_per_epoch_store::AuthorityPerEpochStore,
authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails},
authority_store_pruner::{
AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
},
Expand Down Expand Up @@ -712,9 +711,9 @@ impl AuthorityStore {

// Update the index
if object.get_single_owner().is_some() {
// Only initialize lock for address owned objects.
// Only initialize live object markers for address owned objects.
if !object.is_child_object() {
self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref])?;
}
}

Expand Down Expand Up @@ -758,11 +757,7 @@ impl AuthorityStore {
.map(|(oref, _)| *oref)
.collect();

self.initialize_live_object_markers_impl(
&mut batch,
&non_child_object_refs,
false, // is_force_reset
)?;
self.initialize_live_object_markers_impl(&mut batch, &non_child_object_refs)?;

batch.write()?;

Expand Down Expand Up @@ -801,7 +796,6 @@ impl AuthorityStore {
&perpetual_db.live_owned_object_markers,
&mut batch,
&[object.compute_object_reference()],
false, // is_force_reset
)?;
}
}
Expand Down Expand Up @@ -922,8 +916,8 @@ impl AuthorityStore {
deleted,
written,
events,
locks_to_delete,
new_locks_to_init,
live_object_markers_to_delete,
new_live_object_markers_to_init,
..
} = tx_outputs;

Expand Down Expand Up @@ -1006,11 +1000,11 @@ impl AuthorityStore {

write_batch.insert_batch(&self.perpetual_tables.events, events)?;

self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
self.initialize_live_object_markers_impl(write_batch, new_live_object_markers_to_init)?;

// Note: deletes locks for received objects as well (but not for objects that
// were in `Receiving` arguments which were not received)
self.delete_live_object_markers(write_batch, locks_to_delete)?;
// Note: deletes live object markers for received objects as well (but not for
// objects that were in `Receiving` arguments which were not received)
self.delete_live_object_markers(write_batch, live_object_markers_to_delete)?;

write_batch
.insert_batch(&self.perpetual_tables.effects, [(
Expand Down Expand Up @@ -1051,13 +1045,12 @@ impl AuthorityStore {
transaction: VerifiedSignedTransaction,
) -> IotaResult {
let tx_digest = *transaction.digest();
let epoch = epoch_store.epoch();
// Other writers may be attempting to acquire locks on the same objects, so a
// mutex is required.
// TODO: replace with optimistic db_transactions (i.e. set lock to tx if none)
let _mutexes = self.acquire_locks(owned_input_objects).await;

trace!(?owned_input_objects, "acquire_locks");
trace!(?owned_input_objects, "acquire_transaction_locks");
let mut locks_to_write = Vec::new();

let live_object_markers = self
Expand All @@ -1076,33 +1069,18 @@ impl AuthorityStore {
locks.into_iter(),
owned_input_objects
) {
let Some(live_marker) = live_marker else {
let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
if live_marker.is_none() {
// object at that version does not exist
let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
fp_bail!(
UserInputError::ObjectVersionUnavailableForConsumption {
provided_obj_ref: *obj_ref,
current_version: latest_lock.1
current_version: latest_live_version.1
}
.into()
);
};

let live_marker = live_marker.map(|l| l.migrate().into_inner());

if let Some(LockDetailsDeprecated {
epoch: previous_epoch,
..
}) = &live_marker
{
// this must be from a prior epoch, because we no longer write LockDetails to
// owned_object_transaction_locks
assert!(
previous_epoch < &epoch,
"lock for {:?} should be from a prior epoch",
obj_ref
);
}

if let Some(previous_tx_digest) = &lock {
if previous_tx_digest == &tx_digest {
// no need to re-write lock
Expand Down Expand Up @@ -1144,20 +1122,16 @@ impl AuthorityStore {
.get(&obj_ref)?
.is_none()
{
// object at that version does not exist
return Ok(ObjectLockStatus::LockedAtDifferentVersion {
locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
});
}

let tables = epoch_store.tables()?;
let epoch_id = epoch_store.epoch();

if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
Ok(ObjectLockStatus::LockedToTx {
locked_by_tx: LockDetailsDeprecated {
epoch: epoch_id,
tx_digest,
},
locked_by_tx: tx_digest,
})
} else {
Ok(ObjectLockStatus::Initialized)
Expand Down Expand Up @@ -1200,17 +1174,18 @@ impl AuthorityStore {
/// Returns UserInputError::ObjectVersionUnavailableForConsumption if at
/// least one object lock is not initialized at the given version.
pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> IotaResult {
let locks = self
let live_markers = self
.perpetual_tables
.live_owned_object_markers
.multi_get(objects)?;
for (lock, obj_ref) in locks.into_iter().zip(objects) {
if lock.is_none() {
let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
for (live_marker, obj_ref) in live_markers.into_iter().zip(objects) {
if live_marker.is_none() {
// object at that version does not exist
let latest_live_version = self.get_latest_live_version_for_object_id(obj_ref.0)?;
fp_bail!(
UserInputError::ObjectVersionUnavailableForConsumption {
provided_obj_ref: *obj_ref,
current_version: latest_lock.1
current_version: latest_live_version.1
}
.into()
);
Expand All @@ -1219,59 +1194,29 @@ impl AuthorityStore {
Ok(())
}

/// Initialize a lock to None (but exists) for a given list of ObjectRefs.
/// Returns IotaError::ObjectLockAlreadyInitialized if the lock already
/// exists and is locked to a transaction
/// Initialize live object markers for a given list of ObjectRefs.
fn initialize_live_object_markers_impl(
&self,
write_batch: &mut DBBatch,
objects: &[ObjectRef],
is_force_reset: bool,
) -> IotaResult {
AuthorityStore::initialize_live_object_markers(
&self.perpetual_tables.live_owned_object_markers,
write_batch,
objects,
is_force_reset,
)
}

pub fn initialize_live_object_markers(
live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
live_object_marker_table: &DBMap<ObjectRef, ()>,
write_batch: &mut DBBatch,
objects: &[ObjectRef],
is_force_reset: bool,
) -> IotaResult {
trace!(?objects, "initialize_locks");

let live_object_markers = live_object_marker_table.multi_get(objects)?;

if !is_force_reset {
// If any live_object_markers exist and are not None, return errors for them
// Note we don't check if there is a pre-existing lock. this is because
// initializing the live object marker will not overwrite the lock
// and cause the validator to equivocate.
let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
.iter()
.zip(objects)
.filter_map(|(lock_opt, objref)| {
lock_opt.clone().flatten().map(|_tx_digest| *objref)
})
.collect();
if !existing_live_object_markers.is_empty() {
info!(
?existing_live_object_markers,
"Cannot initialize live_object_markers because some exist already"
);
return Err(IotaError::ObjectLockAlreadyInitialized {
refs: existing_live_object_markers,
});
}
}
trace!(?objects, "initialize_live_object_markers");

write_batch.insert_batch(
live_object_marker_table,
objects.iter().map(|obj_ref| (obj_ref, None)),
objects.iter().map(|obj_ref| (obj_ref, ())),
)?;
Ok(())
}
Expand All @@ -1282,7 +1227,7 @@ impl AuthorityStore {
write_batch: &mut DBBatch,
objects: &[ObjectRef],
) -> IotaResult {
trace!(?objects, "delete_locks");
trace!(?objects, "delete_live_object_markers");
write_batch.delete_batch(
&self.perpetual_tables.live_owned_object_markers,
objects.iter(),
Expand All @@ -1291,7 +1236,7 @@ impl AuthorityStore {
}

#[cfg(test)]
pub(crate) fn reset_locks_for_test(
pub(crate) fn reset_locks_and_live_markers_for_test(
&self,
transactions: &[TransactionDigest],
objects: &[ObjectRef],
Expand All @@ -1312,7 +1257,7 @@ impl AuthorityStore {
batch.write().unwrap();

let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
self.initialize_live_object_markers_impl(&mut batch, objects, false)
self.initialize_live_object_markers_impl(&mut batch, objects)
.unwrap();
batch.write().unwrap();
}
Expand Down Expand Up @@ -1404,10 +1349,10 @@ impl AuthorityStore {

let old_locks: Vec<_> = old_locks.flatten().collect();

// Re-create old locks.
self.initialize_live_object_markers_impl(&mut write_batch, &old_locks, true)?;
// Re-create old live markers.
self.initialize_live_object_markers_impl(&mut write_batch, &old_locks)?;

// Delete new locks
// Delete new live markers
write_batch.delete_batch(
&self.perpetual_tables.live_owned_object_markers,
new_locks.flatten(),
Expand Down Expand Up @@ -1963,56 +1908,6 @@ pub type IotaLockResult = IotaResult<ObjectLockStatus>;
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
Initialized,
LockedToTx { locked_by_tx: LockDetailsDeprecated },
LockedToTx { locked_by_tx: LockDetails }, // no need to use wrapper, not stored or serialized
LockedAtDifferentVersion { locked_ref: ObjectRef },
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
V1(LockDetailsV1Deprecated),
}

impl LockDetailsWrapperDeprecated {
pub fn migrate(self) -> Self {
// TODO: when there are multiple versions, we must iteratively migrate from
// version N to N+1 until we arrive at the latest version
self
}

// Always returns the most recent version. Older versions are migrated to the
// latest version at read time, so there is never a need to access older
// versions.
pub fn inner(&self) -> &LockDetailsDeprecated {
match self {
Self::V1(v1) => v1,

// can remove #[allow] when there are multiple versions
#[allow(unreachable_patterns)]
_ => panic!("lock details should have been migrated to latest version at read time"),
}
}
pub fn into_inner(self) -> LockDetailsDeprecated {
match self {
Self::V1(v1) => v1,

// can remove #[allow] when there are multiple versions
#[allow(unreachable_patterns)]
_ => panic!("lock details should have been migrated to latest version at read time"),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
pub epoch: EpochId,
pub tx_digest: TransactionDigest,
}

pub type LockDetailsDeprecated = LockDetailsV1Deprecated;

impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
fn from(details: LockDetailsDeprecated) -> Self {
// always use latest version.
LockDetailsWrapperDeprecated::V1(details)
}
}
Loading

0 comments on commit fd86e35

Please sign in to comment.