Skip to content

Commit

Permalink
reroll encrypted_room branch in incremental sync state
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Jan 31, 2025
1 parent a4b3972 commit c5926c9
Showing 1 changed file with 55 additions and 65 deletions.
120 changes: 55 additions & 65 deletions src/api/client/sync/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use axum::extract::State;
use conduwuit::{
at, err, error, extract_variant, is_equal_to,
at, err, error, extract_variant, is_equal_to, pair,
pdu::EventHash,
result::FlatOk,
utils::{
Expand All @@ -16,7 +16,7 @@ use conduwuit::{
stream::{BroadbandExt, Tools, WidebandExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
},
Error, PduCount, PduEvent, Result,
PduCount, PduEvent, Result,
};
use conduwuit_service::{
rooms::{
Expand Down Expand Up @@ -64,6 +64,8 @@ struct StateChanges {
invited_member_count: Option<u64>,
joined_since_last_sync: bool,
state_events: Vec<PduEvent>,
device_list_updates: HashSet<OwnedUserId>,
left_encrypted_users: HashSet<OwnedUserId>,
}

type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>;
Expand Down Expand Up @@ -730,14 +732,14 @@ async fn load_joined_room(
.into();

let witness = witness.await;
let mut device_list_updates = HashSet::<OwnedUserId>::new();
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
let StateChanges {
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events,
mut device_list_updates,
left_encrypted_users,
} = if no_state_changes {
StateChanges::default()
} else {
Expand All @@ -747,8 +749,6 @@ async fn load_joined_room(
room_id,
full_state,
filter,
&mut device_list_updates,
&mut left_encrypted_users,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
Expand Down Expand Up @@ -919,8 +919,6 @@ async fn calculate_state_changes(
room_id: &RoomId,
full_state: bool,
filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
Expand All @@ -944,8 +942,6 @@ async fn calculate_state_changes(
room_id,
full_state,
filter,
device_list_updates,
left_encrypted_users,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
Expand Down Expand Up @@ -1013,6 +1009,7 @@ async fn calculate_state_initial(
invited_member_count,
joined_since_last_sync: true,
state_events,
..Default::default()
})
}

Expand All @@ -1024,8 +1021,6 @@ async fn calculate_state_incremental(
room_id: &RoomId,
full_state: bool,
_filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
Expand Down Expand Up @@ -1063,79 +1058,72 @@ async fn calculate_state_incremental(
.await;
}

let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();

let since_encryption = services
.rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();

let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await;

// Calculations:
let new_encrypted_room = encrypted_room && !since_encryption;
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok()
.await;

let send_member_count = delta_state_events
let (mut device_list_updates, left_encrypted_users) = delta_state_events
.iter()
.any(|event| event.kind == RoomMember);

if encrypted_room {
for state_event in &delta_state_events {
if state_event.kind != RoomMember {
continue;
}
.stream()
.ready_filter(|_| encrypted_room)
.ready_filter(|state_event| state_event.kind == RoomMember)
.ready_filter_map(|state_event| {
let content = state_event.get_content().ok()?;
let user_id = state_event.state_key.as_ref()?.parse().ok()?;
Some((content, user_id))
})
.ready_filter(|(_, user_id): &(RoomMemberEventContent, OwnedUserId)| {
user_id != sender_user
})
.fold_default(|(mut dlu, mut leu): pair!(HashSet<_>), (content, user_id)| async move {
use MembershipState::*;

if let Some(state_key) = &state_event.state_key {
let user_id = UserId::parse(state_key)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
let shares_encrypted_room =
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));

if user_id == sender_user {
continue;
}
match content.membership {
| Join if !shares_encrypted_room(&user_id).await => dlu.insert(user_id),
| Leave => leu.insert(user_id),
| _ => false,
};

let content: RoomMemberEventContent = state_event.get_content()?;

match content.membership {
| MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(services, sender_user, user_id, Some(room_id))
.await
{
device_list_updates.insert(user_id.into());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id.into());
},
| _ => {},
}
}
}
}
(dlu, leu)
})
.await;

// If the user is in a new encrypted room, give them all joined users
let new_encrypted_room = encrypted_room && !since_encryption.await;
if joined_since_last_sync && encrypted_room || new_encrypted_room {
let updates: Vec<OwnedUserId> = services
services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|user_id| sender_user != *user_id)
.filter_map(|user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id))
.map(|res| res.or_some(user_id.to_owned()))
.ready_filter(|&user_id| sender_user != user_id)
.map(ToOwned::to_owned)
.broad_filter_map(|user_id| async move {
share_encrypted_room(services, sender_user, &user_id, Some(room_id))
.await
.or_some(user_id)
})
.ready_for_each(|user_id| {
device_list_updates.insert(user_id);
})
.collect()
.await;

// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(updates);
}

let send_member_count = delta_state_events
.iter()
.any(|event| event.kind == RoomMember);

let (joined_member_count, invited_member_count, heroes) = if send_member_count {
calculate_counts(services, room_id, sender_user).await?
} else {
Expand All @@ -1148,6 +1136,8 @@ async fn calculate_state_incremental(
invited_member_count,
joined_since_last_sync,
state_events: delta_state_events,
device_list_updates,
left_encrypted_users,
})
}

Expand Down

0 comments on commit c5926c9

Please sign in to comment.