Skip to content

Commit

Permalink
pass v2 errors into streams (#1198)
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx authored Oct 29, 2024
1 parent 257063e commit 34f90be
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions bindings_ffi/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,10 @@ impl From<FfiV2SubscribeRequest> for SubscribeRequest {
}
}

#[uniffi::export(callback_interface)]
#[uniffi::export(with_foreign)]
pub trait FfiV2SubscriptionCallback: Send + Sync {
fn on_message(&self, message: FfiEnvelope);
fn on_error(&self, error: GenericError);
}

/// Subscription to a stream of V2 Messages
Expand Down Expand Up @@ -342,7 +343,7 @@ impl FfiV2Subscription {
impl FfiV2Subscription {
async fn subscribe(
mut subscription: GrpcMutableSubscription,
callback: Box<dyn FfiV2SubscriptionCallback>,
callback: Arc<dyn FfiV2SubscriptionCallback>,
) -> Self {
let (tx, mut rx): (_, mpsc::Receiver<FfiV2SubscribeRequest>) = mpsc::channel(10);

Expand All @@ -352,16 +353,18 @@ impl FfiV2Subscription {
item = subscription.next() => {
match item {
Some(Ok(envelope)) => callback.on_message(envelope.into()),
Some(Err(e)) => log::error!("Stream error {}", e),
Some(Err(e)) => callback.on_error(e.into()),
None => {
log::debug!("stream closed");
log::debug!("stream ended");
break;
}
}
},
update = rx.recv() => {
if let Some(update) = update {
let _ = subscription.update(update.into()).await.map_err(|e| log::error!("{}", e)).ok();
if let Err(e) = subscription.update(update.into()).await {
callback.on_error(e.into());
}
}
},
}
Expand Down Expand Up @@ -420,7 +423,7 @@ impl FfiV2ApiClient {
pub async fn subscribe(
&self,
request: FfiV2SubscribeRequest,
callback: Box<dyn FfiV2SubscriptionCallback>,
callback: Arc<dyn FfiV2SubscriptionCallback>,
) -> Result<FfiV2Subscription, GenericError> {
let subscription = self.inner_client.subscribe2(request.into()).await?;
Ok(FfiV2Subscription::subscribe(subscription, callback).await)
Expand Down Expand Up @@ -561,8 +564,12 @@ mod tests {
use futures::stream;
use xmtp_proto::api_client::{Envelope, Error as ApiError};

use crate::v2::{
create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest, FfiV2Subscription,
use crate::{
v2::{
create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest,
FfiV2Subscription,
},
GenericError,
};

use super::FfiV2SubscriptionCallback;
Expand All @@ -581,6 +588,10 @@ mod tests {
messages.push(message);
self.notify.notify_one();
}

fn on_error(&self, error: GenericError) {
log::error!("{}", error);
}
}

// Try a query on a test topic, and make sure we get a response
Expand Down Expand Up @@ -617,7 +628,7 @@ mod tests {
let local_data = callback.clone();
FfiV2Subscription::subscribe(
xmtp_api_grpc::grpc_api_helper::GrpcMutableSubscription::new(Box::pin(stream), tx),
Box::new(callback),
Arc::new(callback),
)
.await;

Expand Down Expand Up @@ -653,7 +664,7 @@ mod tests {
let local_data = callback.clone();
let sub = FfiV2Subscription::subscribe(
xmtp_api_grpc::grpc_api_helper::GrpcMutableSubscription::new(Box::pin(stream), tx),
Box::new(callback),
Arc::new(callback),
)
.await;

Expand Down Expand Up @@ -684,7 +695,7 @@ mod tests {
FfiV2SubscribeRequest {
content_topics: vec![content_topic.to_string()],
},
Box::new(callback),
Arc::new(callback),
)
.await
.unwrap();
Expand Down Expand Up @@ -732,7 +743,7 @@ mod tests {
FfiV2SubscribeRequest {
content_topics: vec![content_topic.to_string()],
},
Box::new(callback),
Arc::new(callback),
)
.await
.unwrap();
Expand Down

0 comments on commit 34f90be

Please sign in to comment.