# Set cursors to 0 when streaming (#515)
We need to land xmtp/xmtp-node-go#351 in order to implement `streamAllMessages()` without gaps. However, once we have actual cursor support on the server, we may introduce bugs in the streaming of welcomes and of messages within a single group, because the client doesn't yet have a consistent strategy for setting the cursor on streams.

The easiest way to avoid regressions is to keep the existing behavior identical. We can do this by setting the cursor to 0, which effectively disables the sync-from-DB on the server; the existing streaming methods will simply stream whatever newly arrives from the Waku relay.
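
The pattern applied in every diff below is the same; here is a minimal sketch (the `get_last_cursor_for_id` call is the real one being removed, the rest is illustrative):

```rust
// The persisted cursor lookup is replaced by a constant 0, which tells
// the server to skip its sync-from-DB phase entirely.
fn starting_cursor() -> u64 {
    // was: conn.get_last_cursor_for_id(id, entity_kind)? (see diffs below)
    0
}

fn main() {
    assert_eq!(starting_cursor(), 0); // streams only see new arrivals
}
```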

## What I think should be done in the long-term

In a nutshell, we are trying to synchronize three views of the data:

The node/backend database -> libxmtp database -> integrator's in-memory view of the data

In order to avoid gaps when requesting new data, each layer needs to pass a cursor down to the next layer, representing the data it currently has. We are already doing this for the pull-based methods: the integrator's `FfiGroup.find_messages()` and libxmtp's `MlsGroup.sync()` both allow you to specify where you want to pull from. We want to do the same for the push-based streaming methods.
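
As a toy illustration of that layering (the names below are made up for the example, not real libxmtp APIs), each layer pulls from the layer above it, using the highest id it already holds as its cursor:

```rust
// Toy model of the three-layer cursor chain:
// node/backend DB -> libxmtp DB -> integrator's in-memory view.

/// Return every entry with an id strictly greater than `cursor`.
fn pull_after(upstream: &[u64], cursor: u64) -> Vec<u64> {
    upstream.iter().copied().filter(|&id| id > cursor).collect()
}

fn main() {
    let node_db = vec![1, 2, 3, 4, 5]; // the backend's view
    let mut client_db = vec![1, 2];    // libxmtp's view
    let mut app_view = vec![1];        // the integrator's view

    // Each layer requests only what it is missing by passing its own
    // cursor (the highest id it holds) to the layer above.
    let client_cursor = client_db.last().copied().unwrap_or(0);
    client_db.extend(pull_after(&node_db, client_cursor));

    let app_cursor = app_view.last().copied().unwrap_or(0);
    app_view.extend(pull_after(&client_db, app_cursor));

    assert_eq!(app_view, vec![1, 2, 3, 4, 5]); // no gaps at any layer
}
```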

1) Allow the *integrator* to pass down the initial cursor they want to stream from; this forces the integrator to avoid gaps in their own data. Libxmtp can then pass this cursor along to the node backend. If the cursor is *greater* than that of the last message in the libxmtp database, libxmtp can additionally call `sync()` before starting the stream, to ensure libxmtp's DB has no gaps.

2) For situations where libxmtp wants to close and re-open streams under the hood, libxmtp should hold an in-memory cursor: save the cursor for the stream being closed, then pass the same cursor to the new stream being opened. (Both proposals are sketched below.)
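
Here is the sketch of both proposals (everything below is illustrative; none of these are real libxmtp types or methods):

```rust
struct Group {
    local_db_cursor: u64, // highest message id in libxmtp's database
}

struct StreamState {
    cursor: u64, // in-memory cursor, advanced as messages arrive
}

impl Group {
    /// Stand-in for `MlsGroup.sync()`: pull from the node until the
    /// local DB is caught up to `target`.
    fn sync(&mut self, target: u64) {
        self.local_db_cursor = self.local_db_cursor.max(target);
    }

    /// Proposal (1): open a stream from an integrator-supplied cursor,
    /// back-filling the local DB first if that cursor is ahead of it.
    fn stream_from(&mut self, integrator_cursor: u64) -> StreamState {
        if integrator_cursor > self.local_db_cursor {
            self.sync(integrator_cursor);
        }
        StreamState { cursor: integrator_cursor }
    }

    /// Proposal (2): when re-opening a stream under the hood, seed the
    /// new stream with the closed stream's in-memory cursor.
    fn reopen(&mut self, old: StreamState) -> StreamState {
        self.stream_from(old.cursor)
    }
}

fn main() {
    let mut group = Group { local_db_cursor: 3 };
    let mut stream = group.stream_from(5); // integrator already has 1..=5
    assert_eq!(group.local_db_cursor, 5);  // the gap was back-filled first

    stream.cursor = 8;                     // messages 6..=8 arrive
    let reopened = group.reopen(stream);
    assert_eq!(reopened.cursor, 8);        // nothing skipped or replayed
}
```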

Feedback on this plan welcome!
richardhuaaa authored Feb 15, 2024
1 parent 20b824e commit 9114dd7
Showing 4 changed files with 4 additions and 12 deletions.
xmtp_cryptography/src/signature.rs (1 addition & 1 deletion)

```diff
@@ -184,7 +184,7 @@ pub mod tests {
     }
 
     fn toggle(index: usize, v: &mut [u8]) {
-        v[index] += 1;
+        v[index] ^= 1;
     }
 
     #[tokio::test]
```
xmtp_mls/src/client.rs (1 addition & 4 deletions)

```diff
@@ -490,10 +490,7 @@ where
         &'a self,
     ) -> Result<Pin<Box<dyn Stream<Item = MlsGroup<ApiClient>> + Send + 'a>>, ClientError> {
         let installation_key = self.installation_public_key();
-        let id_cursor = self
-            .store
-            .conn()?
-            .get_last_cursor_for_id(&installation_key, EntityKind::Welcome)?;
+        let id_cursor = 0;
 
         let subscription = self
             .api_client
```
```diff
@@ -1,4 +1,3 @@
-use log::info;
 use prost::Message;
 
 use xmtp_cryptography::signature::RecoverableSignature;
```
xmtp_mls/src/groups/subscriptions.rs (2 additions & 6 deletions)

```diff
@@ -6,7 +6,7 @@ use xmtp_proto::{api_client::XmtpMlsClient, xmtp::mls::api::v1::GroupMessage};
 
 use super::{extract_message_v1, GroupError, MlsGroup};
 use crate::api_client_wrapper::GroupFilter;
-use crate::storage::{group_message::StoredGroupMessage, refresh_state::EntityKind};
+use crate::storage::group_message::StoredGroupMessage;
 
 impl<'c, ApiClient> MlsGroup<'c, ApiClient>
 where
@@ -44,11 +44,7 @@ where
     pub async fn stream(
         &'c self,
     ) -> Result<Pin<Box<dyn Stream<Item = StoredGroupMessage> + 'c + Send>>, GroupError> {
-        let last_cursor = self
-            .client
-            .store
-            .conn()?
-            .get_last_cursor_for_id(self.group_id.clone(), EntityKind::Group)?;
+        let last_cursor = 0;
 
         let subscription = self
             .client
```
