From a97b69fb85da16fa3a5136c7f4efc36b65fb3b38 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Fri, 17 Nov 2023 16:39:46 -0500 Subject: [PATCH 01/10] retry --- Cargo.lock | 12 +++++ xmtp_mls/Cargo.toml | 1 + xmtp_mls/src/client.rs | 1 + xmtp_mls/src/groups/mod.rs | 13 +++++- xmtp_mls/src/lib.rs | 1 + xmtp_mls/src/retry.rs | 89 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 xmtp_mls/src/retry.rs diff --git a/Cargo.lock b/Cargo.lock index 37729ccfe..b10a45058 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5133,6 +5133,17 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "smart-default" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eb01866308440fc64d6c44d9e86c5cc17adfe33c4d6eed55da9145044d0ffc1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "socket2" version = "0.4.10" @@ -6562,6 +6573,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "smart-default", "tempfile", "thiserror", "tls_codec", diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 144eb5a79..6a5d13b32 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -48,6 +48,7 @@ toml = "0.7.4" xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] } tls_codec = "0.3.0" +smart-default = "0.7.1" [dev-dependencies] mockall = "0.11.4" diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 6a379fd9e..dc617258c 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -13,6 +13,7 @@ use crate::{ api_client_wrapper::{ApiClientWrapper, IdentityUpdate}, groups::MlsGroup, identity::Identity, + retry::RetryableError, storage::{group::GroupMembershipState, DbConnection, EncryptedMessageStore, StorageError}, types::Address, utils::topic::get_welcome_topic, diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 5b85838f5..3f897a94a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -73,6 +73,15 @@ pub enum GroupError { Diesel(#[from] diesel::result::Error), } +impl crate::retry::RetryableError for GroupError { + fn is_retryable(&self) -> bool { + match self { + Self::Diesel(_) => true, + _ => false, + } + } +} + #[derive(Debug, Error)] pub enum MessageProcessingError { #[error("[{message_time_ns:?}] invalid sender with credential: {credential:?}")] @@ -272,7 +281,7 @@ where } _ => (), }; - // TOOD: Handle writing transcript messages for adding/removing members + // TODO: Handle writing transcript messages for adding/removing members } IntentKind::SendMessage => { let intent_data = SendMessageIntentData::from_bytes(intent.data.as_slice())?; @@ -668,7 +677,7 @@ mod tests { use openmls::prelude::Member; use xmtp_cryptography::utils::generate_local_wallet; - use crate::{ + use create::{ builder::ClientBuilder, storage::group_intent::IntentState, storage::EncryptedMessageStore, utils::topic::get_welcome_topic, }; diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index a6ea9c2e6..f693c6496 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -8,6 +8,7 @@ pub mod identity; pub mod mock_xmtp_api_client; pub mod owner; mod proto_wrapper; +pub mod retry; pub mod storage; pub mod types; pub mod utils; diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs new file mode 100644 index 000000000..55d351392 --- /dev/null +++ b/xmtp_mls/src/retry.rs @@ -0,0 +1,89 @@ +use smart_default::SmartDefault; + +pub trait RetryableError: std::error::Error { + fn is_retryable(&self) -> bool { + false + } +} + +// we use &T as a workaround for specialization here +impl RetryableError for &T where T: std::error::Error {} + +#[derive(SmartDefault, PartialEq, Eq, Copy, Clone)] +pub struct Retry { + #[default = 3] + retries: usize, + #[default(_code = "std::time::Duration::from_millis(100)")] + duration: std::time::Duration, +} + +#[derive(Default, PartialEq, Eq, Copy, Clone)] +pub struct RetryBuilder { + retries: Option, + duration: Option, +} + +impl RetryBuilder { + pub fn retries(mut self, retries: usize) -> Self { + self.retries = Some(retries); + self + } + + pub fn duration(mut self, duration: std::time::Duration) -> Self { + self.duration = Some(duration); + self + } + + pub fn build(self) -> Retry { + let mut retry: Retry = Default::default(); + + if let Some(retries) = self.retries { + retry.retries = retries; + } + + if let Some(duration) = self.duration { + retry.duration = duration; + } + + retry + } +} + +impl Retry { + pub fn builder() -> RetryBuilder { + RetryBuilder::default() + } + + pub fn retry(&self, fun: F) -> Result + where + E: RetryableError, + F: FnOnce(Retry), + { + todo!() + } +} + +#[derive(Clone, PartialEq, Eq)] +pub enum RetryableResult { + Ok(T), + Retry(E), + Err(E), +} + +impl From> for RetryableResult +where + E: RetryableError, +{ + fn from(res: Result) -> RetryableResult { + match res { + Result::Ok(value) => RetryableResult::Ok(value), + Result::Err(e) => { + if e.is_retryable() { + RetryableResult::Retry(e) + } else { + RetryableResult::Err(e) + } + } + } + } +} From 1b731208403ea0d53ec617e716501a31d3f3809a Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 20 Nov 2023 18:13:10 -0500 Subject: [PATCH 02/10] finished, need to wrap retryables with retry --- Cargo.lock | 25 +++ xmtp_mls/Cargo.toml | 1 + xmtp_mls/src/client.rs | 1 - xmtp_mls/src/groups/mod.rs | 23 ++- xmtp_mls/src/retry.rs | 311 +++++++++++++++++++++++++++++++++---- 5 files changed, 327 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b10a45058..699f7dc0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1945,6 +1945,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3145,6 +3157,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.11", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -5204,6 +5225,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -6560,6 +6584,7 @@ dependencies = [ "diesel_migrations", "ethers", "ethers-core 2.0.10", + "flume", "futures", "hex", "libsqlite3-sys", diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 6a5d13b32..ff70743b4 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -56,3 +56,4 @@ rand = "0.8.5" tempfile = "3.5.0" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } xmtp_api_grpc = { path = "../xmtp_api_grpc" } +flume = "0.11" diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index dc617258c..6a379fd9e 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -13,7 +13,6 @@ use crate::{ api_client_wrapper::{ApiClientWrapper, IdentityUpdate}, groups::MlsGroup, identity::Identity, - retry::RetryableError, storage::{group::GroupMembershipState, DbConnection, EncryptedMessageStore, StorageError}, types::Address, utils::topic::get_welcome_topic, diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 3f897a94a..d7cc73d34 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -28,6 +28,9 @@ use crate::{ client::ClientError, configuration::CIPHERSUITE, identity::Identity, + retry, + retry::Retry, + retry_async, storage::{ group::{GroupMembershipState, StoredGroup}, group_intent::{IntentKind, IntentState, NewGroupIntent, StoredGroupIntent}, @@ -506,7 +509,10 @@ where )?; for intent in intents { - let result = self.get_publish_intent_data(&provider, &mut openmls_group, &intent); + let result = retry!( + Retry::default(), + (|| self.get_publish_intent_data(&provider, &mut openmls_group, &intent)) + ); if let Err(e) = result { log::error!("error getting publish intent data {:?}", e); // TODO: Figure out which types of errors we should abort completely on and which @@ -517,10 +523,15 @@ where let (payload, post_commit_data) = result.expect("result already checked"); let payload_slice = payload.as_slice(); - self.client - .api_client - .publish_to_group(vec![payload_slice]) - .await?; + retry_async!( + Retry::default(), + (|| async { + self.client + .api_client + .publish_to_group(vec![payload_slice]) + .await + }) + )?; EncryptedMessageStore::set_group_intent_published( &mut provider.conn().borrow_mut(), @@ -677,7 +688,7 @@ mod tests { use openmls::prelude::Member; use xmtp_cryptography::utils::generate_local_wallet; - use create::{ + use crate::{ builder::ClientBuilder, storage::group_intent::IntentState, storage::EncryptedMessageStore, utils::topic::get_welcome_topic, }; diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index 55d351392..e4e2f979a 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -1,14 +1,24 @@ +//! A retry strategy that works with rusts native [`std::error::Error`] type. +//! +//! TODO: Could make the impl of `RetryableError` trait into a proc-macro to auto-derive Retryable +//! on annotated enum variants. + +use std::time::Duration; + use smart_default::SmartDefault; +/// Specifies which errors are retryable. +/// All Errors are not retryable by-default. pub trait RetryableError: std::error::Error { fn is_retryable(&self) -> bool { false } } -// we use &T as a workaround for specialization here +// we use &T and make use of autoref specialization impl RetryableError for &T where T: std::error::Error {} +/// Options to specify how to retry a function #[derive(SmartDefault, PartialEq, Eq, Copy, Clone)] pub struct Retry { #[default = 3] @@ -17,25 +27,52 @@ pub struct Retry { duration: std::time::Duration, } +impl Retry { + /// Get the number of retries this is configured with. + pub fn retries(&self) -> usize { + self.retries + } + + /// Get the duration to wait between retries. + pub fn duration(&self) -> Duration { + self.duration + } +} + +/// Builder for [`Retry`] #[derive(Default, PartialEq, Eq, Copy, Clone)] pub struct RetryBuilder { retries: Option, duration: Option, } +/// Builder for [`Retry`]. +/// +/// # Example +/// ``` +/// use xmtp_mls::retry::RetryBuilder; +/// +/// RetryBuilder::default() +/// .retries(5) +/// .duration(std::time::Duration::from_millis(1000)) +/// .build(); +/// ``` impl RetryBuilder { + /// Specify the numaber of retries to allow pub fn retries(mut self, retries: usize) -> Self { self.retries = Some(retries); self } + /// Specify the duration to wait before retrying again pub fn duration(mut self, duration: std::time::Duration) -> Self { self.duration = Some(duration); self } + /// Build the Retry Strategy pub fn build(self) -> Retry { - let mut retry: Retry = Default::default(); + let mut retry = Retry::default(); if let Some(retries) = self.retries { retry.retries = retries; @@ -50,40 +87,260 @@ impl RetryBuilder { } impl Retry { + /// Get the builder for [`Retry`] pub fn builder() -> RetryBuilder { RetryBuilder::default() } - - pub fn retry(&self, fun: F) -> Result - where - E: RetryableError, - F: FnOnce(Retry), - { - todo!() - } } -#[derive(Clone, PartialEq, Eq)] -pub enum RetryableResult { - Ok(T), - Retry(E), - Err(E), +/// Retry a function, specifying the strategy with $retry. +/// +/// # Example +/// ``` +/// use thiserror::Error; +/// use xmtp_mls::{retry, retry::{RetryableError, Retry}}; +/// +/// #[derive(Debug, Error)] +/// enum MyError { +/// #[error("A retryable error")] +/// Retryable, +/// #[error("An error we don't want to retry")] +/// NotRetryable +/// } +/// +/// impl RetryableError for MyError { +/// fn is_retryable(&self) -> bool { +/// match self { +/// Self::Retryable => true, +/// _=> false, +/// } +/// } +/// } +/// +/// fn fallable_fn(i: usize) -> Result<(), MyError> { +/// if i == 2 { +/// return Ok(()); +/// } +/// +/// Err(MyError::Retryable) +/// } +/// +/// fn main() { +/// let mut i = 0; +/// retry!(Retry::default(), (|| -> Result<(), MyError> { +/// let res = fallable_fn(i); +/// i += 1; +/// res +/// })).unwrap(); +/// +/// } +/// ``` +#[macro_export] +macro_rules! retry { + ($retry: expr, $code: tt) => {{ + use $crate::retry::RetryableError; + let mut attempts = 0; + loop { + match $code() { + Ok(v) => break Ok(v), + Err(e) => { + if (&e).is_retryable() && attempts < $retry.retries() { + log::debug!( + "retrying function that failed with error=`{}`", + e.to_string() + ); + attempts += 1; + std::thread::sleep($retry.duration()); + } else { + break Err(e); + } + } + } + } + }}; } -impl From> for RetryableResult -where - E: RetryableError, -{ - fn from(res: Result) -> RetryableResult { - match res { - Result::Ok(value) => RetryableResult::Ok(value), - Result::Err(e) => { - if e.is_retryable() { - RetryableResult::Retry(e) - } else { - RetryableResult::Err(e) +/// Retry but for an async context +/// ``` +/// use xmtp_mls::{retry_async, retry::{RetryableError, Retry}}; +/// use thiserror::Error; +/// use flume::bounded; +/// +/// #[derive(Debug, Error)] +/// enum MyError { +/// #[error("A retryable error")] +/// Retryable, +/// #[error("An error we don't want to retry")] +/// NotRetryable +/// } +/// +/// impl RetryableError for MyError { +/// fn is_retryable(&self) -> bool { +/// match self { +/// Self::Retryable => true, +/// _=> false, +/// } +/// } +/// } +/// +/// async fn fallable_fn(rx: &flume::Receiver) -> Result<(), MyError> { +/// if rx.recv_async().await.unwrap() == 2 { +/// return Ok(()); +/// } +/// Err(MyError::Retryable) +/// } +/// +/// #[tokio::main] +/// async fn main() -> Result<(), MyError> { +/// +/// let (tx, rx) = flume::bounded(3); +/// tx.send(0); +/// tx.send(1); +/// tx.send(2); +/// +/// retry_async!(Retry::default(), (|| async { +/// fallable_fn(&rx.clone()).await +/// })) +/// } +/// ``` +#[macro_export] +macro_rules! retry_async { + ($retry: expr, $code: tt) => {{ + use $crate::retry::RetryableError; + let mut attempts = 0; + loop { + match $code().await { + Ok(v) => break Ok(v), + Err(e) => { + if (&e).is_retryable() && attempts < $retry.retries() { + log::debug!("retrying function that failed with error={}", e.to_string()); + attempts += 1; + tokio::time::sleep($retry.duration()).await; + } else { + break Err(e); + } } } } + }}; +} + +#[cfg(test)] +mod tests { + use super::*; + use thiserror::Error; + + #[derive(Debug, Error)] + enum SomeError { + #[error("this is a retryable error")] + ARetryableError, + #[error("Dont retry")] + DontRetryThis, + } + + impl RetryableError for SomeError { + fn is_retryable(&self) -> bool { + match self { + Self::ARetryableError => true, + _ => false, + } + } + } + + fn retry_error_fn() -> Result<(), SomeError> { + Err(SomeError::ARetryableError) + } + + fn retryable_with_args(foo: usize, name: String, list: &Vec) -> Result<(), SomeError> { + println!("I am {} of {} with items {:?}", foo, name, list); + Err(SomeError::ARetryableError) + } + + #[test] + fn it_retries_twice_and_succeeds() { + crate::tests::setup(); + + let mut i = 0; + let mut test_fn = || -> Result<(), SomeError> { + if i == 2 { + return Ok(()); + } + i += 1; + retry_error_fn()?; + Ok(()) + }; + + retry!(Retry::default(), test_fn).unwrap(); + } + + #[test] + fn it_works_with_random_args() { + crate::tests::setup(); + + let mut i = 0; + let list = vec!["String".into(), "Foo".into()]; + let mut test_fn = || -> Result<(), SomeError> { + if i == 2 { + return Ok(()); + } + i += 1; + retryable_with_args(i, "Hello".to_string(), &list) + }; + + retry!(Retry::default(), test_fn).unwrap(); + } + + #[test] + fn it_fails_on_three_retries() { + crate::tests::setup(); + + let result: Result<(), SomeError> = retry!( + Retry::default(), + (|| -> Result<(), SomeError> { + retry_error_fn()?; + Ok(()) + }) + ); + + assert!(result.is_err()) + } + + #[test] + fn it_only_runs_non_retryable_once() { + crate::tests::setup(); + + let mut attempts = 0; + let mut test_fn = || -> Result<(), SomeError> { + attempts += 1; + Err(SomeError::DontRetryThis) + }; + + let _r = retry!(Retry::default(), test_fn); + + assert_eq!(attempts, 1); + } + + #[tokio::test] + async fn it_works_async() { + crate::tests::setup(); + + async fn retryable_async_fn(rx: &flume::Receiver) -> Result<(), SomeError> { + let val = rx.recv_async().await.unwrap(); + if val == 2 { + return Ok(()); + } + // do some work + tokio::time::sleep(std::time::Duration::from_nanos(100)).await; + Err(SomeError::ARetryableError) + } + + let (tx, rx) = flume::bounded(3); + tx.send(0).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + let test_future = || async { retryable_async_fn(&rx.clone()).await }; + + retry_async!(Retry::default(), test_future).unwrap(); + assert!(rx.is_empty()); } } From d9ecc5e6a2c771e3b52df7e4bdeb25bc8b89564b Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 20 Nov 2023 18:55:51 -0500 Subject: [PATCH 03/10] for loop --- xmtp_mls/src/retry.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index e4e2f979a..ddd349540 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -194,10 +194,10 @@ macro_rules! retry { /// async fn main() -> Result<(), MyError> { /// /// let (tx, rx) = flume::bounded(3); -/// tx.send(0); -/// tx.send(1); -/// tx.send(2); /// +/// for i in 0..3 { +/// tx.send(i).unwrap(); +/// } /// retry_async!(Retry::default(), (|| async { /// fallable_fn(&rx.clone()).await /// })) @@ -335,11 +335,11 @@ mod tests { } let (tx, rx) = flume::bounded(3); - tx.send(0).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - let test_future = || async { retryable_async_fn(&rx.clone()).await }; + for i in 0..3 { + tx.send(i).unwrap(); + } + let test_future = || async { retryable_async_fn(&rx.clone()).await }; retry_async!(Retry::default(), test_future).unwrap(); assert!(rx.is_empty()); } From 5ddf4b0f9e249485d916358ca9dfb42bfffbd372 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 20 Nov 2023 18:57:03 -0500 Subject: [PATCH 04/10] typo --- xmtp_mls/src/retry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index ddd349540..dec86843d 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -58,7 +58,7 @@ pub struct RetryBuilder { /// .build(); /// ``` impl RetryBuilder { - /// Specify the numaber of retries to allow + /// Specify the number of retries to allow pub fn retries(mut self, retries: usize) -> Self { self.retries = Some(retries); self From 02b5f9d26f5d4ebe9d46086c1473f9190aca5c80 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 21 Nov 2023 12:00:40 -0500 Subject: [PATCH 05/10] impl retryable on api_client errors --- xmtp_mls/src/retry.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index dec86843d..514dad4bc 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -58,7 +58,7 @@ pub struct RetryBuilder { /// .build(); /// ``` impl RetryBuilder { - /// Specify the number of retries to allow + /// Specify the of retries to allow pub fn retries(mut self, retries: usize) -> Self { self.retries = Some(retries); self @@ -225,6 +225,13 @@ macro_rules! retry_async { }}; } +// network errors should generally be retryable, unless there's a bug in our code +impl RetryableError for xmtp_proto::api_client::Error { + fn is_retryable(&self) -> bool { + true + } +} + #[cfg(test)] mod tests { use super::*; From f589cadf65e5e15e2c7ae4bbcef8dffb30bd039d Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 21 Nov 2023 15:13:19 -0500 Subject: [PATCH 06/10] testing with vecdeque --- xmtp_mls/src/client.rs | 11 ++- xmtp_mls/src/groups/mod.rs | 104 ++++++++++---------- xmtp_mls/src/retry.rs | 28 ++++++ xmtp_mls/src/storage/encrypted_store/mod.rs | 2 +- xmtp_mls/src/storage/errors.rs | 12 +++ 5 files changed, 102 insertions(+), 55 deletions(-) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index b8b8b2f07..bafa98cb7 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -90,6 +90,15 @@ pub enum MessageProcessingError { UnsupportedMessageType(Discriminant), } +impl crate::retry::RetryableError for MessageProcessingError { + fn is_retryable(&self) -> bool { + match self { + Self::Storage(s) => s.is_retryable(), + _ => false, + } + } +} + impl From for ClientError { fn from(value: String) -> Self { Self::Generic(value) @@ -251,8 +260,6 @@ where where ProcessingFn: FnOnce(XmtpOpenMlsProvider) -> Result, { - // TODO: We can handle errors in the transaction() function to make error handling - // cleaner. Retryable errors can possibly be part of their own enum XmtpOpenMlsProvider::transaction(&mut self.store.conn()?, |provider| { let is_updated = { EncryptedMessageStore::update_last_synced_timestamp_for_topic( diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 6f2c73f75..e94b812fb 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -14,9 +14,9 @@ use openmls::{ prelude_test::KeyPackage, }; use openmls_traits::OpenMlsProvider; -use std::mem::discriminant; #[cfg(test)] use std::println as debug; +use std::{collections::VecDeque, mem::discriminant}; use thiserror::Error; use tls_codec::{Deserialize, Serialize}; use xmtp_proto::api_client::{Envelope, XmtpApiClient, XmtpMlsClient}; @@ -30,7 +30,7 @@ use crate::{ identity::Identity, retry, retry::Retry, - retry_async, + retry_async, retryable, storage::{ group::{GroupMembershipState, StoredGroup}, group_intent::{IntentKind, IntentState, NewGroupIntent, StoredGroupIntent}, @@ -68,8 +68,10 @@ pub enum GroupError { WelcomeError(#[from] openmls::prelude::WelcomeError), #[error("client: {0}")] Client(#[from] ClientError), - #[error("receive errors: {0:?}")] - ReceiveError(Vec), + #[error("receive error: {0}")] + ReceiveError(#[from] MessageProcessingError), + #[error("Receive errors: {0:?}")] + ReceiveErrors(Vec), #[error("generic: {0}")] Generic(String), #[error("diesel error {0}")] @@ -80,41 +82,12 @@ impl crate::retry::RetryableError for GroupError { fn is_retryable(&self) -> bool { match self { Self::Diesel(_) => true, + Self::ReceiveError(msg) => retryable!(msg), _ => false, } } } -#[derive(Debug, Error)] -pub enum MessageProcessingError { - #[error("[{message_time_ns:?}] invalid sender with credential: {credential:?}")] - InvalidSender { - message_time_ns: u64, - credential: Vec, - }, - #[error("openmls process message error: {0}")] - OpenMlsProcessMessage(#[from] openmls::prelude::ProcessMessageError), - #[error("merge pending commit: {0}")] - MergePendingCommit(#[from] openmls::group::MergePendingCommitError), - #[error("merge staged commit: {0}")] - MergeStagedCommit(#[from] openmls::group::MergeCommitError), - #[error( - "no pending commit to merge. group epoch is {group_epoch:?} and got {message_epoch:?}" - )] - NoPendingCommit { - message_epoch: GroupEpoch, - group_epoch: GroupEpoch, - }, - #[error("intent error: {0}")] - Intent(#[from] IntentError), - #[error("storage error: {0}")] - Storage(#[from] crate::storage::StorageError), - #[error("tls deserialization: {0}")] - TlsDeserialization(#[from] tls_codec::Error), - #[error("unsupported message type: {0:?}")] - UnsupportedMessageType(Discriminant), -} - pub struct MlsGroup<'c, ApiClient> { pub group_id: Vec, pub created_at_ns: i64, @@ -420,37 +393,64 @@ where } } - pub(crate) fn process_messages(&self, envelopes: Vec) -> Result<(), GroupError> { + fn consume_message( + &self, + envelopes: &mut VecDeque, + openmls_group: &mut OpenMlsGroup, + ) -> Result<(), MessageProcessingError> { + let envelope = envelopes.pop_front(); + if let Some(envelope) = envelope { + let result = self.client.process_for_topic( + &self.topic(), + envelope.timestamp_ns, + |provider| -> Result<(), MessageProcessingError> { + self.process_message(openmls_group, &provider, &envelope)?; + openmls_group.save(provider.key_store())?; + Ok(()) + }, + ); + if result.is_err() && retryable!(result.as_ref().unwrap_err()) { + envelopes.push_front(envelope); + return result; + } + } + + Ok(()) + } + + pub fn process_messages(&self, envelopes: Vec) -> Result<(), GroupError> { + let mut receive_errors = vec![]; + let mut envelopes = VecDeque::from(envelopes); + let mut conn = self.client.store.conn()?; let provider = self.client.mls_provider(&mut conn); let mut openmls_group = self.load_mls_group(&provider)?; - let receive_errors: Vec = envelopes - .into_iter() - .map(|envelope| -> Result<(), MessageProcessingError> { - self.client.process_for_topic( - &self.topic(), - envelope.timestamp_ns, - |provider| -> Result<(), MessageProcessingError> { - self.process_message(&mut openmls_group, &provider, &envelope)?; - openmls_group.save(provider.key_store())?; - Ok(()) - }, - ) - }) - .filter_map(|result| result.err()) - .collect(); + + loop { + let result = retry!( + Retry::default(), + (|| self.consume_message(&mut envelopes, &mut openmls_group)) + ); + + if envelopes.is_empty() { + break; + } + } if receive_errors.is_empty() { Ok(()) } else { debug!("Message processing errors: {:?}", receive_errors); - Err(GroupError::ReceiveError(receive_errors)) + Err(GroupError::ReceiveErrors(receive_errors)) } } pub async fn receive(&self) -> Result<(), GroupError> { let envelopes = self.client.pull_from_topic(&self.topic()).await?; - self.process_messages(envelopes) + + self.process_messages(envelopes)?; + + Ok(()) } pub async fn send_message(&self, message: &[u8]) -> Result<(), GroupError> { diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index 514dad4bc..ae78a5baa 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -2,6 +2,19 @@ //! //! TODO: Could make the impl of `RetryableError` trait into a proc-macro to auto-derive Retryable //! on annotated enum variants. +//! ```ignore +//! #[derive(Debug, Error)] +//! enum ErrorFoo { +//! #[error("I am retryable")] +//! #[retryable] +//! Retryable, +//! #[error("Nested errors are retryable")] +//! #[retryable(inherit)] +//! NestedRetryable(AnotherErrorWithRetryableVariants), +//! #[error("Always fail")] +//! NotRetryable +//! } +//! ``` use std::time::Duration; @@ -138,6 +151,7 @@ impl Retry { #[macro_export] macro_rules! retry { ($retry: expr, $code: tt) => {{ + #[allow(unused)] use $crate::retry::RetryableError; let mut attempts = 0; loop { @@ -206,6 +220,7 @@ macro_rules! retry { #[macro_export] macro_rules! retry_async { ($retry: expr, $code: tt) => {{ + #[allow(unused)] use $crate::retry::RetryableError; let mut attempts = 0; loop { @@ -225,6 +240,19 @@ macro_rules! retry_async { }}; } +#[macro_export] +macro_rules! retryable { + ($error: ident) => {{ + #[allow(unused)] + use $crate::retry::RetryableError; + (&$error).is_retryable() + }}; + ($error: expr) => {{ + use $crate::retry::RetryableError; + (&$error).is_retryable() + }}; +} + // network errors should generally be retryable, unless there's a bug in our code impl RetryableError for xmtp_proto::api_client::Error { fn is_retryable(&self) -> bool { diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 6f8a9c349..ab4c0c0fb 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -32,7 +32,7 @@ use rand::RngCore; use xmtp_cryptography::utils as crypto_utils; use super::StorageError; -use crate::Store; +use crate::{retry, retry::Retry, Store}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations/"); diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index fd5f1850d..d10d8ddb5 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -19,3 +19,15 @@ pub enum StorageError { #[error("not found")] NotFound, } + +impl crate::retry::RetryableError for StorageError { + fn is_retryable(&self) -> bool { + match self { + Self::DieselConnect(connection) => { + matches!(connection, diesel::ConnectionError::BadConnection(_)) + } + Self::Pool(_) => true, + _ => false, + } + } +} From 24d46dba07dfdf660db8349c8b09fcd1d1d72314 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 21 Nov 2023 15:28:49 -0500 Subject: [PATCH 07/10] add retryability to process_messages --- xmtp_mls/src/groups/mod.rs | 53 ++++++++------------- xmtp_mls/src/storage/encrypted_store/mod.rs | 2 +- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index e94b812fb..7242c337b 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -14,9 +14,9 @@ use openmls::{ prelude_test::KeyPackage, }; use openmls_traits::OpenMlsProvider; +use std::mem::discriminant; #[cfg(test)] use std::println as debug; -use std::{collections::VecDeque, mem::discriminant}; use thiserror::Error; use tls_codec::{Deserialize, Serialize}; use xmtp_proto::api_client::{Envelope, XmtpApiClient, XmtpMlsClient}; @@ -395,47 +395,36 @@ where fn consume_message( &self, - envelopes: &mut VecDeque, + envelope: &Envelope, openmls_group: &mut OpenMlsGroup, ) -> Result<(), MessageProcessingError> { - let envelope = envelopes.pop_front(); - if let Some(envelope) = envelope { - let result = self.client.process_for_topic( - &self.topic(), - envelope.timestamp_ns, - |provider| -> Result<(), MessageProcessingError> { - self.process_message(openmls_group, &provider, &envelope)?; - openmls_group.save(provider.key_store())?; - Ok(()) - }, - ); - if result.is_err() && retryable!(result.as_ref().unwrap_err()) { - envelopes.push_front(envelope); - return result; - } - } - + self.client.process_for_topic( + &self.topic(), + envelope.timestamp_ns, + |provider| -> Result<(), MessageProcessingError> { + self.process_message(openmls_group, &provider, &envelope)?; + openmls_group.save(provider.key_store())?; + Ok(()) + }, + )?; Ok(()) } pub fn process_messages(&self, envelopes: Vec) -> Result<(), GroupError> { - let mut receive_errors = vec![]; - let mut envelopes = VecDeque::from(envelopes); - let mut conn = self.client.store.conn()?; let provider = self.client.mls_provider(&mut conn); let mut openmls_group = self.load_mls_group(&provider)?; - loop { - let result = retry!( - Retry::default(), - (|| self.consume_message(&mut envelopes, &mut openmls_group)) - ); - - if envelopes.is_empty() { - break; - } - } + let receive_errors: Vec = envelopes + .into_iter() + .map(|envelope| -> Result<(), MessageProcessingError> { + retry!( + Retry::default(), + (|| self.consume_message(&envelope, &mut openmls_group)) + ) + }) + .filter_map(Result::err) + .collect(); if receive_errors.is_empty() { Ok(()) diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index ab4c0c0fb..6f8a9c349 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -32,7 +32,7 @@ use rand::RngCore; use xmtp_cryptography::utils as crypto_utils; use super::StorageError; -use crate::{retry, retry::Retry, Store}; +use crate::Store; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations/"); From 9f2278c3f9c695e122ce9c0d4dd5870703f6c353 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 21 Nov 2023 15:37:32 -0500 Subject: [PATCH 08/10] clippy --- xmtp_mls/src/groups/mod.rs | 2 +- xmtp_mls/src/retry.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 7242c337b..1861ecc4f 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -402,7 +402,7 @@ where &self.topic(), envelope.timestamp_ns, |provider| -> Result<(), MessageProcessingError> { - self.process_message(openmls_group, &provider, &envelope)?; + self.process_message(openmls_group, &provider, envelope)?; openmls_group.save(provider.key_store())?; Ok(()) }, diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index ae78a5baa..237c37fcf 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -155,6 +155,7 @@ macro_rules! retry { use $crate::retry::RetryableError; let mut attempts = 0; loop { + #[allow(clippy::redundant_closure_call)] match $code() { Ok(v) => break Ok(v), Err(e) => { @@ -224,6 +225,7 @@ macro_rules! retry_async { use $crate::retry::RetryableError; let mut attempts = 0; loop { + #[allow(clippy::redundant_closure_call)] match $code().await { Ok(v) => break Ok(v), Err(e) => { From 47333015386d334212259dfec06fb1dde54f2737 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 22 Nov 2023 13:01:19 -0500 Subject: [PATCH 09/10] wrap api client methods in retries --- xmtp_mls/src/api_client_wrapper.rs | 185 +++++++++++++++------- xmtp_mls/src/client.rs | 3 +- xmtp_mls/src/groups/mod.rs | 15 +- xmtp_mls/src/retry.rs | 2 +- xmtp_proto/src/gen/xmtp.message_api.v1.rs | 61 ++++--- 5 files changed, 168 insertions(+), 98 deletions(-) diff --git a/xmtp_mls/src/api_client_wrapper.rs b/xmtp_mls/src/api_client_wrapper.rs index 4e876b7dc..5b798f68f 100644 --- a/xmtp_mls/src/api_client_wrapper.rs +++ b/xmtp_mls/src/api_client_wrapper.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use crate::{retry::Retry, retry_async}; use xmtp_proto::{ api_client::{ Envelope, Error as ApiError, ErrorKind, PagingInfo, QueryRequest, XmtpApiClient, @@ -26,14 +27,18 @@ use xmtp_proto::{ #[derive(Debug)] pub struct ApiClientWrapper { api_client: ApiClient, + retry_strategy: Retry, } impl ApiClientWrapper where ApiClient: XmtpMlsClient + XmtpApiClient, { - pub fn new(api_client: ApiClient) -> Self { - Self { api_client } + pub fn new(api_client: ApiClient, retry_strategy: Retry) -> Self { + Self { + api_client, + retry_strategy, + } } pub async fn read_topic( @@ -45,19 +50,23 @@ where let mut out: Vec = vec![]; let page_size = 100; loop { - let mut result = self - .api_client - .query(QueryRequest { - content_topics: vec![topic.to_string()], - start_time_ns, - end_time_ns: 0, - paging_info: Some(PagingInfo { - cursor, - limit: page_size, - direction: SortDirection::Ascending as i32, - }), + let mut result = retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .query(QueryRequest { + content_topics: vec![topic.to_string()], + start_time_ns, + end_time_ns: 0, + paging_info: Some(PagingInfo { + cursor: cursor.clone(), + limit: page_size, + direction: SortDirection::Ascending as i32, + }), + }) + .await }) - .await?; + )?; let num_envelopes = result.envelopes.len(); out.append(&mut result.envelopes); @@ -80,29 +89,39 @@ where &self, last_resort_key_package: Vec, ) -> Result, ApiError> { - let res = self - .api_client - .register_installation(RegisterInstallationRequest { - last_resort_key_package: Some(KeyPackageUpload { - key_package_tls_serialized: last_resort_key_package.to_vec(), - }), + let res = retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .register_installation(RegisterInstallationRequest { + last_resort_key_package: Some(KeyPackageUpload { + key_package_tls_serialized: last_resort_key_package.to_vec(), + }), + }) + .await }) - .await?; + )?; Ok(res.installation_id) } pub async fn upload_key_packages(&self, key_packages: Vec>) -> Result<(), ApiError> { - self.api_client - .upload_key_packages(UploadKeyPackagesRequest { - key_packages: key_packages - .into_iter() - .map(|kp| KeyPackageUpload { - key_package_tls_serialized: kp, + retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .upload_key_packages(UploadKeyPackagesRequest { + key_packages: key_packages + .clone() + .into_iter() + .map(|kp| KeyPackageUpload { + key_package_tls_serialized: kp, + }) + .collect(), }) - .collect(), + .await }) - .await?; + )?; Ok(()) } @@ -111,12 +130,16 @@ where &self, installation_ids: Vec>, ) -> Result { - let res = self - .api_client - .consume_key_packages(ConsumeKeyPackagesRequest { - installation_ids: installation_ids.clone(), + let res = retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .consume_key_packages(ConsumeKeyPackagesRequest { + installation_ids: installation_ids.clone(), + }) + .await }) - .await?; + )?; if res.key_packages.len() != installation_ids.len() { println!("mismatched number of results"); @@ -154,11 +177,16 @@ where }) .collect(); - self.api_client - .publish_welcomes(PublishWelcomesRequest { - welcome_messages: welcome_requests, + retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .publish_welcomes(PublishWelcomesRequest { + welcome_messages: welcome_requests.clone(), + }) + .await }) - .await?; + )?; Ok(()) } @@ -168,13 +196,17 @@ where start_time_ns: u64, wallet_addresses: Vec, ) -> Result { - let result = self - .api_client - .get_identity_updates(GetIdentityUpdatesRequest { - start_time_ns, - wallet_addresses: wallet_addresses.clone(), + let result = retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .get_identity_updates(GetIdentityUpdatesRequest { + start_time_ns, + wallet_addresses: wallet_addresses.clone(), + }) + .await }) - .await?; + )?; if result.updates.len() != wallet_addresses.len() { println!("mismatched number of results"); @@ -228,9 +260,16 @@ where }) .collect(); - self.api_client - .publish_to_group(PublishToGroupRequest { messages: to_send }) - .await?; + retry_async!( + self.retry_strategy, + (|| async { + self.api_client + .publish_to_group(PublishToGroupRequest { + messages: to_send.clone(), + }) + .await + }) + )?; Ok(()) } @@ -271,7 +310,9 @@ mod tests { use async_trait::async_trait; use mockall::mock; use xmtp_proto::{ - api_client::{Error, PagingInfo, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient}, + api_client::{ + Error, ErrorKind, PagingInfo, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient, + }, xmtp::message_api::{ v1::{ cursor::Cursor as InnerCursor, BatchQueryRequest, BatchQueryResponse, Cursor, @@ -292,6 +333,7 @@ mod tests { }; use super::ApiClientWrapper; + use crate::retry::Retry; fn build_envelopes(num_envelopes: usize, topic: &str) -> Vec { let mut out: Vec = vec![]; @@ -370,7 +412,7 @@ mod tests { installation_id: vec![1, 2, 3], }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper.register_installation(vec![2, 3, 4]).await.unwrap(); assert_eq!(result, vec![1, 2, 3]); } @@ -389,7 +431,7 @@ mod tests { .eq(&key_package) }) .returning(move |_| Ok(())); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper.upload_key_packages(vec![key_package_clone]).await; assert!(result.is_ok()); } @@ -410,7 +452,7 @@ mod tests { ], }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper .consume_key_packages(installation_ids.clone()) .await @@ -469,7 +511,7 @@ mod tests { }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper .get_identity_updates(start_time_ns, wallet_addresses_clone.clone()) .await @@ -520,7 +562,7 @@ mod tests { }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper.read_topic(topic, start_time_ns).await.unwrap(); assert_eq!(result.len(), 10); @@ -545,7 +587,7 @@ mod tests { }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper.read_topic(topic, start_time_ns).await.unwrap(); assert_eq!(result.len(), 100); @@ -600,9 +642,42 @@ mod tests { }) }); - let wrapper = ApiClientWrapper::new(mock_api); + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); let result = wrapper.read_topic(topic, start_time_ns).await.unwrap(); assert_eq!(result.len(), 200); } + + #[tokio::test] + async fn it_retries_twice() { + crate::tests::setup(); + + let mut mock_api = MockApiClient::new(); + let topic = "topic"; + let start_time_ns = 10; + + mock_api + .expect_query() + .times(1) + .returning(move |_| Err(Error::new(ErrorKind::QueryError))); + mock_api + .expect_query() + .times(1) + .returning(move |_| Err(Error::new(ErrorKind::QueryError))); + mock_api.expect_query().times(1).returning(move |_| { + Ok(QueryResponse { + paging_info: Some(PagingInfo { + cursor: None, + limit: 100, + direction: 0, + }), + envelopes: build_envelopes(10, topic), + }) + }); + + let wrapper = ApiClientWrapper::new(mock_api, Retry::default()); + + let result = wrapper.read_topic(topic, start_time_ns).await.unwrap(); + assert_eq!(result.len(), 10); + } } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 78ca8511e..93b30fa94 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -15,6 +15,7 @@ use crate::{ api_client_wrapper::{ApiClientWrapper, IdentityUpdate}, groups::{IntentError, MlsGroup}, identity::Identity, + retry::Retry, storage::{ group::{GroupMembershipState, StoredGroup}, DbConnection, EncryptedMessageStore, StorageError, @@ -130,7 +131,7 @@ where store: EncryptedMessageStore, ) -> Self { Self { - api_client: ApiClientWrapper::new(api_client), + api_client: ApiClientWrapper::new(api_client, Retry::default()), _network: network, identity, store, diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 1861ecc4f..6b9cb28a0 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -30,7 +30,7 @@ use crate::{ identity::Identity, retry, retry::Retry, - retry_async, retryable, + retryable, storage::{ group::{GroupMembershipState, StoredGroup}, group_intent::{IntentKind, IntentState, NewGroupIntent, StoredGroupIntent}, @@ -552,15 +552,10 @@ where let (payload, post_commit_data) = result.expect("result already checked"); let payload_slice = payload.as_slice(); - retry_async!( - Retry::default(), - (|| async { - self.client - .api_client - .publish_to_group(vec![payload_slice]) - .await - }) - )?; + self.client + .api_client + .publish_to_group(vec![payload_slice]) + .await?; { EncryptedMessageStore::set_group_intent_published( diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index 237c37fcf..c8c34a886 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -32,7 +32,7 @@ pub trait RetryableError: std::error::Error { impl RetryableError for &T where T: std::error::Error {} /// Options to specify how to retry a function -#[derive(SmartDefault, PartialEq, Eq, Copy, Clone)] +#[derive(SmartDefault, Debug, PartialEq, Eq, Copy, Clone)] pub struct Retry { #[default = 3] retries: usize, diff --git a/xmtp_proto/src/gen/xmtp.message_api.v1.rs b/xmtp_proto/src/gen/xmtp.message_api.v1.rs index 4cdbcba7a..aede8f493 100644 --- a/xmtp_proto/src/gen/xmtp.message_api.v1.rs +++ b/xmtp_proto/src/gen/xmtp.message_api.v1.rs @@ -5,13 +5,13 @@ #[derive(Clone, PartialEq, ::prost::Message)] pub struct Token { /// identity key signed by a wallet - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub identity_key: ::core::option::Option, /// encoded bytes of AuthData - #[prost(bytes="vec", tag="2")] + #[prost(bytes = "vec", tag = "2")] pub auth_data_bytes: ::prost::alloc::vec::Vec, /// identity key signature of AuthData bytes - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub auth_data_signature: ::core::option::Option, } /// AuthData carries token parameters that are authenticated @@ -23,10 +23,10 @@ pub struct Token { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AuthData { /// address of the wallet - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub wallet_addr: ::prost::alloc::string::String, /// time when the token was generated/signed - #[prost(uint64, tag="2")] + #[prost(uint64, tag = "2")] pub created_ns: u64, } /// This is based off of the go-waku Index type, but with the @@ -35,9 +35,9 @@ pub struct AuthData { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct IndexCursor { - #[prost(bytes="vec", tag="1")] + #[prost(bytes = "vec", tag = "1")] pub digest: ::prost::alloc::vec::Vec, - #[prost(uint64, tag="2")] + #[prost(uint64, tag = "2")] pub sender_time_ns: u64, } /// Wrapper for potentially multiple types of cursor @@ -47,7 +47,7 @@ pub struct Cursor { /// Making the cursor a one-of type, as I would like to change the way we /// handle pagination to use a precomputed sort field. /// This way we can handle both methods - #[prost(oneof="cursor::Cursor", tags="1")] + #[prost(oneof = "cursor::Cursor", tags = "1")] pub cursor: ::core::option::Option, } /// Nested message and enum types in `Cursor`. @@ -56,9 +56,9 @@ pub mod cursor { /// handle pagination to use a precomputed sort field. /// This way we can handle both methods #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Cursor { - #[prost(message, tag="1")] + #[prost(message, tag = "1")] Index(super::IndexCursor), } } @@ -68,11 +68,11 @@ pub mod cursor { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PagingInfo { /// Note: this is a uint32, while go-waku's pageSize is a uint64 - #[prost(uint32, tag="1")] + #[prost(uint32, tag = "1")] pub limit: u32, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub cursor: ::core::option::Option, - #[prost(enumeration="SortDirection", tag="3")] + #[prost(enumeration = "SortDirection", tag = "3")] pub direction: i32, } /// Envelope encapsulates a message while in transit. @@ -82,74 +82,72 @@ pub struct Envelope { /// The topic the message belongs to, /// If the message includes the topic as well /// it MUST be the same as the topic in the envelope. - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub content_topic: ::prost::alloc::string::String, /// Message creation timestamp /// If the message includes the timestamp as well /// it MUST be equivalent to the timestamp in the envelope. - #[prost(uint64, tag="2")] + #[prost(uint64, tag = "2")] pub timestamp_ns: u64, - #[prost(bytes="vec", tag="3")] + #[prost(bytes = "vec", tag = "3")] pub message: ::prost::alloc::vec::Vec, } /// Publish #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PublishRequest { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub envelopes: ::prost::alloc::vec::Vec, } /// Empty message as a response for Publish #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PublishResponse { -} +pub struct PublishResponse {} /// Subscribe #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SubscribeRequest { - #[prost(string, repeated, tag="1")] + #[prost(string, repeated, tag = "1")] pub content_topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } /// SubscribeAll #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct SubscribeAllRequest { -} +pub struct SubscribeAllRequest {} /// Query #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryRequest { - #[prost(string, repeated, tag="1")] + #[prost(string, repeated, tag = "1")] pub content_topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(uint64, tag="2")] + #[prost(uint64, tag = "2")] pub start_time_ns: u64, - #[prost(uint64, tag="3")] + #[prost(uint64, tag = "3")] pub end_time_ns: u64, - #[prost(message, optional, tag="4")] + #[prost(message, optional, tag = "4")] pub paging_info: ::core::option::Option, } /// The response, containing envelopes, for a query #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryResponse { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub envelopes: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub paging_info: ::core::option::Option, } /// BatchQuery #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BatchQueryRequest { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub requests: ::prost::alloc::vec::Vec, } /// Response containing a list of QueryResponse messages #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BatchQueryResponse { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub responses: ::prost::alloc::vec::Vec, } /// Sort direction @@ -664,4 +662,5 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = &[ ]; include!("xmtp.message_api.v1.serde.rs"); include!("xmtp.message_api.v1.tonic.rs"); -// @@protoc_insertion_point(module) \ No newline at end of file +// @@protoc_insertion_point(module) + From 827fa5feecb569244ed11e345563458416519c87 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 22 Nov 2023 14:03:08 -0500 Subject: [PATCH 10/10] impl RetryableError on mls errors --- xmtp_mls/src/api_client_wrapper.rs | 2 +- xmtp_mls/src/groups/mod.rs | 10 +++-- xmtp_mls/src/storage/errors.rs | 70 +++++++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 5 deletions(-) diff --git a/xmtp_mls/src/api_client_wrapper.rs b/xmtp_mls/src/api_client_wrapper.rs index 5b798f68f..54c1e2250 100644 --- a/xmtp_mls/src/api_client_wrapper.rs +++ b/xmtp_mls/src/api_client_wrapper.rs @@ -649,7 +649,7 @@ mod tests { } #[tokio::test] - async fn it_retries_twice() { + async fn it_retries_twice_then_succeeds() { crate::tests::setup(); let mut mock_api = MockApiClient::new(); diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 6b9cb28a0..9133f08d3 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -29,7 +29,7 @@ use crate::{ configuration::CIPHERSUITE, identity::Identity, retry, - retry::Retry, + retry::{Retry, RetryableError}, retryable, storage::{ group::{GroupMembershipState, StoredGroup}, @@ -78,11 +78,15 @@ pub enum GroupError { Diesel(#[from] diesel::result::Error), } -impl crate::retry::RetryableError for GroupError { +impl RetryableError for GroupError { fn is_retryable(&self) -> bool { match self { - Self::Diesel(_) => true, Self::ReceiveError(msg) => retryable!(msg), + Self::AddMembers(members) => retryable!(members), + Self::RemoveMembers(members) => retryable!(members), + Self::GroupCreate(group) => retryable!(group), + Self::SelfUpdate(update) => retryable!(update), + Self::WelcomeError(welcome) => retryable!(welcome), _ => false, } } diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index d10d8ddb5..913dafbe1 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -1,5 +1,7 @@ use thiserror::Error; +use crate::{retry::RetryableError, retryable}; + #[derive(Debug, Error, PartialEq)] pub enum StorageError { #[error("Diesel connection error")] @@ -20,7 +22,7 @@ pub enum StorageError { NotFound, } -impl crate::retry::RetryableError for StorageError { +impl RetryableError for StorageError { fn is_retryable(&self) -> bool { match self { Self::DieselConnect(connection) => { @@ -31,3 +33,69 @@ impl crate::retry::RetryableError for StorageError { } } } + +// OpenMLS KeyStore errors +impl RetryableError for openmls::group::AddMembersError { + fn is_retryable(&self) -> bool { + match self { + Self::CreateCommitError(commit) => retryable!(commit), + _ => false, + } + } +} + +impl RetryableError for openmls::group::CreateCommitError { + fn is_retryable(&self) -> bool { + match self { + Self::KeyStoreError(storage) => retryable!(storage), + Self::KeyPackageGenerationError(generation) => retryable!(generation), + _ => false, + } + } +} + +impl RetryableError for openmls::key_packages::errors::KeyPackageNewError { + fn is_retryable(&self) -> bool { + match self { + Self::KeyStoreError(storage) => retryable!(storage), + _ => false, + } + } +} + +impl RetryableError for openmls::group::RemoveMembersError { + fn is_retryable(&self) -> bool { + match self { + Self::CreateCommitError(commit) => retryable!(commit), + _ => false, + } + } +} + +impl RetryableError for openmls::group::NewGroupError { + fn is_retryable(&self) -> bool { + match self { + Self::KeyStoreError(storage) => retryable!(storage), + _ => false, + } + } +} + +impl RetryableError for openmls::group::SelfUpdateError { + fn is_retryable(&self) -> bool { + match self { + Self::CreateCommitError(commit) => retryable!(commit), + Self::KeyStoreError => true, + _ => false, + } + } +} + +impl RetryableError for openmls::group::WelcomeError { + fn is_retryable(&self) -> bool { + match self { + Self::KeyStoreError(storage) => retryable!(storage), + _ => false, + } + } +}