Move staged commit to intents #984
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
```diff
@@ -1,4 +1,4 @@
 #!/bin/bash
 set -eou pipefail
 
-docker-compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
+docker compose -f dev/docker/docker-compose.yml -p "libxmtp" "$@"
```
The GH action runner has been upgraded and no longer has a separate `docker-compose` command. It's now a subcommand of `docker` only.
```diff
@@ -455,11 +464,6 @@ impl MlsGroup {
             Self::into_envelope(message, now)
         });
 
-        // Skipping a full sync here and instead just firing and forgetting
-        if let Err(err) = self.publish_intents(&provider, client).await {
```
This gets handled inside `sync_until_intent_resolved`. No need to call it twice.
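To make the redundancy concrete, here is a minimal sketch of the intent lifecycle implied by this thread: the sync routine already performs the publish step itself, so a separate `publish_intents` call right before it does the same work twice. The state names and the internals of `sync_until_intent_resolved`/`publish_intents` below are assumptions for illustration, not the actual libxmtp implementation.

```rust
// Stand-in types; state names and function internals are assumptions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IntentState {
    ToPublish, // created locally, not yet sent
    Published, // payload sent, waiting for the commit to come back
    Committed, // commit processed; the intent is resolved
}

struct Intent {
    state: IntentState,
}

fn publish_intents(intent: &mut Intent) {
    if intent.state == IntentState::ToPublish {
        intent.state = IntentState::Published;
    }
}

fn sync_until_intent_resolved(intent: &mut Intent) {
    // The sync loop publishes anything still in ToPublish on its own...
    publish_intents(intent);
    // ...then processes incoming messages until the intent is committed.
    intent.state = IntentState::Committed;
}

fn main() {
    let mut intent = Intent { state: IntentState::ToPublish };
    // No separate publish_intents call is needed before syncing:
    sync_until_intent_resolved(&mut intent);
    assert_eq!(intent.state, IntentState::Committed);
}
```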
```diff
@@ -479,7 +483,6 @@ impl MlsGroup {
         let update_interval = Some(5_000_000);
         self.maybe_update_installations(&provider, update_interval, client)
             .await?;
-        self.publish_intents(&provider, client).await?;
```
Same as above
```diff
@@ -66,7 +66,8 @@ impl MlsGroup {
         );
 
         if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
-            self.sync(&client).await?;
+            self.sync_with_conn(&client.mls_provider()?, &client)
```
This change alone is going to resolve a lot of thrash and noise. Previously we were calling sync when we received a commit from a stream, which would attempt to add missing members first.
Every member of the group who was online would try to do it at the same time, leading to a bunch of commits that would get thrown out.
We don't need to do this as part of stream processing.
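A rough sketch of the relationship being described, under the assumption that a full `sync` is roughly `maybe_update_installations` followed by `sync_with_conn`, while the stream path now skips the installation update. All types and method bodies below are illustrative stand-ins, not the real libxmtp code.

```rust
// All names and bodies here are illustrative stand-ins, not libxmtp code.
struct Group;

impl Group {
    fn maybe_update_installations(&self) {
        // May publish an "add missing members" commit. When every online
        // member does this in response to the same streamed commit, most of
        // those commits end up getting thrown out (the thrash described above).
    }

    fn sync_with_conn(&self) {
        // Publishes pending intents and processes incoming messages only.
    }

    fn sync(&self) {
        // A full sync also tries to update installations first...
        self.maybe_update_installations();
        self.sync_with_conn();
    }

    fn on_streamed_commit_error(&self) {
        // ...but the stream path now skips that step and just syncs messages.
        self.sync_with_conn();
    }
}

fn main() {
    let group = Group;
    group.on_streamed_commit_error();
    // The full sync path is still available where installation updates matter.
    group.sync();
}
```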
Nice catch. I'm thinking that another optimization would be to update `sync` so that if we sync during the `maybe_update_installations` process (`add_missing_installations` => `sync_until_intent_resolved`), we don't need to do a second `sync_with_conn` right afterwards.
Another potential optimization: the main reason we need to call sync, rather than directly processing a streamed commit, is that streams are not guaranteed to deliver commits in order, while syncs are.
If we can guarantee stream ordering from the server (the same way we're doing it for replication):
- We don't need to call sync while streaming
- We may not need to call sync while publishing, especially not repeated syncs (it could be handled via the stream instead, potentially with the same long-running stream handling all commits)
These are critical reliability improvements, well done. Left a few questions about things I'm unsure about, but I could be missing details
```diff
@@ -0,0 +1,7 @@
+-- Your SQL goes here
```
Can remove this comment, here and above
Good call
```rust
// TODO: remove clone
if let Some(commit) = openmls_group.clone().pending_commit() {
```
Wondering if we can use the commit as a reference rather than cloning the group?
Compiler didn't like that because `openmls_group` is a mutable reference. I'm sure there's some hack to make it work.
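For illustration, a hypothetical minimal reproduction of that borrow-checker conflict, using stand-in types rather than the OpenMLS ones. One possible workaround (not necessarily what this PR should do) is to clone just the pending commit rather than the whole group.

```rust
// Stand-in types only; not the OpenMLS API.
struct Group {
    pending: Option<Vec<u8>>,
}

impl Group {
    fn pending_commit(&self) -> Option<&Vec<u8>> {
        self.pending.as_ref()
    }

    fn merge_pending(&mut self) {
        self.pending = None;
    }
}

fn process(group: &mut Group) {
    // Keeping the commit as a reference pins an immutable borrow of `group`,
    // so mutating the group while that reference is still live is rejected:
    //
    //     if let Some(commit) = group.pending_commit() {
    //         group.merge_pending();   // error[E0502]: cannot borrow
    //         store(commit);           // `*group` as mutable here
    //     }
    //
    // Cloning only the commit (rather than the whole group) releases the
    // borrow immediately, so mutating afterwards is fine:
    if let Some(commit) = group.pending_commit().cloned() {
        group.merge_pending();
        let _ = commit;
    }
}

fn main() {
    process(&mut Group { pending: Some(vec![1, 2, 3]) });
}
```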
```rust
provider.conn_ref().set_group_intent_published(
    intent.id,
    sha256(payload_slice),
    post_commit_action,
    staged_commit,
    openmls_group.epoch().as_u64() as i64,
)?;
log::debug!(
    "client [{}] set stored intent [{}] to state `published`",
    client.inbox_id(),
    intent.id
);

client
    .api_client
    .send_group_messages(vec![payload_slice])
    .await?;
```
Now that we're marking the intent as published before sending it, if the publish fails, either:
- Network error returned from `send_group_messages`
- App dies before the request goes through

Do we have any way of making sure the intent gets re-sent?
In the case of a network error we bubble the error back up to the caller and they can choose to retry.
In the case of the app dying, that's the big trade-off of this approach. It's just permanently in limbo. The difference from some of the alternative solutions is that it won't brick the group.
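A hedged sketch of the "caller can choose to retry" path on a network error. The `PublishError` enum and `send_group_messages` stub below are hypothetical stand-ins; the real code surfaces its own `GroupError` from the send path.

```rust
// Hypothetical stand-ins; the real code returns its own GroupError type.
#[derive(Debug, PartialEq)]
enum PublishError {
    Network,
    Other,
}

fn send_group_messages(attempt: u32) -> Result<(), PublishError> {
    // Pretend the first attempt hits a network error and the retry succeeds.
    if attempt == 0 {
        Err(PublishError::Network)
    } else {
        Ok(())
    }
}

fn publish_with_retry(max_attempts: u32) -> Result<(), PublishError> {
    let mut attempt = 0;
    loop {
        match send_group_messages(attempt) {
            Ok(()) => return Ok(()),
            // The intent has already been marked `published` locally, so the
            // caller only repeats the network send; other errors bubble up.
            Err(PublishError::Network) if attempt + 1 < max_attempts => attempt += 1,
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    assert_eq!(publish_with_retry(3), Ok(()));
}
```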
Just FYI, we are not bubbling it up in the case of `send_message<ApiClient>()`. And in the case of `send_message_optimistic`, the message would have already been rendered to the UI, and the error from the publish may or may not be bubbled up to the user.
> we are not bubbling it up in the case of send_message().

We would be, because `sync_until_last_intent_resolved` will never finish.
```rust
if has_staged_commit {
    log::info!("Canceling all further publishes, since a commit was found");
    return Err(GroupError::PublishCancelled);
}
```
This works if this method (`publish_intents`) is only called once, but won't further publishes still happen if `publish_intents` is called again? Then we would skip past this published intent because it is no longer in `TO_PUBLISH` state?
Also, are we sure we want to return an error here, rather than returning success? Won't an error here stop us from calling `receive()` within `sync_with_conn()`, and finishing off the intent lifecycle?
We continue through `publish_intents` errors in `sync_with_conn`. In the old method we would get them frequently if you had two intents lined up that both created commits, because they would trigger `PendingCommit` errors.
I did actually make that change in one of the down-stack PRs just to clean up error handling.
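To illustrate the interaction being discussed, here is a simplified model of a publish pass that stops once an intent produces a commit, leaving later intents in `ToPublish` for the next sync pass. The state names and loop shape are assumptions for the sketch, not the actual `publish_intents` implementation.

```rust
// Stand-in state names and loop shape; not the real publish_intents.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IntentState {
    ToPublish,
    Published,
}

struct Intent {
    state: IntentState,
    creates_commit: bool,
}

fn publish_intents(intents: &mut [Intent]) {
    for intent in intents.iter_mut() {
        if intent.state != IntentState::ToPublish {
            // Already published on a previous pass; nothing to re-send.
            continue;
        }
        intent.state = IntentState::Published;
        if intent.creates_commit {
            // "Canceling all further publishes, since a commit was found":
            // later intents stay in ToPublish until the next sync pass.
            return;
        }
    }
}

fn main() {
    let mut intents = vec![
        Intent { state: IntentState::ToPublish, creates_commit: true },
        Intent { state: IntentState::ToPublish, creates_commit: false },
    ];

    publish_intents(&mut intents);
    // The commit intent went out; the intent behind it waits for the next pass.
    assert_eq!(intents[0].state, IntentState::Published);
    assert_eq!(intents[1].state, IntentState::ToPublish);

    // A later pass (after the commit has been processed) picks it up.
    publish_intents(&mut intents);
    assert_eq!(intents[1].state, IntentState::Published);
}
```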
tl;dr
- Moves `staged_commits` from being handled by OpenMLS to being stored on each intent
- Removes places where we were calling `publish_intents` and then immediately calling `sync`. That should not be necessary, since we call `publish_intents` from inside the sync method

More Info
https://github.com/xmtp/libxmtp/issues/979