Skip to content

Commit

Permalink
fix, not needed to wait on new groups
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jun 26, 2024
1 parent d55304e commit a56a5cd
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 31 deletions.
12 changes: 2 additions & 10 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,6 @@ mod tests {

alix_group.send("first".as_bytes().to_vec()).await.unwrap();

println!("Waiting on first");
stream_callback.notify.notified().await;

let bo_group = bo
Expand All @@ -1676,22 +1675,15 @@ mod tests {
)
.await
.unwrap();

println!("Waiting on second");
stream_callback.notify.notified().await;

let _ = caro.inner_client.sync_welcomes().await.unwrap();

bo_group.send("second".as_bytes().to_vec()).await.unwrap();

println!("Waiting on third");
stream_callback.notify.notified().await;

alix_group.send("third".as_bytes().to_vec()).await.unwrap();

println!("Waiting on fourth");
stream_callback.notify.notified().await;

bo_group.send("fourth".as_bytes().to_vec()).await.unwrap();
println!("Waiting on fifth");
stream_callback.notify.notified().await;

assert_eq!(stream_callback.message_count(), 4);
Expand Down
28 changes: 7 additions & 21 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
match res.await {
Ok(group) => Some(group),
Err(err) => {
log::error!("Error processing stream entry: {:?}", err);
log::error!("Error processing stream entry for conversation: {:?}", err);
None
}
}
Expand Down Expand Up @@ -214,7 +214,6 @@ where
let (tx, rx) = mpsc::unbounded_channel();

client.sync_welcomes().await?;
log::debug!("Synced Welcomes!!!");

let current_groups = client.store().conn()?.find_groups(None, None, None, None)?;

Expand All @@ -231,16 +230,13 @@ where
)
})
.collect();
log::info!("Groups len: {:?}", group_id_to_info.len());

tokio::spawn(async move {
let client = client.clone();
let mut handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone());

let client_pointer = client.clone();
let mut convo_stream = Self::stream_conversations(&client_pointer).await?;
let mut convo_stream = Self::stream_conversations(&client).await?;

loop {
log::debug!("Selecting ....");
// TODO:insipx We should more closely investigate whether
// the stream mapping in `stream_conversations` is cancellation safe
// otherwise it could lead to hard-to-find bugs
Expand All @@ -249,7 +245,7 @@ where
if group_id_to_info.contains_key(&new_group.group_id) {
continue;
}

//TODO: Should we await the handle to ensure it finishes?
handle.abort();
for info in group_id_to_info.values_mut() {
info.cursor = 0;
Expand All @@ -258,19 +254,17 @@ where
new_group.group_id,
MessagesStreamInfo {
convo_created_at_ns: new_group.created_at_ns,
cursor: 1,
cursor: 1, // For the new group, stream all messages since the group was created
},
);
handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone());
},
maybe_finished = &mut handle => {
match maybe_finished {
// if all is well it means the stream closed (our receiver is dropped
// or ended), our work is done
// if all is well it means the stream closed (receiver is dropped or ended)
Ok(_) => break,
Err(e) => {
// if we have an error, it probably means we need to try and
// restart the stream.
// if we have an error, try to restart the stream.
log::error!("{}", e.to_string());
handle = Self::relay_messages(client.clone(), tx.clone(), group_id_to_info.clone());
}
Expand All @@ -293,7 +287,6 @@ where
let (tx, rx) = tokio::sync::oneshot::channel();

let handle = tokio::spawn(async move {
log::debug!("Spawning one");
let mut stream = Self::stream_all_messages(client).await?;
let _ = tx.send(());
while let Some(message) = stream.next().await {
Expand All @@ -316,15 +309,9 @@ where
let mut stream = client.stream_messages(group_id_to_info).await?;
while let Some(message) = stream.next().await {
// an error can only mean the receiver has been dropped or closed
log::debug!(
"SENDING MESSAGE {}",
String::from_utf8_lossy(&message.decrypted_message_bytes)
);
if tx.send(message).is_err() {
log::debug!("CLOSING STREAM");
break;
}
log::debug!("Sent Message!");
}
Ok::<_, ClientError>(())
})
Expand Down Expand Up @@ -388,7 +375,6 @@ mod tests {
let messages_clone = messages.clone();

Client::<GrpcClient>::stream_all_messages_with_callback(Arc::new(caro), move |message| {
log::debug!("YOOO MESSAGES");
(*messages_clone.lock().unwrap()).push(message);
});

Expand Down

0 comments on commit a56a5cd

Please sign in to comment.