From 45adbfe9ee1641694db84a3778a11700e16c5e2d Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 28 Aug 2024 17:26:10 -0600 Subject: [PATCH 1/6] big refator of connection --- xmtp_mls/src/storage/encrypted_store/mod.rs | 41 ++++++++++++--------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index f5d2f843d..341be72ad 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -178,26 +178,33 @@ impl EncryptedMessageStore { pub(crate) fn raw_conn( &self, ) -> Result>, StorageError> { - let pool_guard = self.pool.read(); - - let pool = pool_guard - .as_ref() - .ok_or(StorageError::PoolNeedsConnection)?; - - log::debug!( - "Pulling connection from pool, idle_connections={}, total_connections={}", - pool.state().idle_connections, - pool.state().connections - ); + for _ in 0..3 { + if let Some(pool) = self.pool.read().as_ref() { + log::debug!( + "Pulling connection from pool, idle_connections={}, total_connections={}", + pool.state().idle_connections, + pool.state().connections + ); - let mut conn = pool.get()?; - if let Some(ref key) = self.enc_key { - conn.batch_execute(&format!("PRAGMA key = \"x'{}'\";", hex::encode(key)))?; + match pool.get() { + Ok(mut conn) => { + if let Some(ref key) = self.enc_key { + conn.batch_execute(&format!( + "PRAGMA key = \"x'{}'\";", + hex::encode(key) + ))?; + } + conn.batch_execute("PRAGMA busy_timeout = 5000;")?; + return Ok(conn); + } + Err(_) => continue, + } + } else { + continue; + } } - conn.batch_execute("PRAGMA busy_timeout = 5000;")?; - - Ok(conn) + Err(StorageError::PoolNeedsConnection) } pub fn conn(&self) -> Result { From 81ed1933ef0380e1d54203bb0d650d3cc3477999 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 28 Aug 2024 17:32:19 -0600 Subject: [PATCH 2/6] just make the error retryable --- xmtp_mls/src/storage/encrypted_store/mod.rs | 41 +++++++++------------ xmtp_mls/src/storage/errors.rs | 1 + 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 341be72ad..f5d2f843d 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -178,33 +178,26 @@ impl EncryptedMessageStore { pub(crate) fn raw_conn( &self, ) -> Result>, StorageError> { - for _ in 0..3 { - if let Some(pool) = self.pool.read().as_ref() { - log::debug!( - "Pulling connection from pool, idle_connections={}, total_connections={}", - pool.state().idle_connections, - pool.state().connections - ); + let pool_guard = self.pool.read(); - match pool.get() { - Ok(mut conn) => { - if let Some(ref key) = self.enc_key { - conn.batch_execute(&format!( - "PRAGMA key = \"x'{}'\";", - hex::encode(key) - ))?; - } - conn.batch_execute("PRAGMA busy_timeout = 5000;")?; - return Ok(conn); - } - Err(_) => continue, - } - } else { - continue; - } + let pool = pool_guard + .as_ref() + .ok_or(StorageError::PoolNeedsConnection)?; + + log::debug!( + "Pulling connection from pool, idle_connections={}, total_connections={}", + pool.state().idle_connections, + pool.state().connections + ); + + let mut conn = pool.get()?; + if let Some(ref key) = self.enc_key { + conn.batch_execute(&format!("PRAGMA key = \"x'{}'\";", hex::encode(key)))?; } - Err(StorageError::PoolNeedsConnection) + conn.batch_execute("PRAGMA busy_timeout = 5000;")?; + + Ok(conn) } pub fn conn(&self) -> Result { diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index c73d81b12..7e6b914f3 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -68,6 +68,7 @@ impl RetryableError for StorageError { Self::Pool(_) => true, Self::Lock(_) => true, Self::SqlCipherNotLoaded => true, + Self::PoolNeedsConnection => true, _ => false, } } From 0fe99ae5d4d24884761c0e96341a4787a4679977 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 29 Aug 2024 14:00:02 -0600 Subject: [PATCH 3/6] add new error that can be caught --- xmtp_mls/src/groups/mod.rs | 2 ++ xmtp_mls/src/groups/sync.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index e9f4ca1de..ee45e455f 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -187,6 +187,8 @@ pub enum GroupError { SqlKeyStore(#[from] sql_key_store::SqlKeyStoreError), #[error("No pending commit found")] MissingPendingCommit, + #[error("Sync failed to wait for intent: {0}")] + SyncFailedToWait(String), } impl RetryableError for GroupError { diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index ccd1f10b4..73fc56403 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -221,7 +221,7 @@ impl MlsGroup { num_attempts += 1; } - Err(last_err.unwrap_or(GroupError::Generic("failed to wait for intent".to_string()))) + Err(last_err.unwrap_or(GroupError::SyncFailedToWait("failed to wait for intent".to_string()))) } fn is_valid_epoch( From a09431d7f2266f471479fccec3d8aa3b15b3cbea Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 29 Aug 2024 14:45:43 -0600 Subject: [PATCH 4/6] add retries to the places that kick out into signature requests --- xmtp_mls/src/groups/sync.rs | 4 +++- xmtp_mls/src/identity_updates.rs | 39 +++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 73fc56403..13489c08f 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -221,7 +221,9 @@ impl MlsGroup { num_attempts += 1; } - Err(last_err.unwrap_or(GroupError::SyncFailedToWait("failed to wait for intent".to_string()))) + Err(last_err.unwrap_or(GroupError::SyncFailedToWait( + "failed to wait for intent".to_string(), + ))) } fn is_valid_epoch( diff --git a/xmtp_mls/src/identity_updates.rs b/xmtp_mls/src/identity_updates.rs index 20e656f9f..b4350eb53 100644 --- a/xmtp_mls/src/identity_updates.rs +++ b/xmtp_mls/src/identity_updates.rs @@ -1,6 +1,10 @@ use std::collections::{HashMap, HashSet}; -use crate::{retry::RetryableError, retryable, storage::association_state::StoredAssociationState}; +use crate::{ + retry::{Retry, RetryableError}, + retry_async, retryable, + storage::association_state::StoredAssociationState, +}; use prost::Message; use thiserror::Error; use xmtp_id::associations::{ @@ -239,9 +243,13 @@ where wallets_to_revoke: Vec, ) -> Result { let inbox_id = self.inbox_id(); - let current_state = self - .get_association_state(&self.store().conn()?, &inbox_id, None) - .await?; + let current_state = retry_async!( + Retry::default(), + (async { + self.get_association_state(&self.store().conn()?, &inbox_id, None) + .await + }) + )?; let mut builder = SignatureRequestBuilder::new(inbox_id); for wallet in wallets_to_revoke { @@ -259,9 +267,14 @@ where installation_ids: Vec>, ) -> Result { let inbox_id = self.inbox_id(); - let current_state = self - .get_association_state(&self.store().conn()?, &inbox_id, None) - .await?; + + let current_state = retry_async!( + Retry::default(), + (async { + self.get_association_state(&self.store().conn()?, &inbox_id, None) + .await + }) + )?; let mut builder = SignatureRequestBuilder::new(inbox_id); @@ -291,7 +304,17 @@ where .await?; // Load the identity updates for the inbox so that we have a record in our DB - load_identity_updates(&self.api_client, &self.store().conn()?, vec![inbox_id]).await?; + retry_async!( + Retry::default(), + (async { + load_identity_updates( + &self.api_client, + &self.store().conn()?, + vec![inbox_id.clone()], + ) + .await + }) + )?; Ok(()) } From db0ca10ff4935f44c77f8213fcf99d3ce81d7ca2 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Thu, 29 Aug 2024 18:51:06 -0400 Subject: [PATCH 5/6] Apply suggestions from code review --- xmtp_mls/src/groups/mod.rs | 4 ++-- xmtp_mls/src/groups/sync.rs | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 5a8b87f06..8499fd121 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -188,8 +188,8 @@ pub enum GroupError { SqlKeyStore(#[from] sql_key_store::SqlKeyStoreError), #[error("No pending commit found")] MissingPendingCommit, - #[error("Sync failed to wait for intent: {0}")] - SyncFailedToWait(String), + #[error("Sync failed to wait for intent")] + SyncFailedToWait, } impl RetryableError for GroupError { diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index c699e1a35..9c2f86dd1 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -221,9 +221,8 @@ impl MlsGroup { num_attempts += 1; } - Err(last_err.unwrap_or(GroupError::SyncFailedToWait( - "failed to wait for intent".to_string(), - ))) + Err(last_err.unwrap_or(GroupError::SyncFailedToWait) + )) } fn is_valid_epoch( From 04be5f2ff4e790aca63a9237dbc9cfdcb061d6da Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Thu, 29 Aug 2024 19:04:08 -0400 Subject: [PATCH 6/6] fix parenthesis --- xmtp_mls/src/groups/sync.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 9c2f86dd1..bbcfbc41d 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -221,8 +221,7 @@ impl MlsGroup { num_attempts += 1; } - Err(last_err.unwrap_or(GroupError::SyncFailedToWait) - )) + Err(last_err.unwrap_or(GroupError::SyncFailedToWait)) } fn is_valid_epoch(