From 34f90be07af9a5889fcc0ddb41f9d91c954102c8 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 29 Oct 2024 15:11:06 -0400 Subject: [PATCH] pass v2 errors into streams (#1198) --- bindings_ffi/src/v2.rs | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/bindings_ffi/src/v2.rs b/bindings_ffi/src/v2.rs index f98a9a803..57bb20886 100644 --- a/bindings_ffi/src/v2.rs +++ b/bindings_ffi/src/v2.rs @@ -283,9 +283,10 @@ impl From for SubscribeRequest { } } -#[uniffi::export(callback_interface)] +#[uniffi::export(with_foreign)] pub trait FfiV2SubscriptionCallback: Send + Sync { fn on_message(&self, message: FfiEnvelope); + fn on_error(&self, error: GenericError); } /// Subscription to a stream of V2 Messages @@ -342,7 +343,7 @@ impl FfiV2Subscription { impl FfiV2Subscription { async fn subscribe( mut subscription: GrpcMutableSubscription, - callback: Box, + callback: Arc, ) -> Self { let (tx, mut rx): (_, mpsc::Receiver) = mpsc::channel(10); @@ -352,16 +353,18 @@ impl FfiV2Subscription { item = subscription.next() => { match item { Some(Ok(envelope)) => callback.on_message(envelope.into()), - Some(Err(e)) => log::error!("Stream error {}", e), + Some(Err(e)) => callback.on_error(e.into()), None => { - log::debug!("stream closed"); + log::debug!("stream ended"); break; } } }, update = rx.recv() => { if let Some(update) = update { - let _ = subscription.update(update.into()).await.map_err(|e| log::error!("{}", e)).ok(); + if let Err(e) = subscription.update(update.into()).await { + callback.on_error(e.into()); + } } }, } @@ -420,7 +423,7 @@ impl FfiV2ApiClient { pub async fn subscribe( &self, request: FfiV2SubscribeRequest, - callback: Box, + callback: Arc, ) -> Result { let subscription = self.inner_client.subscribe2(request.into()).await?; Ok(FfiV2Subscription::subscribe(subscription, callback).await) @@ -561,8 +564,12 @@ mod tests { use futures::stream; use xmtp_proto::api_client::{Envelope, Error as ApiError}; - use crate::v2::{ - create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest, FfiV2Subscription, + use crate::{ + v2::{ + create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest, + FfiV2Subscription, + }, + GenericError, }; use super::FfiV2SubscriptionCallback; @@ -581,6 +588,10 @@ mod tests { messages.push(message); self.notify.notify_one(); } + + fn on_error(&self, error: GenericError) { + log::error!("{}", error); + } } // Try a query on a test topic, and make sure we get a response @@ -617,7 +628,7 @@ mod tests { let local_data = callback.clone(); FfiV2Subscription::subscribe( xmtp_api_grpc::grpc_api_helper::GrpcMutableSubscription::new(Box::pin(stream), tx), - Box::new(callback), + Arc::new(callback), ) .await; @@ -653,7 +664,7 @@ mod tests { let local_data = callback.clone(); let sub = FfiV2Subscription::subscribe( xmtp_api_grpc::grpc_api_helper::GrpcMutableSubscription::new(Box::pin(stream), tx), - Box::new(callback), + Arc::new(callback), ) .await; @@ -684,7 +695,7 @@ mod tests { FfiV2SubscribeRequest { content_topics: vec![content_topic.to_string()], }, - Box::new(callback), + Arc::new(callback), ) .await .unwrap(); @@ -732,7 +743,7 @@ mod tests { FfiV2SubscribeRequest { content_topics: vec![content_topic.to_string()], }, - Box::new(callback), + Arc::new(callback), ) .await .unwrap();