diff --git a/Cargo.lock b/Cargo.lock index cf5f4a245..f13a17713 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3211,7 +3211,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -6899,7 +6899,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/xmtp_debug/src/args.rs b/xmtp_debug/src/args.rs index 3c2cf67c5..a0241216b 100644 --- a/xmtp_debug/src/args.rs +++ b/xmtp_debug/src/args.rs @@ -154,6 +154,17 @@ pub enum EntityKind { Identity, } +impl std::fmt::Display for EntityKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use EntityKind::*; + match self { + Group => write!(f, "group"), + Message => write!(f, "message"), + Identity => write!(f, "identity"), + } + } +} + /// specify the log output #[derive(Args, Debug)] pub struct LogOptions { diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 02327f11b..d1041dfcf 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -50,7 +50,7 @@ use crate::{ group_message::StoredGroupMessage, refresh_state::EntityKind, wallet_addresses::WalletEntry, - EncryptedMessageStore, StorageError, + EncryptedMessageStore, NotFound, StorageError, }, subscriptions::{LocalEventError, LocalEvents}, types::InstallationId, @@ -108,6 +108,12 @@ pub enum ClientError { Generic(String), } +impl From for ClientError { + fn from(value: NotFound) -> Self { + ClientError::Storage(StorageError::NotFound(value)) + } +} + impl From for ClientError { fn from(err: GroupError) -> ClientError { ClientError::Group(Box::new(err)) @@ -309,11 +315,7 @@ where address: String, ) -> Result, ClientError> { let results = self.find_inbox_ids_from_addresses(conn, &[address]).await?; - if let Some(first_result) = results.into_iter().next() { - Ok(first_result) - } else { - Ok(None) - } + Ok(results.into_iter().next().flatten()) } /// Calls the server to look up the `inbox_id`s` associated with a list of addresses. @@ -556,10 +558,9 @@ where { Some(id) => id, None => { - return Err(ClientError::Storage(StorageError::NotFound(format!( - "inbox id for address {} not found", - account_address - )))) + return Err(ClientError::Storage(StorageError::NotFound( + NotFound::InboxIdForAddress(account_address), + ))); } }; @@ -610,13 +611,10 @@ where group_id: Vec, ) -> Result, ClientError> { let stored_group: Option = conn.fetch(&group_id)?; - match stored_group { - Some(group) => Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)), - None => Err(ClientError::Storage(StorageError::NotFound(format!( - "group {}", - hex::encode(group_id) - )))), - } + stored_group + .map(|g| MlsGroup::new(self.clone(), g.id, g.created_at_ns)) + .ok_or(NotFound::GroupById(group_id)) + .map_err(Into::into) } /// Look up a group by its ID @@ -638,17 +636,10 @@ where target_inbox_id: String, ) -> Result, ClientError> { let conn = self.store().conn()?; - match conn.find_dm_group(&target_inbox_id)? { - Some(dm_group) => Ok(MlsGroup::new( - self.clone(), - dm_group.id, - dm_group.created_at_ns, - )), - None => Err(ClientError::Storage(StorageError::NotFound(format!( - "dm_target_inbox_id {}", - hex::encode(target_inbox_id) - )))), - } + let group = conn + .find_dm_group(&target_inbox_id)? + .ok_or(NotFound::DmByInbox(target_inbox_id))?; + Ok(MlsGroup::new(self.clone(), group.id, group.created_at_ns)) } /// Look up a message by its ID @@ -656,13 +647,7 @@ where pub fn message(&self, message_id: Vec) -> Result { let conn = &mut self.store().conn()?; let message = conn.get_group_message(&message_id)?; - match message { - Some(message) => Ok(message), - None => Err(ClientError::Storage(StorageError::NotFound(format!( - "message {}", - hex::encode(message_id) - )))), - } + Ok(message.ok_or(NotFound::MessageById(message_id))?) } /// Query for groups with optional filters diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 070caabe6..43660e643 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -6,11 +6,9 @@ use crate::{ configuration::NS_IN_HOUR, storage::{ consent_record::StoredConsentRecord, - group::StoredGroup, - group::{ConversationType, GroupQueryArgs}, - group_message::MsgQueryArgs, - group_message::{GroupMessageKind, StoredGroupMessage}, - DbConnection, StorageError, + group::{ConversationType, GroupQueryArgs, StoredGroup}, + group_message::{GroupMessageKind, MsgQueryArgs, StoredGroupMessage}, + DbConnection, NotFound, StorageError, }, subscriptions::{LocalEvents, StreamMessages, SubscribeError, SyncMessage}, xmtp_openmls_provider::XmtpOpenMlsProvider, @@ -115,6 +113,12 @@ impl RetryableError for DeviceSyncError { } } +impl From for DeviceSyncError { + fn from(value: NotFound) -> Self { + DeviceSyncError::Storage(StorageError::NotFound(value)) + } +} + impl Client where ApiClient: XmtpApi + Send + Sync + 'static, @@ -211,9 +215,9 @@ where retry, (async { conn.get_group_message(&message_id)? - .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( - "Message id {message_id:?} not found." - )))) + .ok_or(DeviceSyncError::from(NotFound::MessageById( + message_id.clone(), + ))) }) )?; @@ -240,9 +244,9 @@ where retry, (async { conn.get_group_message(&message_id)? - .ok_or(DeviceSyncError::Storage(StorageError::NotFound(format!( - "Message id {message_id:?} not found." - )))) + .ok_or(DeviceSyncError::from(NotFound::MessageById( + message_id.clone(), + ))) }) )?; diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 3c1320f6c..5f0bfa22c 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -58,7 +58,7 @@ use self::{ intents::IntentError, validated_commit::CommitValidationError, }; -use crate::storage::{group_message::ContentType, StorageError}; +use crate::storage::{group_message::ContentType, NotFound, StorageError}; use xmtp_common::time::now_ns; use xmtp_proto::xmtp::mls::{ api::v1::{ @@ -418,7 +418,9 @@ impl MlsGroup { let mls_group = OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id)) .map_err(crate::StorageError::from)? - .ok_or(crate::StorageError::NotFound("Group Not Found".into()))?; + .ok_or(StorageError::from(NotFound::GroupById( + self.group_id.to_vec(), + )))?; // Perform the operation with the MLS group operation(mls_group).await.map_err(Into::into) diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 010eab713..547f85134 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -5,7 +5,7 @@ use super::{ schema::groups::{self, dsl}, Sqlite, }; -use crate::{impl_fetch, impl_store, DuplicateItem, StorageError}; +use crate::{impl_fetch, impl_store, storage::NotFound, DuplicateItem, StorageError}; use diesel::{ backend::Backend, deserialize::{self, FromSql, FromSqlRow}, @@ -379,9 +379,8 @@ impl DbConnection { Ok::, StorageError>(ts) })?; - last_ts.ok_or(StorageError::NotFound(format!( - "installation time for group {}", - hex::encode(group_id) + last_ts.ok_or(StorageError::NotFound(NotFound::InstallationTimeForGroup( + group_id, ))) } @@ -407,10 +406,7 @@ impl DbConnection { Ok::<_, StorageError>(ts) })?; - last_ts.ok_or(StorageError::NotFound(format!( - "installation time for group {}", - hex::encode(group_id) - ))) + last_ts.ok_or(NotFound::InstallationTimeForGroup(group_id).into()) } /// Updates the 'last time checked' we checked for new installations. diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 70edb0956..743eccc5e 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -17,7 +17,7 @@ use super::{ use crate::{ groups::intents::{IntentError, SendMessageIntentData}, impl_fetch, impl_store, - storage::StorageError, + storage::{NotFound, StorageError}, utils::id::calculate_message_id, Delete, }; @@ -197,7 +197,7 @@ impl DbConnection { staged_commit: Option>, published_in_epoch: i64, ) -> Result<(), StorageError> { - let res = self.raw_query(|conn| { + let rows_changed = self.raw_query(|conn| { diesel::update(dsl::group_intents) .filter(dsl::id.eq(intent_id)) // State machine requires that the only valid state transition to Published is from @@ -213,30 +213,25 @@ impl DbConnection { .execute(conn) })?; - match res { - // If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong - 0 => { - let already_published = self.raw_query(|conn| { - dsl::group_intents - .filter(dsl::id.eq(intent_id)) - .first::(conn) - }); - - if already_published.is_ok() { - Ok(()) - } else { - Err(StorageError::NotFound(format!( - "Published intent {intent_id} for commit" - ))) - } + if rows_changed == 0 { + let already_published = self.raw_query(|conn| { + dsl::group_intents + .filter(dsl::id.eq(intent_id)) + .first::(conn) + }); + + if already_published.is_ok() { + return Ok(()); + } else { + return Err(NotFound::IntentForToPublish(intent_id).into()); } - _ => Ok(()), } + Ok(()) } // Set the intent with the given ID to `Committed` pub fn set_group_intent_committed(&self, intent_id: ID) -> Result<(), StorageError> { - let res = self.raw_query(|conn| { + let rows_changed = self.raw_query(|conn| { diesel::update(dsl::group_intents) .filter(dsl::id.eq(intent_id)) // State machine requires that the only valid state transition to Committed is from @@ -246,19 +241,18 @@ impl DbConnection { .execute(conn) })?; - match res { - // If nothing matched the query, return an error. Either ID or state was wrong - 0 => Err(StorageError::NotFound(format!( - "Published intent {intent_id} for commit" - ))), - _ => Ok(()), + // If nothing matched the query, return an error. Either ID or state was wrong + if rows_changed == 0 { + return Err(NotFound::IntentForCommitted(intent_id).into()); } + + Ok(()) } // Set the intent with the given ID to `ToPublish`. Wipe any values for `payload_hash` and // `post_commit_data` pub fn set_group_intent_to_publish(&self, intent_id: ID) -> Result<(), StorageError> { - let res = self.raw_query(|conn| { + let rows_changed = self.raw_query(|conn| { diesel::update(dsl::group_intents) .filter(dsl::id.eq(intent_id)) // State machine requires that the only valid state transition to ToPublish is from @@ -275,32 +269,27 @@ impl DbConnection { .execute(conn) })?; - match res { - // If nothing matched the query, return an error. Either ID or state was wrong - 0 => Err(StorageError::NotFound(format!( - "Published intent {intent_id} for ToPublish" - ))), - _ => Ok(()), + if rows_changed == 0 { + return Err(NotFound::IntentForPublish(intent_id).into()); } + Ok(()) } /// Set the intent with the given ID to `Error` #[tracing::instrument(level = "trace", skip(self))] pub fn set_group_intent_error(&self, intent_id: ID) -> Result<(), StorageError> { - let res = self.raw_query(|conn| { + let rows_changed = self.raw_query(|conn| { diesel::update(dsl::group_intents) .filter(dsl::id.eq(intent_id)) .set(dsl::state.eq(IntentState::Error)) .execute(conn) })?; - match res { - // If nothing matched the query, return an error. Either ID or state was wrong - 0 => Err(StorageError::NotFound(format!( - "state for intent {intent_id}" - ))), - _ => Ok(()), + if rows_changed == 0 { + return Err(NotFound::IntentById(intent_id).into()); } + + Ok(()) } // Simple lookup of intents by payload hash, meant to be used when processing messages off the diff --git a/xmtp_mls/src/storage/encrypted_store/refresh_state.rs b/xmtp_mls/src/storage/encrypted_store/refresh_state.rs index f5cf0ba33..b1cfefcb0 100644 --- a/xmtp_mls/src/storage/encrypted_store/refresh_state.rs +++ b/xmtp_mls/src/storage/encrypted_store/refresh_state.rs @@ -8,7 +8,11 @@ use diesel::{ }; use super::{db_connection::DbConnection, schema::refresh_state, Sqlite}; -use crate::{impl_store, impl_store_or_ignore, storage::StorageError, StoreOrIgnore}; +use crate::{ + impl_store, impl_store_or_ignore, + storage::{NotFound, StorageError}, + StoreOrIgnore, +}; #[repr(i32)] #[derive(Debug, Clone, Copy, PartialEq, Eq, AsExpression, Hash, FromSqlRow)] @@ -18,6 +22,16 @@ pub enum EntityKind { Group = 2, } +impl std::fmt::Display for EntityKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use EntityKind::*; + match self { + Welcome => write!(f, "welcome"), + Group => write!(f, "group"), + } + } +} + impl ToSql for EntityKind where i32: ToSql, @@ -96,24 +110,18 @@ impl DbConnection { entity_kind: EntityKind, cursor: i64, ) -> Result { - let state: Option = self.get_refresh_state(&entity_id, entity_kind)?; - match state { - Some(state) => { - use super::schema::refresh_state::dsl; - let num_updated = self.raw_query(|conn| { - diesel::update(&state) - .filter(dsl::cursor.lt(cursor)) - .set(dsl::cursor.eq(cursor)) - .execute(conn) - })?; - Ok(num_updated == 1) - } - None => Err(StorageError::NotFound(format!( - "state for entity ID {} with kind {:?}", - hex::encode(entity_id.as_ref()), - entity_kind - ))), - } + use super::schema::refresh_state::dsl; + let state: RefreshState = self.get_refresh_state(&entity_id, entity_kind)?.ok_or( + NotFound::RefreshStateByIdAndKind(entity_id.as_ref().to_vec(), entity_kind), + )?; + + let num_updated = self.raw_query(|conn| { + diesel::update(&state) + .filter(dsl::cursor.lt(cursor)) + .set(dsl::cursor.eq(cursor)) + .execute(conn) + })?; + Ok(num_updated == 1) } } diff --git a/xmtp_mls/src/storage/encrypted_store/sqlcipher_connection.rs b/xmtp_mls/src/storage/encrypted_store/sqlcipher_connection.rs index fe9350b26..6723f0df1 100644 --- a/xmtp_mls/src/storage/encrypted_store/sqlcipher_connection.rs +++ b/xmtp_mls/src/storage/encrypted_store/sqlcipher_connection.rs @@ -12,7 +12,7 @@ use std::{ path::{Path, PathBuf}, }; -use crate::storage::StorageError; +use crate::storage::{NotFound, StorageError}; use super::{EncryptionKey, StorageOption}; @@ -165,9 +165,9 @@ impl EncryptedConnection { ) -> Result<(), StorageError> { let mut row_iter = conn.load(sql_query("PRAGMA cipher_salt"))?; // cipher salt should always exist. if it doesn't SQLCipher is misconfigured. - let row = row_iter.next().ok_or(StorageError::NotFound( - "Cipher salt doesn't exist in database".into(), - ))??; + let row = row_iter + .next() + .ok_or(NotFound::CipherSalt(path.to_string()))??; let salt = >::build_from_row(&row)?; tracing::debug!( salt, diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index 3cc3df2a8..8351acbe6 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -3,7 +3,10 @@ use std::sync::PoisonError; use diesel::result::DatabaseErrorKind; use thiserror::Error; -use super::sql_key_store::{self, SqlKeyStoreError}; +use super::{ + refresh_state::EntityKind, + sql_key_store::{self, SqlKeyStoreError}, +}; use crate::groups::intents::IntentError; use xmtp_common::{retryable, RetryableError}; @@ -27,10 +30,9 @@ pub enum StorageError { Serialization(String), #[error("deserialization error")] Deserialization(String), - // TODO:insipx Make NotFound into an enum of possible items that may not be found - #[error("{0} not found")] - NotFound(String), - #[error("lock")] + #[error(transparent)] + NotFound(#[from] NotFound), + #[error("lock {0}")] Lock(String), #[error("Pool needs to reconnect before use")] PoolNeedsConnection, @@ -50,6 +52,35 @@ pub enum StorageError { OpenMlsStorage(#[from] SqlKeyStoreError), } +#[derive(Error, Debug)] +// Monolithic enum for all things lost +pub enum NotFound { + #[error("group with welcome id {0} not found")] + GroupByWelcome(i64), + #[error("group with id {id} not found", id = hex::encode(_0))] + GroupById(Vec), + #[error("installation time for group {id}", id = hex::encode(_0))] + InstallationTimeForGroup(Vec), + #[error("inbox id for address {0} not found")] + InboxIdForAddress(String), + #[error("message id {id} not found", id = hex::encode(_0))] + MessageById(Vec), + #[error("dm by dm_target_inbox_id {0} not found")] + DmByInbox(String), + #[error("intent with id {0} for state Publish from ToPublish not found")] + IntentForToPublish(i32), + #[error("intent with id {0} for state ToPublish from Published not found")] + IntentForPublish(i32), + #[error("intent with id {0} for state Committed from Published not found")] + IntentForCommitted(i32), + #[error("Intent with id {0} not found")] + IntentById(i32), + #[error("refresh state with id {id} and kind {1} not found", id = hex::encode(_0))] + RefreshStateByIdAndKind(Vec, EntityKind), + #[error("Cipher salt for db at [`{0}`] not found")] + CipherSalt(String), +} + #[derive(Error, Debug)] pub enum DuplicateItem { #[error("the welcome id {0:?} already exists")] @@ -105,6 +136,12 @@ impl RetryableError for StorageError { } } +impl RetryableError for NotFound { + fn is_retryable(&self) -> bool { + true + } +} + // OpenMLS KeyStore errors impl RetryableError for openmls::group::AddMembersError { fn is_retryable(&self) -> bool {