From f5020a90bdab69becdbde9cd5d6ce7e18ea85a77 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Sat, 25 Jan 2025 07:18:33 +0000
Subject: [PATCH] refactor lazy-loading

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/api/client/context.rs             | 111 ++++---
 src/api/client/message.rs             | 142 ++++-----
 src/api/client/sync/v3.rs             | 422 ++++++++++----------------
 src/service/rooms/lazy_loading/mod.rs | 190 +++++++-----
 4 files changed, 408 insertions(+), 457 deletions(-)

diff --git a/src/api/client/context.rs b/src/api/client/context.rs
index b957561c3..388bcf4d2 100644
--- a/src/api/client/context.rs
+++ b/src/api/client/context.rs
@@ -1,6 +1,6 @@
 use axum::extract::State;
 use conduwuit::{
-	at, err, ref_at,
+	at, deref_at, err, ref_at,
 	utils::{
 		future::TryExtExt,
 		stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
@@ -8,15 +8,15 @@
 	},
 	Err, PduEvent, Result,
 };
-use futures::{join, try_join, FutureExt, StreamExt, TryFutureExt};
-use ruma::{
-	api::client::{context::get_context, filter::LazyLoadOptions},
-	events::StateEventType,
-	OwnedEventId, UserId,
+use futures::{
+	future::{join, join3, try_join3, OptionFuture},
+	FutureExt, StreamExt, TryFutureExt,
 };
+use ruma::{api::client::context::get_context, events::StateEventType, OwnedEventId, UserId};
+use service::rooms::{lazy_loading, lazy_loading::Options};

 use crate::{
-	client::message::{event_filter, ignored_filter, update_lazy, visibility_filter, LazySet},
+	client::message::{event_filter, ignored_filter, lazy_loading_witness, visibility_filter},
 	Ruma,
 };

@@ -33,10 +33,10 @@ pub(crate) async fn get_context_route(
 	State(services): State<crate::State>,
 	body: Ruma<get_context::v3::Request>,
 ) -> Result<get_context::v3::Response> {
-	let filter = &body.filter;
 	let sender = body.sender();
-	let (sender_user, _) = sender;
+	let (sender_user, sender_device) = sender;
 	let room_id = &body.room_id;
+	let filter = &body.filter;

 	// Use limit or else 10, with maximum 100
 	let limit: usize = body
@@ -45,18 +45,6 @@
 		.unwrap_or(LIMIT_DEFAULT)
 		.min(LIMIT_MAX);

-	// some clients, at least element, seem to require knowledge of redundant
-	// members for "inline" profiles on the timeline to work properly
-	let lazy_load_enabled = matches!(filter.lazy_load_options, LazyLoadOptions::Enabled { ..
}); - - let lazy_load_redundant = if let LazyLoadOptions::Enabled { include_redundant_members } = - filter.lazy_load_options - { - include_redundant_members - } else { - false - }; - let base_id = services .rooms .timeline @@ -75,7 +63,7 @@ pub(crate) async fn get_context_route( .user_can_see_event(sender_user, &body.room_id, &body.event_id) .map(Ok); - let (base_id, base_pdu, visible) = try_join!(base_id, base_pdu, visible)?; + let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?; if base_pdu.room_id != body.room_id || base_pdu.event_id != body.event_id { return Err!(Request(NotFound("Base event not found."))); @@ -112,12 +100,32 @@ pub(crate) async fn get_context_route( .collect(); let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) = - join!(base_event, events_before, events_after); + join3(base_event, events_before, events_after).await; + + let lazy_loading_context = lazy_loading::Context { + user_id: sender_user, + device_id: sender_device, + room_id, + token: Some(base_count.into_unsigned()), + options: Some(&filter.lazy_load_options), + }; + + let lazy_loading_witnessed: OptionFuture<_> = filter + .lazy_load_options + .is_enabled() + .then_some( + base_event + .iter() + .chain(events_before.iter()) + .chain(events_after.iter()), + ) + .map(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed)) + .into(); let state_at = events_after .last() .map(ref_at!(1)) - .map_or(body.event_id.as_ref(), |e| e.event_id.as_ref()); + .map_or(body.event_id.as_ref(), |pdu| pdu.event_id.as_ref()); let state_ids = services .rooms @@ -126,41 +134,32 @@ pub(crate) async fn get_context_route( .or_else(|_| services.rooms.state.get_room_shortstatehash(room_id)) .and_then(|shortstatehash| services.rooms.state_accessor.state_full_ids(shortstatehash)) .map_err(|e| err!(Database("State not found: {e}"))) - .await?; - - let lazy = base_event - .iter() - .chain(events_before.iter()) - .chain(events_after.iter()) - .stream() - .fold(LazySet::new(), |lazy, item| { - update_lazy(&services, room_id, sender, lazy, item, lazy_load_redundant) - }) - .await; + .boxed(); - let lazy = &lazy; - let state: Vec<_> = state_ids - .iter() - .stream() - .broad_filter_map(|(shortstatekey, event_id)| { - services - .rooms - .short - .get_statekey_from_short(*shortstatekey) - .map_ok(move |(event_type, state_key)| (event_type, state_key, event_id)) - .ok() - }) - .ready_filter_map(|(event_type, state_key, event_id)| { - if !lazy_load_enabled || event_type != StateEventType::RoomMember { - return Some(event_id); + let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await; + + let state_ids = state_ids?; + let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default(); + let shortstatekeys = state_ids.iter().stream().map(deref_at!(0)); + + let state: Vec<_> = services + .rooms + .short + .multi_get_statekey_from_short(shortstatekeys) + .zip(state_ids.iter().stream().map(at!(1))) + .ready_filter_map(|item| Some((item.0.ok()?, item.1))) + .ready_filter_map(|((event_type, state_key), event_id)| { + if filter.lazy_load_options.is_enabled() + && event_type == StateEventType::RoomMember + && state_key + .as_str() + .try_into() + .is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id)) + { + return None; } - state_key - .as_str() - .try_into() - .ok() - .filter(|&user_id: &&UserId| lazy.contains(user_id)) - .map(|_| event_id) + Some(event_id) }) .broad_filter_map(|event_id: &OwnedEventId| { 
services.rooms.timeline.get_pdu(event_id).ok()
diff --git a/src/api/client/message.rs b/src/api/client/message.rs
index ec9a14d52..21b8f06c8 100644
--- a/src/api/client/message.rs
+++ b/src/api/client/message.rs
@@ -1,5 +1,3 @@
-use std::collections::HashSet;
-
 use axum::extract::State;
 use conduwuit::{
 	at, is_equal_to,
@@ -10,7 +8,7 @@
 	},
 	Event, PduCount, Result,
 };
-use futures::{FutureExt, StreamExt};
+use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt};
 use ruma::{
 	api::{
 		client::{filter::RoomEventFilter, message::get_message_events},
@@ -18,14 +16,19 @@
 	},
 	events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
 	serde::Raw,
-	DeviceId, OwnedUserId, RoomId, UserId,
+	RoomId, UserId,
+};
+use service::{
+	rooms::{
+		lazy_loading,
+		lazy_loading::{Options, Witness},
+		timeline::PdusIterItem,
+	},
+	Services,
 };
-use service::{rooms::timeline::PdusIterItem, Services};

 use crate::Ruma;

-pub(crate) type LazySet = HashSet<OwnedUserId>;
-
 /// list of safe and common non-state events to ignore if the user is ignored
 const IGNORED_MESSAGE_TYPES: &[TimelineEventType; 17] = &[
 	Audio,
@@ -84,13 +87,6 @@
 		.unwrap_or(LIMIT_DEFAULT)
 		.min(LIMIT_MAX);

-	services.rooms.lazy_loading.lazy_load_confirm_delivery(
-		sender_user,
-		sender_device,
-		room_id,
-		from,
-	);
-
 	if matches!(body.dir, Direction::Backward) {
 		services
 			.rooms
@@ -127,16 +123,24 @@
 		.collect()
 		.await;

-	let lazy = events
-		.iter()
-		.stream()
-		.fold(LazySet::new(), |lazy, item| {
-			update_lazy(&services, room_id, sender, lazy, item, false)
-		})
-		.await;
+	let lazy_loading_context = lazy_loading::Context {
+		user_id: sender_user,
+		device_id: sender_device,
+		room_id,
+		token: Some(from.into_unsigned()),
+		options: Some(&filter.lazy_load_options),
+	};

-	let state = lazy
+	let witness: OptionFuture<_> = filter
+		.lazy_load_options
+		.is_enabled()
+		.then(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()))
+		.into();
+
+	let state = witness
+		.await
 		.iter()
+		.flat_map(Witness::iter)
 		.stream()
 		.broad_filter_map(|user_id| get_member_event(&services, room_id, user_id))
 		.collect()
@@ -144,18 +148,6 @@
 		.await;

 	let next_token = events.last().map(at!(0));

-	if !cfg!(feature = "element_hacks") {
-		if let Some(next_token) = next_token {
-			services.rooms.lazy_loading.lazy_load_mark_sent(
-				sender_user,
-				sender_device,
-				room_id,
-				lazy,
-				next_token,
-			);
-		}
-	}
-
 	let chunk = events
 		.into_iter()
 		.map(at!(1))
@@ -170,6 +162,52 @@
 	})
 }

+pub(crate) async fn lazy_loading_witness<'a, I>(
+	services: &Services,
+	lazy_loading_context: &lazy_loading::Context<'_>,
+	events: I,
+) -> Witness
+where
+	I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
+{
+	let oldest = events
+		.clone()
+		.map(|(count, _)| count)
+		.copied()
+		.min()
+		.unwrap_or_else(PduCount::max);
+
+	let newest = events
+		.clone()
+		.map(|(count, _)| count)
+		.copied()
+		.max()
+		.unwrap_or_else(PduCount::max);
+
+	let receipts = services
+		.rooms
+		.read_receipt
+		.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
+
+	pin_mut!(receipts);
+	let witness: Witness = events
+		.stream()
+		.map(|(_, pdu)| pdu.sender.clone())
+		.chain(
+			receipts
+				.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
+				.map(|(user_id, ..)| user_id.to_owned()),
+		)
+		.collect()
+		.await;
+
+	services
+		.rooms
+		.lazy_loading
+		.witness_retain(witness, 
lazy_loading_context) + .await +} + async fn get_member_event( services: &Services, room_id: &RoomId, @@ -184,42 +222,6 @@ async fn get_member_event( .ok() } -pub(crate) async fn update_lazy( - services: &Services, - room_id: &RoomId, - sender: (&UserId, &DeviceId), - mut lazy: LazySet, - item: &PdusIterItem, - force: bool, -) -> LazySet { - let (_, event) = &item; - let (sender_user, sender_device) = sender; - - /* TODO: Remove the "element_hacks" check when these are resolved: - * https://github.com/vector-im/element-android/issues/3417 - * https://github.com/vector-im/element-web/issues/21034 - */ - if force || cfg!(features = "element_hacks") { - lazy.insert(event.sender().into()); - return lazy; - } - - if lazy.contains(event.sender()) { - return lazy; - } - - if !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, event.sender()) - .await - { - lazy.insert(event.sender().into()); - } - - lazy -} - pub(crate) async fn ignored_filter( services: &Services, item: PdusIterItem, diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index d6b9f15c3..7cca96162 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -6,9 +6,9 @@ use std::{ use axum::extract::State; use conduwuit::{ - at, err, error, extract_variant, is_equal_to, is_false, + at, err, error, extract_variant, is_equal_to, pdu::EventHash, - result::{FlatOk, LogDebugErr}, + result::FlatOk, utils::{ self, future::OptionExt, @@ -19,16 +19,20 @@ use conduwuit::{ Error, PduCount, PduEvent, Result, }; use conduwuit_service::{ - rooms::short::{ShortStateHash, ShortStateKey}, + rooms::{ + lazy_loading, + lazy_loading::{Options, Witness}, + short::ShortStateHash, + }, Services, }; use futures::{ - future::{join, join3, join4, join5, try_join, try_join3, OptionFuture}, + future::{join, join3, join4, join5, try_join, try_join4, OptionFuture}, FutureExt, StreamExt, TryFutureExt, }; use ruma::{ api::client::{ - filter::{FilterDefinition, LazyLoadOptions}, + filter::FilterDefinition, sync::sync_events::{ self, v3::{ @@ -152,9 +156,14 @@ pub(crate) async fn build_sync_events( let (sender_user, sender_device) = body.sender(); let next_batch = services.globals.current_count()?; - let next_batch_string = next_batch.to_string(); + let since = body + .body + .since + .as_ref() + .and_then(|string| string.parse().ok()) + .unwrap_or(0); - // Load filter + let full_state = body.body.full_state; let filter = match body.body.filter.as_ref() { | None => FilterDefinition::default(), | Some(Filter::FilterDefinition(ref filter)) => filter.clone(), @@ -165,24 +174,6 @@ pub(crate) async fn build_sync_events( .unwrap_or_default(), }; - // some clients, at least element, seem to require knowledge of redundant - // members for "inline" profiles on the timeline to work properly - let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options - { - | LazyLoadOptions::Enabled { include_redundant_members } => - (true, include_redundant_members), - | LazyLoadOptions::Disabled => (false, cfg!(feature = "element_hacks")), - }; - - let full_state = body.body.full_state; - - let since = body - .body - .since - .as_ref() - .and_then(|string| string.parse().ok()) - .unwrap_or(0); - let joined_rooms = services .rooms .state_cache @@ -196,9 +187,8 @@ pub(crate) async fn build_sync_events( room_id.clone(), since, next_batch, - lazy_load_enabled, - lazy_load_send_redundant, full_state, + &filter, ) .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu)) 
.ok()
 		})
@@ -227,9 +217,9 @@
 				since,
 				room_id.clone(),
 				sender_user,
-				&next_batch_string,
+				next_batch,
 				full_state,
-				lazy_load_enabled,
+				&filter,
 			)
 			.map_ok(move |left_room| (room_id, left_room))
 			.ok()
@@ -358,7 +348,7 @@
 		device_one_time_keys_count,
 		// Fallback keys are not yet supported
 		device_unused_fallback_key_types: None,
-		next_batch: next_batch_string,
+		next_batch: next_batch.to_string(),
 		presence: Presence {
 			events: presence_updates
 				.unwrap_or_default()
@@ -449,7 +439,6 @@ async fn process_presence_updates(
 	fields(
 		room_id = %room_id,
 		full = %full_state,
-		ll = %lazy_load_enabled,
 	),
 )]
 #[allow(clippy::too_many_arguments)]
 async fn handle_left_room(
 	services: &Services,
 	since: u64,
 	ref room_id: OwnedRoomId,
 	sender_user: &UserId,
-	next_batch_string: &str,
+	next_batch: u64,
 	full_state: bool,
-	lazy_load_enabled: bool,
+	filter: &FilterDefinition,
 ) -> Result<Option<LeftRoom>> {
 	let left_count = services
 		.rooms
@@ -503,7 +492,7 @@
 			account_data: RoomAccountData { events: Vec::new() },
 			timeline: Timeline {
 				limited: false,
-				prev_batch: Some(next_batch_string.to_owned()),
+				prev_batch: Some(next_batch.to_string()),
 				events: Vec::new(),
 			},
 			state: RoomState {
@@ -567,28 +556,32 @@
 			.get_statekey_from_short(shortstatekey)
 			.await?;

-		// TODO: Delete "element_hacks" when this is resolved: https://github.com/vector-im/element-web/issues/22565
-		if !lazy_load_enabled
-			|| event_type != StateEventType::RoomMember
-			|| full_state
-			|| (cfg!(feature = "element_hacks") && *sender_user == state_key)
+		if filter.room.state.lazy_load_options.is_enabled()
+			&& event_type == StateEventType::RoomMember
+			&& !full_state
+			&& state_key
+				.as_str()
+				.try_into()
+				.is_ok_and(|user_id: &UserId| sender_user != user_id)
 		{
-			let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
-				error!("Pdu in state not found: {event_id}");
-				continue;
-			};
-
-			left_state_events.push(pdu.to_sync_state_event());
+			continue;
 		}
+
+		let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
+			error!("Pdu in state not found: {event_id}");
+			continue;
+		};
+
+		left_state_events.push(pdu.to_sync_state_event());
 	}
 }

 Ok(Some(LeftRoom {
 	account_data: RoomAccountData { events: Vec::new() },
 	timeline: Timeline {
-		limited: true, /* TODO: support left timeline events so we dont need to set this to
-		                * true */
-		prev_batch: Some(next_batch_string.to_owned()),
+		// TODO: support left timeline events so we dont need to set limited to true
+		limited: true,
+		prev_batch: Some(next_batch.to_string()),
 		events: Vec::new(), // and so we dont need to set this to empty vec
 	},
 	state: RoomState { events: left_state_events },
@@ -611,9 +604,8 @@ async fn load_joined_room(
 	ref room_id: OwnedRoomId,
 	since: u64,
 	next_batch: u64,
-	lazy_load_enabled: bool,
-	lazy_load_send_redundant: bool,
 	full_state: bool,
+	filter: &FilterDefinition,
 ) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
 	let sincecount = PduCount::Normal(since);
 	let next_batchcount = PduCount::Normal(next_batch);
@@ -640,17 +632,26 @@
 		10_usize,
 	);

-	let (current_shortstatehash, since_shortstatehash, timeline) =
-		try_join3(current_shortstatehash, since_shortstatehash, timeline).await?;
+	let receipt_events = services
+		.rooms
+		.read_receipt
+		.readreceipts_since(room_id, since)
+		.filter_map(|(read_user, _, edu)| async move {
+			services
+				.users
+				.user_is_ignored(read_user, sender_user)
+				.await
+				.or_some((read_user.to_owned(), edu))
+		})
+		.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
+		.map(Ok);
+
+	let (current_shortstatehash, since_shortstatehash, timeline, receipt_events) =
+		try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events)
+			.boxed()
+			.await?;

 	let (timeline_pdus, limited) = timeline;
-	let timeline_users =
-		timeline_pdus
-			.iter()
-			.fold(HashSet::new(), |mut timeline_users, (_, event)| {
-				timeline_users.insert(event.sender.as_str().to_owned());
-				timeline_users
-			});

 	let last_notification_read: OptionFuture<_> = timeline_pdus
 		.is_empty()
 		.then(|| {
 			services
@@ -662,21 +663,68 @@
 		})
 		.into();

-	let send_notification_counts = last_notification_read
-		.is_none_or(|&count| count > since)
-		.await;
-
-	services.rooms.lazy_loading.lazy_load_confirm_delivery(
-		sender_user,
-		sender_device,
-		room_id,
-		sincecount,
-	);
-
 	let no_state_changes = timeline_pdus.is_empty()
 		&& (since_shortstatehash.is_none()
 			|| since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)));

+	let since_sender_member: OptionFuture<_> = since_shortstatehash
+		.map(|short| {
+			services
+				.rooms
+				.state_accessor
+				.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
+				.ok()
+		})
+		.into();
+
+	let joined_since_last_sync =
+		since_sender_member
+			.await
+			.flatten()
+			.is_none_or(|content: RoomMemberEventContent| {
+				content.membership != MembershipState::Join
+			});
+
+	let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
+		|| filter.room.timeline.lazy_load_options.is_enabled();
+
+	let generate_witness =
+		lazy_loading_enabled && (since_shortstatehash.is_none() || joined_since_last_sync);
+
+	let lazy_reset = lazy_loading_enabled && since_shortstatehash.is_none();
+
+	let lazy_loading_context = &lazy_loading::Context {
+		user_id: sender_user,
+		device_id: sender_device,
+		room_id,
+		token: None,
+		options: Some(&filter.room.state.lazy_load_options),
+	};
+
+	// Reset lazy loading because this is an initial sync
+	let lazy_load_reset: OptionFuture<_> = lazy_reset
+		.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
+		.into();
+
+	lazy_load_reset.await;
+	let witness: Option<Witness> = generate_witness.then(|| {
+		timeline_pdus
+			.iter()
+			.map(|(_, pdu)| pdu.sender.clone())
+			.chain(receipt_events.keys().cloned())
+			.collect()
+	});
+
+	let witness: OptionFuture<_> = witness
+		.map(|witness| {
+			services
+				.rooms
+				.lazy_loading
+				.witness_retain(witness, lazy_loading_context)
+		})
+		.into();
+
+	let witness = witness.await;
 	let mut device_list_updates = HashSet::<OwnedUserId>::new();
 	let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
 	let StateChanges {
@@ -691,19 +739,17 @@
 		calculate_state_changes(
 			services,
 			sender_user,
-			sender_device,
 			room_id,
-			next_batchcount,
-			lazy_load_enabled,
-			lazy_load_send_redundant,
 			full_state,
+			filter,
 			&mut device_list_updates,
 			&mut left_encrypted_users,
 			since_shortstatehash,
 			current_shortstatehash,
-			&timeline_pdus,
-			&timeline_users,
+			joined_since_last_sync,
+			witness.as_ref(),
 		)
+		.boxed()
 		.await?
};

@@ -728,19 +774,6 @@
 		.map(|(_, pdu)| pdu.to_sync_room_event())
 		.collect();

-	let receipt_events = services
-		.rooms
-		.read_receipt
-		.readreceipts_since(room_id, since)
-		.filter_map(|(read_user, _, edu)| async move {
-			services
-				.users
-				.user_is_ignored(read_user, sender_user)
-				.await
-				.or_some((read_user.to_owned(), edu))
-		})
-		.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
-
 	let typing_events = services
 		.rooms
 		.typing
@@ -760,6 +793,10 @@
 		})
 		.unwrap_or(Vec::new());

+	let send_notification_counts = last_notification_read
+		.is_none_or(|&count| count > since)
+		.await;
+
 	let notification_count: OptionFuture<_> = send_notification_counts
 		.then(|| {
 			services
@@ -782,14 +819,14 @@
 		})
 		.into();

-	let events = join4(room_events, account_data_events, receipt_events, typing_events);
+	let events = join3(room_events, account_data_events, typing_events);
 	let unread_notifications = join(notification_count, highlight_count);
 	let (unread_notifications, events, device_updates) =
 		join3(unread_notifications, events, device_updates)
 			.boxed()
 			.await;

-	let (room_events, account_data_events, receipt_events, typing_events) = events;
+	let (room_events, account_data_events, typing_events) = events;
 	let (notification_count, highlight_count) = unread_notifications;

 	device_list_updates.extend(device_updates);
@@ -866,7 +903,6 @@
 	skip_all,
 	fields(
 		full = %full_state,
-		ll = ?(lazy_load_enabled, lazy_load_send_redundant),
 		cs = %current_shortstatehash,
 		ss = ?since_shortstatehash,
 	)
@@ -875,64 +911,38 @@
 async fn calculate_state_changes(
 	services: &Services,
 	sender_user: &UserId,
-	sender_device: &DeviceId,
 	room_id: &RoomId,
-	next_batchcount: PduCount,
-	lazy_load_enabled: bool,
-	lazy_load_send_redundant: bool,
 	full_state: bool,
+	filter: &FilterDefinition,
 	device_list_updates: &mut HashSet<OwnedUserId>,
 	left_encrypted_users: &mut HashSet<OwnedUserId>,
 	since_shortstatehash: Option<ShortStateHash>,
 	current_shortstatehash: ShortStateHash,
-	timeline_pdus: &Vec<(PduCount, PduEvent)>,
-	timeline_users: &HashSet<String>,
+	joined_since_last_sync: bool,
+	witness: Option<&Witness>,
 ) -> Result<StateChanges> {
-	let since_sender_member: OptionFuture<_> = since_shortstatehash
-		.map(|short| {
-			services
-				.rooms
-				.state_accessor
-				.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
-				.ok()
-		})
-		.into();
-
-	let joined_since_last_sync =
-		since_sender_member
-			.await
-			.flatten()
-			.is_none_or(|content: RoomMemberEventContent| {
-				content.membership != MembershipState::Join
-			});
-
 	if since_shortstatehash.is_none() || joined_since_last_sync {
 		calculate_state_initial(
 			services,
 			sender_user,
-			sender_device,
 			room_id,
-			next_batchcount,
-			lazy_load_enabled,
 			full_state,
+			filter,
 			current_shortstatehash,
-			timeline_users,
+			witness,
 		)
 		.await
 	} else {
 		calculate_state_incremental(
 			services,
 			sender_user,
-			sender_device,
 			room_id,
-			next_batchcount,
-			lazy_load_send_redundant,
 			full_state,
+			filter,
 			device_list_updates,
 			left_encrypted_users,
 			since_shortstatehash,
 			current_shortstatehash,
-			timeline_pdus,
 			joined_since_last_sync,
 		)
 		.await
 	}
 }

@@ -944,87 +954,54 @@
 async fn calculate_state_initial(
 	services: &Services,
 	sender_user: &UserId,
-	sender_device: &DeviceId,
 	room_id: &RoomId,
-	next_batchcount: PduCount,
-	lazy_load_enabled: bool,
 	full_state: bool,
+	filter: &FilterDefinition,
 	current_shortstatehash: ShortStateHash,
-	timeline_users: &HashSet<String>,
+	witness: Option<&Witness>,
 ) -> Result<StateChanges> {
-	// Probably since = 0, we will do 
an initial sync - let state = services + let state_events = services .rooms .state_accessor .state_full_ids(current_shortstatehash) - .await? - .into_iter() - .stream() - .broad_filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| { - services - .rooms - .short - .get_statekey_from_short(shortstatekey) - .map_ok(move |(event_type, state_key)| ((event_type, state_key), event_id)) - .ok() - }) - .fold((Vec::new(), HashSet::new()), |a, item| async move { - let (mut state_events, mut lazy_loaded) = a; - let ((event_type, state_key), event_id) = item; - - if event_type != StateEventType::RoomMember { - let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else { - error!("Pdu in state not found: {event_id}"); - return (state_events, lazy_loaded); - }; - - state_events.push(pdu); - return (state_events, lazy_loaded); - } + .await?; - // TODO: Delete "element_hacks" when this is resolved: https://github.com/vector-im/element-web/issues/22565 - if !lazy_load_enabled - || full_state - || timeline_users.contains(&state_key) - || (cfg!(feature = "element_hacks") && *sender_user == state_key) - { - let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else { - error!("Pdu in state not found: {event_id}"); - return (state_events, lazy_loaded); - }; - - // This check is in case a bad user ID made it into the database - if let Ok(uid) = OwnedUserId::parse(&state_key) { - lazy_loaded.insert(uid); - } + let shortstatekeys = state_events.keys().copied().stream(); - state_events.push(pdu); + let state_events = services + .rooms + .short + .multi_get_statekey_from_short(shortstatekeys) + .zip(state_events.values().cloned().stream()) + .ready_filter_map(|item| Some((item.0.ok()?, item.1))) + .ready_filter_map(|((event_type, state_key), event_id)| { + let lazy_load_enabled = filter.room.state.lazy_load_options.is_enabled() + || filter.room.timeline.lazy_load_options.is_enabled(); + + if lazy_load_enabled + && event_type == StateEventType::RoomMember + && !full_state + && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { + sender_user != user_id + && witness.is_some_and(|witness| !witness.contains(user_id)) + }) { + return None; } - (state_events, lazy_loaded) + Some(event_id) + }) + .broad_filter_map(|event_id: OwnedEventId| async move { + services.rooms.timeline.get_pdu(&event_id).await.ok() }) + .collect() .map(Ok); let counts = calculate_counts(services, room_id, sender_user); - let ((joined_member_count, invited_member_count, heroes), (state_events, lazy_loaded)) = - try_join(counts, state).boxed().await?; - - // Reset lazy loading because this is an initial sync - services - .rooms - .lazy_loading - .lazy_load_reset(sender_user, sender_device, room_id) - .await; + let ((joined_member_count, invited_member_count, heroes), state_events) = + try_join(counts, state_events).boxed().await?; // The state_events above should contain all timeline_users, let's mark them as // lazy loaded. 
-	services.rooms.lazy_loading.lazy_load_mark_sent(
-		sender_user,
-		sender_device,
-		room_id,
-		lazy_loaded,
-		next_batchcount,
-	);

 	Ok(StateChanges {
 		heroes,
@@ -1040,16 +1017,13 @@
 async fn calculate_state_incremental(
 	services: &Services,
 	sender_user: &UserId,
-	sender_device: &DeviceId,
 	room_id: &RoomId,
-	next_batchcount: PduCount,
-	lazy_load_send_redundant: bool,
 	full_state: bool,
+	_filter: &FilterDefinition,
 	device_list_updates: &mut HashSet<OwnedUserId>,
 	left_encrypted_users: &mut HashSet<OwnedUserId>,
 	since_shortstatehash: Option<ShortStateHash>,
 	current_shortstatehash: ShortStateHash,
-	timeline_pdus: &Vec<(PduCount, PduEvent)>,
 	joined_since_last_sync: bool,
 ) -> Result<StateChanges> {
 	// Incremental /sync
@@ -1162,76 +1136,12 @@
 		(None, None, None)
 	};

-	let mut state_events = delta_state_events;
-
-	// Mark all member events we're returning as lazy-loaded
-	let mut lazy_loaded = state_events
-		.iter()
-		.filter(|pdu| pdu.kind == RoomMember)
-		.filter_map(|pdu| {
-			pdu.state_key
-				.clone()
-				.map(TryInto::try_into)
-				.map(LogDebugErr::log_debug_err)
-				.flat_ok()
-		})
-		.fold(HashSet::new(), |mut lazy_loaded, user_id| {
-			lazy_loaded.insert(user_id);
-			lazy_loaded
-		});
-
-	// Fetch contextual member state events for events from the timeline, and
-	// mark them as lazy-loaded as well.
-	for (_, event) in timeline_pdus {
-		if lazy_loaded.contains(&event.sender) {
-			continue;
-		}
-
-		let sent_before: OptionFuture<_> = (!lazy_load_send_redundant)
-			.then(|| {
-				services.rooms.lazy_loading.lazy_load_was_sent_before(
-					sender_user,
-					sender_device,
-					room_id,
-					&event.sender,
-				)
-			})
-			.into();
-
-		let member_event: OptionFuture<_> = sent_before
-			.await
-			.is_none_or(is_false!())
-			.then(|| {
-				services.rooms.state_accessor.room_state_get(
-					room_id,
-					&StateEventType::RoomMember,
-					event.sender.as_str(),
-				)
-			})
-			.into();
-
-		let Some(Ok(member_event)) = member_event.await else {
-			continue;
-		};
-
-		lazy_loaded.insert(event.sender.clone());
-		state_events.push(member_event);
-	}
-
-	services.rooms.lazy_loading.lazy_load_mark_sent(
-		sender_user,
-		sender_device,
-		room_id,
-		lazy_loaded,
-		next_batchcount,
-	);
-
 	Ok(StateChanges {
 		heroes,
 		joined_member_count,
 		invited_member_count,
 		joined_since_last_sync,
-		state_events,
+		state_events: delta_state_events,
 	})
 }
diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs
index c3c27b9ed..5cc912ecf 100644
--- a/src/service/rooms/lazy_loading/mod.rs
+++ b/src/service/rooms/lazy_loading/mod.rs
@@ -1,19 +1,17 @@
-use std::{
-	collections::{HashMap, HashSet},
-	fmt::Write,
-	sync::{Arc, Mutex},
-};
+//! Lazy Loading
+
+use std::{collections::HashSet, sync::Arc};

 use conduwuit::{
 	implement,
-	utils::{stream::TryIgnore, ReadyExt},
-	PduCount, Result,
+	utils::{stream::TryIgnore, IterStream, ReadyExt},
+	Result,
 };
-use database::{Interfix, Map};
-use ruma::{DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, RoomId, UserId};
+use database::{Deserialized, Handle, Interfix, Map};
+use futures::{pin_mut, Stream, StreamExt};
+use ruma::{api::client::filter::LazyLoadOptions, DeviceId, OwnedUserId, RoomId, UserId};

 pub struct Service {
-	lazy_load_waiting: Mutex<LazyLoadWaiting>,
 	db: Data,
 }

@@ -21,93 +19,135 @@ struct Data {
 	lazyloadedids: Arc<Map>,
 }

-type LazyLoadWaiting = HashMap<LazyLoadWaitingKey, LazyLoadWaitingVal>;
-type LazyLoadWaitingKey = (OwnedUserId, OwnedDeviceId, OwnedRoomId, PduCount);
-type LazyLoadWaitingVal = HashSet<OwnedUserId>;
+pub trait Options: Send + Sync {
+	fn is_enabled(&self) -> bool;
+	fn include_redundant_members(&self) -> bool;
+}
+
+#[derive(Clone, Debug)]
+pub struct Context<'a> {
+	pub user_id: &'a UserId,
+	pub device_id: &'a DeviceId,
+	pub room_id: &'a RoomId,
+	pub token: Option<u64>,
+	pub options: Option<&'a LazyLoadOptions>,
+}
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum Status {
+	Unseen,
+	Seen(u64),
+}
+
+pub type Witness = HashSet<OwnedUserId>;
+type Key<'a> = (&'a UserId, &'a DeviceId, &'a RoomId, &'a UserId);

 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
 		Ok(Arc::new(Self {
-			lazy_load_waiting: LazyLoadWaiting::new().into(),
 			db: Data {
 				lazyloadedids: args.db["lazyloadedids"].clone(),
 			},
 		}))
 	}

-	fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
-		let lazy_load_waiting = self.lazy_load_waiting.lock().expect("locked").len();
-		writeln!(out, "lazy_load_waiting: {lazy_load_waiting}")?;
-
-		Ok(())
-	}
-
-	fn clear_cache(&self) { self.lazy_load_waiting.lock().expect("locked").clear(); }
-
 	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
 }

 #[implement(Service)]
 #[tracing::instrument(skip(self), level = "debug")]
-#[inline]
-pub async fn lazy_load_was_sent_before(
-	&self,
-	user_id: &UserId,
-	device_id: &DeviceId,
-	room_id: &RoomId,
-	ll_user: &UserId,
-) -> bool {
-	let key = (user_id, device_id, room_id, ll_user);
-	self.db.lazyloadedids.qry(&key).await.is_ok()
-}
-
-#[implement(Service)]
-#[tracing::instrument(skip(self), level = "debug")]
-pub fn lazy_load_mark_sent(
-	&self,
-	user_id: &UserId,
-	device_id: &DeviceId,
-	room_id: &RoomId,
-	lazy_load: HashSet<OwnedUserId>,
-	count: PduCount,
-) {
-	let key = (user_id.to_owned(), device_id.to_owned(), room_id.to_owned(), count);
-
-	self.lazy_load_waiting
-		.lock()
-		.expect("locked")
-		.insert(key, lazy_load);
+pub async fn reset(&self, ctx: &Context<'_>) {
+	let prefix = (ctx.user_id, ctx.device_id, ctx.room_id, Interfix);
+	self.db
+		.lazyloadedids
+		.keys_prefix_raw(&prefix)
+		.ignore_err()
+		.ready_for_each(|key| self.db.lazyloadedids.remove(key))
+		.await;
 }

 #[implement(Service)]
-#[tracing::instrument(skip(self), level = "debug")]
-pub fn lazy_load_confirm_delivery(
-	&self,
-	user_id: &UserId,
-	device_id: &DeviceId,
-	room_id: &RoomId,
-	since: PduCount,
-) {
-	let key = (user_id.to_owned(), device_id.to_owned(), room_id.to_owned(), since);
-
-	let Some(user_ids) = self.lazy_load_waiting.lock().expect("locked").remove(&key) else {
-		return;
-	};
-
-	for ll_id in &user_ids {
-		let key = (user_id, device_id, room_id, ll_id);
-		self.db.lazyloadedids.put_raw(key, []);
	}
}

#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
-pub async fn lazy_load_reset(&self, user_id: &UserId, device_id: &DeviceId, room_id: &RoomId) {
-	let prefix = (user_id, device_id, room_id, Interfix);
-	self.db
-		.lazyloadedids
-		.keys_prefix_raw(&prefix)
-		.ignore_err()
-		.ready_for_each(|key| self.db.lazyloadedids.remove(key))
-		.await;
+#[tracing::instrument(name = "retain", level = "debug", skip_all)]
+pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness {
+	debug_assert!(
+		ctx.options.is_none_or(Options::is_enabled),
+		"lazy loading should be enabled by your options"
+	);
+
+	let include_redundant = cfg!(feature = "element_hacks")
+		|| ctx.options.is_some_and(Options::include_redundant_members);
+
+	let witness = self
+		.witness(ctx, senders.iter().map(AsRef::as_ref))
+		.zip(senders.iter().stream());
+
+	pin_mut!(witness);
+	let mut senders = Witness::with_capacity(senders.len());
+	while let Some((status, sender)) = witness.next().await {
+		if status == Status::Unseen || include_redundant {
+			senders.insert(sender.into());
+			continue;
+		}
+
+		if let Status::Seen(seen) = status {
+			if seen == 0 || ctx.token == Some(seen) {
+				senders.insert(sender.into());
+				continue;
+			}
+		}
+	}
+
+	senders
+}
+
+#[implement(Service)]
+fn witness<'a, I>(
+	&'a self,
+	ctx: &'a Context<'a>,
+	senders: I,
+) -> impl Stream<Item = Status> + Send + 'a
+where
+	I: Iterator<Item = &'a UserId> + Send + Clone + 'a,
+{
+	let make_key =
+		|sender: &'a UserId| -> Key<'a> { (ctx.user_id, ctx.device_id, ctx.room_id, sender) };
+
+	self.db
+		.lazyloadedids
+		.qry_batch(senders.clone().stream().map(make_key))
+		.map(into_status)
+		.zip(senders.stream())
+		.map(move |(status, sender)| {
+			if matches!(status, Status::Unseen) {
+				self.db
+					.lazyloadedids
+					.put_aput::<8, _, _>(make_key(sender), 0_u64);
+			} else if matches!(status, Status::Seen(0)) {
+				self.db
+					.lazyloadedids
+					.put_aput::<8, _, _>(make_key(sender), ctx.token.unwrap_or(0_u64));
+			}
+
+			status
+		})
+}
+
+fn into_status(result: Result<Handle<'_>>) -> Status {
+	match result.and_then(|handle| handle.deserialized()) {
+		| Ok(seen) => Status::Seen(seen),
+		| Err(e) if e.is_not_found() => Status::Unseen,
+		| Err(e) => e.panic(),
+	}
+}
+
+impl Options for LazyLoadOptions {
+	fn include_redundant_members(&self) -> bool {
+		if let Self::Enabled { include_redundant_members } = self {
+			*include_redundant_members
+		} else {
+			false
+		}
+	}
+
+	fn is_enabled(&self) -> bool { !self.is_disabled() }
+}
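---

Reviewer note (not part of the patch): the sketch below shows how a caller is
expected to drive the new witness API. It is a minimal sketch under the
assumption that the caller has a `Services` handle and the senders of the
timeline slice in scope; `witnessed_members`, `timeline_senders`, and
`options` are hypothetical names for illustration, while `Context`, `Options`,
`Status`, `Witness`, `reset`, and `witness_retain` are the items this patch
introduces.

	use ruma::api::client::filter::LazyLoadOptions;
	use service::rooms::lazy_loading::{self, Options, Witness};

	// Hypothetical caller: decide which member events must accompany a
	// timeline slice for one (user, device, room) tuple.
	async fn witnessed_members(
		services: &service::Services,
		user_id: &ruma::UserId,
		device_id: &ruma::DeviceId,
		room_id: &ruma::RoomId,
		options: &LazyLoadOptions,
		timeline_senders: Vec<ruma::OwnedUserId>,
	) -> Witness {
		let ctx = lazy_loading::Context {
			user_id,
			device_id,
			room_id,
			// None on an initial sync; incremental callers pass their
			// `since`/`from` count (see the v3.rs and message.rs hunks).
			token: None,
			options: Some(options),
		};

		// Candidate set: every user whose membership the client may need
		// in order to render the events being returned.
		let witness: Witness = timeline_senders.into_iter().collect();

		// Keep only members this device has not seen (Status::Unseen), has
		// only ever had persisted at token zero, or saw at this same token;
		// everything kept is recorded in the `lazyloadedids` table so later
		// increments can skip it. With `include_redundant_members` (or the
		// "element_hacks" feature) the whole candidate set is kept.
		services
			.rooms
			.lazy_loading
			.witness_retain(witness, &ctx)
			.await
	}

The design trade against the removed code: the old `lazy_load_mark_sent` /
`lazy_load_confirm_delivery` pair staged sent members in an in-memory
`lazy_load_waiting` map and persisted them only once the next request
confirmed delivery, whereas `witness()` now writes the seen-token to the
database while batch-querying, so the Mutex-guarded cache and its
`memory_usage` / `clear_cache` hooks can be deleted.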