Skip to content

Commit

Permalink
refactor lazy-loading
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Jan 25, 2025
1 parent 2af4ce2 commit 955ffdb
Show file tree
Hide file tree
Showing 4 changed files with 416 additions and 460 deletions.
111 changes: 55 additions & 56 deletions src/api/client/context.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use axum::extract::State;
use conduwuit::{
at, err, ref_at,
at, deref_at, err, ref_at,
utils::{
future::TryExtExt,
stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
IterStream,
},
Err, PduEvent, Result,
};
use futures::{join, try_join, FutureExt, StreamExt, TryFutureExt};
use ruma::{
api::client::{context::get_context, filter::LazyLoadOptions},
events::StateEventType,
OwnedEventId, UserId,
use futures::{
future::{join, join3, try_join3, OptionFuture},
FutureExt, StreamExt, TryFutureExt,
};
use ruma::{api::client::context::get_context, events::StateEventType, OwnedEventId, UserId};
use service::rooms::{lazy_loading, lazy_loading::Options};

use crate::{
client::message::{event_filter, ignored_filter, update_lazy, visibility_filter, LazySet},
client::message::{event_filter, ignored_filter, lazy_loading_witness, visibility_filter},
Ruma,
};

Expand All @@ -33,10 +33,10 @@ pub(crate) async fn get_context_route(
State(services): State<crate::State>,
body: Ruma<get_context::v3::Request>,
) -> Result<get_context::v3::Response> {
let filter = &body.filter;
let sender = body.sender();
let (sender_user, _) = sender;
let (sender_user, sender_device) = sender;
let room_id = &body.room_id;
let filter = &body.filter;

// Use limit or else 10, with maximum 100
let limit: usize = body
Expand All @@ -45,18 +45,6 @@ pub(crate) async fn get_context_route(
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);

// some clients, at least element, seem to require knowledge of redundant
// members for "inline" profiles on the timeline to work properly
let lazy_load_enabled = matches!(filter.lazy_load_options, LazyLoadOptions::Enabled { .. });

let lazy_load_redundant = if let LazyLoadOptions::Enabled { include_redundant_members } =
filter.lazy_load_options
{
include_redundant_members
} else {
false
};

let base_id = services
.rooms
.timeline
Expand All @@ -75,7 +63,7 @@ pub(crate) async fn get_context_route(
.user_can_see_event(sender_user, &body.room_id, &body.event_id)
.map(Ok);

let (base_id, base_pdu, visible) = try_join!(base_id, base_pdu, visible)?;
let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;

if base_pdu.room_id != body.room_id || base_pdu.event_id != body.event_id {
return Err!(Request(NotFound("Base event not found.")));
Expand Down Expand Up @@ -112,12 +100,32 @@ pub(crate) async fn get_context_route(
.collect();

let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) =
join!(base_event, events_before, events_after);
join3(base_event, events_before, events_after).await;

let lazy_loading_context = lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: Some(base_count.into_unsigned()),
options: Some(&filter.lazy_load_options),
};

let lazy_loading_witnessed: OptionFuture<_> = filter
.lazy_load_options
.is_enabled()
.then_some(
base_event
.iter()
.chain(events_before.iter())
.chain(events_after.iter()),
)
.map(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed))
.into();

let state_at = events_after
.last()
.map(ref_at!(1))
.map_or(body.event_id.as_ref(), |e| e.event_id.as_ref());
.map_or(body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());

let state_ids = services
.rooms
Expand All @@ -126,41 +134,32 @@ pub(crate) async fn get_context_route(
.or_else(|_| services.rooms.state.get_room_shortstatehash(room_id))
.and_then(|shortstatehash| services.rooms.state_accessor.state_full_ids(shortstatehash))
.map_err(|e| err!(Database("State not found: {e}")))
.await?;

let lazy = base_event
.iter()
.chain(events_before.iter())
.chain(events_after.iter())
.stream()
.fold(LazySet::new(), |lazy, item| {
update_lazy(&services, room_id, sender, lazy, item, lazy_load_redundant)
})
.await;
.boxed();

let lazy = &lazy;
let state: Vec<_> = state_ids
.iter()
.stream()
.broad_filter_map(|(shortstatekey, event_id)| {
services
.rooms
.short
.get_statekey_from_short(*shortstatekey)
.map_ok(move |(event_type, state_key)| (event_type, state_key, event_id))
.ok()
})
.ready_filter_map(|(event_type, state_key, event_id)| {
if !lazy_load_enabled || event_type != StateEventType::RoomMember {
return Some(event_id);
let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await;

let state_ids = state_ids?;
let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default();
let shortstatekeys = state_ids.iter().stream().map(deref_at!(0));

let state: Vec<_> = services
.rooms
.short
.multi_get_statekey_from_short(shortstatekeys)
.zip(state_ids.iter().stream().map(at!(1)))
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
if filter.lazy_load_options.is_enabled()
&& event_type == StateEventType::RoomMember
&& state_key
.as_str()
.try_into()
.is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id))
{
return None;
}

state_key
.as_str()
.try_into()
.ok()
.filter(|&user_id: &&UserId| lazy.contains(user_id))
.map(|_| event_id)
Some(event_id)
})
.broad_filter_map(|event_id: &OwnedEventId| {
services.rooms.timeline.get_pdu(event_id).ok()
Expand Down
151 changes: 78 additions & 73 deletions src/api/client/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashSet;

use axum::extract::State;
use conduwuit::{
at, is_equal_to,
Expand All @@ -10,22 +8,27 @@ use conduwuit::{
},
Event, PduCount, Result,
};
use futures::{FutureExt, StreamExt};
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt};
use ruma::{
api::{
client::{filter::RoomEventFilter, message::get_message_events},
Direction,
},
events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
serde::Raw,
DeviceId, OwnedUserId, RoomId, UserId,
RoomId, UserId,
};
use service::{
rooms::{
lazy_loading,
lazy_loading::{Options, Witness},
timeline::PdusIterItem,
},
Services,
};
use service::{rooms::timeline::PdusIterItem, Services};

use crate::Ruma;

pub(crate) type LazySet = HashSet<OwnedUserId>;

/// list of safe and common non-state events to ignore if the user is ignored
const IGNORED_MESSAGE_TYPES: &[TimelineEventType; 17] = &[
Audio,
Expand Down Expand Up @@ -84,13 +87,6 @@ pub(crate) async fn get_message_events_route(
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);

services.rooms.lazy_loading.lazy_load_confirm_delivery(
sender_user,
sender_device,
room_id,
from,
);

if matches!(body.dir, Direction::Backward) {
services
.rooms
Expand Down Expand Up @@ -127,35 +123,34 @@ pub(crate) async fn get_message_events_route(
.collect()
.await;

let lazy = events
.iter()
.stream()
.fold(LazySet::new(), |lazy, item| {
update_lazy(&services, room_id, sender, lazy, item, false)
})
.await;
let lazy_loading_context = lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: Some(from.into_unsigned()),
options: Some(&filter.lazy_load_options),
};

let state = lazy
.iter()
.stream()
.broad_filter_map(|user_id| get_member_event(&services, room_id, user_id))
let witness: OptionFuture<_> = filter
.lazy_load_options
.is_enabled()
.then(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()))
.into();

let state = witness
.map(Option::into_iter)
.map(|option| option.flat_map(Witness::into_iter))
.map(IterStream::stream)
.into_stream()
.flatten()
.broad_filter_map(|user_id| async move {
get_member_event(&services, room_id, &user_id).await
})
.collect()
.await;

let next_token = events.last().map(at!(0));

if !cfg!(feature = "element_hacks") {
if let Some(next_token) = next_token {
services.rooms.lazy_loading.lazy_load_mark_sent(
sender_user,
sender_device,
room_id,
lazy,
next_token,
);
}
}

let chunk = events
.into_iter()
.map(at!(1))
Expand All @@ -170,6 +165,52 @@ pub(crate) async fn get_message_events_route(
})
}

pub(crate) async fn lazy_loading_witness<'a, I>(
services: &Services,
lazy_loading_context: &lazy_loading::Context<'_>,
events: I,
) -> Witness
where
I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
{
let oldest = events
.clone()
.map(|(count, _)| count)
.copied()
.min()
.unwrap_or_else(PduCount::max);

let newest = events
.clone()
.map(|(count, _)| count)
.copied()
.max()
.unwrap_or_else(PduCount::max);

let receipts = services
.rooms
.read_receipt
.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());

pin_mut!(receipts);
let witness: Witness = events
.stream()
.map(|(_, pdu)| pdu.sender.clone())
.chain(
receipts
.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
.map(|(user_id, ..)| user_id.to_owned()),
)
.collect()
.await;

services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
.await
}

async fn get_member_event(
services: &Services,
room_id: &RoomId,
Expand All @@ -184,42 +225,6 @@ async fn get_member_event(
.ok()
}

pub(crate) async fn update_lazy(
services: &Services,
room_id: &RoomId,
sender: (&UserId, &DeviceId),
mut lazy: LazySet,
item: &PdusIterItem,
force: bool,
) -> LazySet {
let (_, event) = &item;
let (sender_user, sender_device) = sender;

/* TODO: Remove the "element_hacks" check when these are resolved:
* https://github.com/vector-im/element-android/issues/3417
* https://github.com/vector-im/element-web/issues/21034
*/
if force || cfg!(features = "element_hacks") {
lazy.insert(event.sender().into());
return lazy;
}

if lazy.contains(event.sender()) {
return lazy;
}

if !services
.rooms
.lazy_loading
.lazy_load_was_sent_before(sender_user, sender_device, room_id, event.sender())
.await
{
lazy.insert(event.sender().into());
}

lazy
}

pub(crate) async fn ignored_filter(
services: &Services,
item: PdusIterItem,
Expand Down
Loading

0 comments on commit 955ffdb

Please sign in to comment.