Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an enqueue time to the send queue system #4385

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,7 @@ pub struct EventTimelineItem {
timestamp: u64,
reactions: Vec<Reaction>,
local_send_state: Option<EventSendState>,
local_created_at: Option<u64>,
read_receipts: HashMap<String, Receipt>,
origin: Option<EventItemOrigin>,
can_be_replied_to: bool,
Expand Down Expand Up @@ -1121,6 +1122,7 @@ impl From<matrix_sdk_ui::timeline::EventTimelineItem> for EventTimelineItem {
timestamp: item.timestamp().0.into(),
reactions,
local_send_state: item.send_state().map(|s| s.into()),
local_created_at: item.local_created_at().map(|t| t.0.into()),
read_receipts,
origin: item.origin(),
can_be_replied_to: item.can_be_replied_to(),
Expand Down
107 changes: 96 additions & 11 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use ruma::{
},
owned_event_id, owned_mxc_uri, room_id,
serde::Raw,
uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
TransactionId, UserId,
};
use serde_json::{json, value::Value as JsonValue};

Expand Down Expand Up @@ -980,13 +981,21 @@ impl StateStoreIntegrationTests for DynStateStore {
let ev =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
.unwrap();
self.save_send_queue_request(room_id, txn.clone(), ev.into(), 0).await?;
self.save_send_queue_request(
room_id,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev.into(),
0,
)
.await?;

// Add a single dependent queue request.
self.save_dependent_queued_request(
room_id,
&txn,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await?;
Expand Down Expand Up @@ -1242,7 +1251,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn0.clone(),
MilliSecondsSinceUnixEpoch::now(),
event0.into(),
0,
)
.await
.unwrap();

// Reading it will work.
let pending = self.load_send_queue_requests(room_id).await.unwrap();
Expand All @@ -1266,7 +1283,15 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.unwrap();

self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn,
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();
}

// Reading all the events should work.
Expand Down Expand Up @@ -1364,7 +1389,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id2,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();
}

// Add and remove one event for room3.
Expand All @@ -1374,7 +1407,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id3,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();

self.remove_send_queue_request(room_id3, &txn).await.unwrap();
}
Expand All @@ -1399,21 +1440,45 @@ impl StateStoreIntegrationTests for DynStateStore {
let ev0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
.unwrap();
self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap();
self.save_send_queue_request(
room_id,
low0_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev0.into(),
2,
)
.await
.unwrap();

// Saving one request with higher priority should work.
let high_txn = TransactionId::new();
let ev1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
.unwrap();
self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap();
self.save_send_queue_request(
room_id,
high_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev1.into(),
10,
)
.await
.unwrap();

// Saving another request with the low priority should work.
let low1_txn = TransactionId::new();
let ev2 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
.unwrap();
self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap();
self.save_send_queue_request(
room_id,
low1_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev2.into(),
2,
)
.await
.unwrap();

// The requests should be ordered from higher priority to lower, and when equal,
// should use the insertion order instead.
Expand Down Expand Up @@ -1453,7 +1518,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn0.clone(),
MilliSecondsSinceUnixEpoch::now(),
event0.into(),
0,
)
.await
.unwrap();

// No dependents, to start with.
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
Expand All @@ -1464,6 +1537,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn0,
child_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand Down Expand Up @@ -1515,12 +1589,21 @@ impl StateStoreIntegrationTests for DynStateStore {
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn1.clone(),
MilliSecondsSinceUnixEpoch::now(),
event1.into(),
0,
)
.await
.unwrap();

self.save_dependent_queued_request(
room_id,
&txn0,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand All @@ -1531,6 +1614,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn1,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::EditEvent {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
Expand Down Expand Up @@ -1563,6 +1647,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn,
child_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand Down
9 changes: 6 additions & 3 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use ruma::{
},
serde::Raw,
time::Instant,
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
};
use tracing::{debug, instrument, warn};

Expand Down Expand Up @@ -750,6 +750,7 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
kind: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
Expand All @@ -759,7 +760,7 @@ impl StateStore for MemoryStore {
.send_queue_events
.entry(room_id.to_owned())
.or_default()
.push(QueuedRequest { kind, transaction_id, error: None, priority });
.push(QueuedRequest { kind, transaction_id, error: None, priority, created_at });
Ok(())
}

Expand Down Expand Up @@ -858,6 +859,7 @@ impl StateStore for MemoryStore {
room: &RoomId,
parent_transaction_id: &TransactionId,
own_transaction_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.inner
Expand All @@ -871,6 +873,7 @@ impl StateStore for MemoryStore {
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
parent_key: None,
created_at,
});
Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use ruma::{
AnyMessageLikeEventContent, EventContent as _, RawExt as _,
},
serde::Raw,
OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt,
MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
TransactionId, UInt,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -131,6 +132,9 @@ pub struct QueuedRequest {
/// The bigger the value, the higher the priority at which this request
/// should be handled.
pub priority: usize,

/// The time that the request was originally attempted.
pub created_at: MilliSecondsSinceUnixEpoch,
}

impl QueuedRequest {
Expand Down Expand Up @@ -371,6 +375,9 @@ pub struct DependentQueuedRequest {
/// If the parent request has been sent, the parent's request identifier
/// returned by the server once the local echo has been sent out.
pub parent_key: Option<SentRequestKey>,

/// The time that the request was originally attempted.
pub created_at: MilliSecondsSinceUnixEpoch,
}

impl DependentQueuedRequest {
Expand Down
12 changes: 8 additions & 4 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use ruma::{
},
serde::Raw,
time::SystemTime,
EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId,
TransactionId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -359,6 +359,7 @@ pub trait StateStore: AsyncTraitDeps {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error>;
Expand Down Expand Up @@ -421,6 +422,7 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error>;

Expand Down Expand Up @@ -657,11 +659,12 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.0
.save_send_queue_request(room_id, transaction_id, content, priority)
.save_send_queue_request(room_id, transaction_id, created_at, content, priority)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -711,10 +714,11 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.0
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content)
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
.map_err(Into::into)
}
Expand Down
Loading
Loading