Skip to content

Commit

Permalink
quick pass at stream_all_msg refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jun 11, 2024
1 parent 2715ea2 commit 70dc4ef
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ smart-default = "0.7.1"
thiserror = { workspace = true }
tls_codec = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = "0.1"
toml = "0.8.4"
xmtp_cryptography = { workspace = true }
xmtp_id = { path = "../xmtp_id" }
Expand Down
78 changes: 77 additions & 1 deletion xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ use std::{

use futures::{Stream, StreamExt};
use prost::Message;
use tokio::sync::oneshot::{self, Sender};
use tokio::{
sync::{
mpsc::{self, UnboundedSender},
oneshot::{self, Sender},
},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use xmtp_proto::xmtp::mls::api::v1::WelcomeMessage;

use crate::{
Expand Down Expand Up @@ -242,6 +249,58 @@ where
})
}

pub async fn stream_all_messages(
client: Arc<Client<ApiClient>>,
) -> Result<impl Stream<Item = StoredGroupMessage>, ClientError> {
let mut handle;

let (tx, rx) = mpsc::unbounded_channel();

client.sync_welcomes().await?;

let current_groups = client.store().conn()?.find_groups(None, None, None, None)?;

let mut group_id_to_info: HashMap<Vec<u8>, MessagesStreamInfo> = current_groups
.into_iter()
.map(|group| {
(
group.id.clone(),
MessagesStreamInfo {
convo_created_at_ns: group.created_at_ns,
cursor: 0,
},
)
})
.collect();
handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone());

tokio::spawn(async move {
let client_pointer = client.clone();
let mut convo_stream = Self::stream_conversations(&client_pointer).await?;

Check failure on line 279 in xmtp_mls/src/subscriptions.rs

View workflow job for this annotation

GitHub Actions / workspace

the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::FromResidual`)

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::FromResidual`) --> xmtp_mls/src/subscriptions.rs:279:85 | 277 | tokio::spawn(async move { | ______________________- 278 | | let client_pointer = client.clone(); 279 | | let mut convo_stream = Self::stream_conversations(&client_pointer).await?; | | ^ cannot use the `?` operator in an async block that returns `()` 280 | | ... | 298 | | } 299 | | }); | |_________- this function should return `Result` or `Option` to accept `?` | = help: the trait `std::ops::FromResidual<std::result::Result<std::convert::Infallible, client::ClientError>>` is not implemented for `()`

Check failure on line 279 in xmtp_mls/src/subscriptions.rs

View workflow job for this annotation

GitHub Actions / workspace

the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::FromResidual`)

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::FromResidual`) --> xmtp_mls/src/subscriptions.rs:279:85 | 277 | tokio::spawn(async move { | ______________________- 278 | | let client_pointer = client.clone(); 279 | | let mut convo_stream = Self::stream_conversations(&client_pointer).await?; | | ^ cannot use the `?` operator in an async block that returns `()` 280 | | ... | 298 | | } 299 | | }); | |_________- this function should return `Result` or `Option` to accept `?` | = help: the trait `std::ops::FromResidual<std::result::Result<std::convert::Infallible, client::ClientError>>` is not implemented for `()`

while let Some(new_group) = convo_stream.next().await {
if group_id_to_info.contains_key(&new_group.group_id) {
continue;
}

handle.abort();
for info in group_id_to_info.values_mut() {
info.cursor = 0;
}
group_id_to_info.insert(
new_group.group_id,
MessagesStreamInfo {
convo_created_at_ns: new_group.created_at_ns,
cursor: 1,
},
);
handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone());
}
});

Ok(UnboundedReceiverStream::new(rx))
}

pub async fn stream_all_messages_with_callback(
client: Arc<Client<ApiClient>>,
callback: impl FnMut(StoredGroupMessage) + Send + Sync + 'static,
Expand Down Expand Up @@ -313,6 +372,23 @@ where

Ok(groups_stream_closer)
}

fn relay_messages(
client: Arc<Client<ApiClient>>,
tx: UnboundedSender<StoredGroupMessage>,
group_id_to_info: HashMap<Vec<u8>, MessagesStreamInfo>,
) -> JoinHandle<Result<(), ClientError>> {
tokio::spawn(async move {
let mut stream = client.stream_messages(group_id_to_info).await?;
while let Some(message) = stream.next().await {
// an error can only mean the receiver has been dropped or closed
if tx.send(message).is_err() {
break;
}
}
Ok::<_, ClientError>(())
})
}
}

#[cfg(test)]
Expand Down

0 comments on commit 70dc4ef

Please sign in to comment.