From a7f6ec5d12b3b9878ccdc31b63763513a6c4c8d2 Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Thu, 12 Dec 2024 11:18:52 -0500 Subject: [PATCH 1/4] Add created_at time persisted send_queue system --- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 2 + .../src/store/integration_tests.rs | 107 ++++++++++++++++-- .../matrix-sdk-base/src/store/memory_store.rs | 23 ++-- .../matrix-sdk-base/src/store/send_queue.rs | 10 +- crates/matrix-sdk-base/src/store/traits.rs | 12 +- .../src/state_store/mod.rs | 21 +++- .../010_send_queue_enqueue_time.sql | 6 + crates/matrix-sdk-sqlite/src/state_store.rs | 92 +++++++++++---- .../src/timeline/event_handler.rs | 1 + .../src/timeline/event_item/local.rs | 4 +- .../src/timeline/event_item/mod.rs | 5 + .../matrix-sdk-ui/src/timeline/tests/echo.rs | 2 +- .../tests/integration/timeline/echo.rs | 4 +- .../tests/integration/timeline/queue.rs | 4 +- crates/matrix-sdk/src/send_queue/mod.rs | 83 +++++++++++++- crates/matrix-sdk/src/send_queue/upload.rs | 15 ++- 16 files changed, 325 insertions(+), 66 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 7a4df082f0d..3520ce64c8a 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -1033,6 +1033,7 @@ pub struct EventTimelineItem { timestamp: Timestamp, reactions: Vec, local_send_state: Option, + local_created_at: Option, read_receipts: HashMap, origin: Option, can_be_replied_to: bool, @@ -1070,6 +1071,7 @@ impl From for EventTimelineItem { timestamp: item.timestamp().into(), reactions, local_send_state: item.send_state().map(|s| s.into()), + local_created_at: item.local_created_at().map(|t| t.0.into()), read_receipts, origin: item.origin(), can_be_replied_to: item.can_be_replied_to(), diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index c6b3b5a7c3c..94eb18221ca 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -29,7 +29,8 @@ use ruma::{ }, owned_event_id, owned_mxc_uri, room_id, serde::Raw, - uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId, + uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, + TransactionId, UserId, }; use serde_json::{json, value::Value as JsonValue}; @@ -980,13 +981,21 @@ impl StateStoreIntegrationTests for DynStateStore { let ev = SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into()) .unwrap(); - self.save_send_queue_request(room_id, txn.clone(), ev.into(), 0).await?; + self.save_send_queue_request( + room_id, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev.into(), + 0, + ) + .await?; // Add a single dependent queue request. self.save_dependent_queued_request( room_id, &txn, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await?; @@ -1242,7 +1251,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn0.clone(), + MilliSecondsSinceUnixEpoch::now(), + event0.into(), + 0, + ) + .await + .unwrap(); // Reading it will work. let pending = self.load_send_queue_requests(room_id).await.unwrap(); @@ -1266,7 +1283,15 @@ impl StateStoreIntegrationTests for DynStateStore { ) .unwrap(); - self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn, + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); } // Reading all the events should work. @@ -1364,7 +1389,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into()) .unwrap(); - self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id2, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); } // Add and remove one event for room3. @@ -1374,7 +1407,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into()) .unwrap(); - self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id3, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); self.remove_send_queue_request(room_id3, &txn).await.unwrap(); } @@ -1399,21 +1440,45 @@ impl StateStoreIntegrationTests for DynStateStore { let ev0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into()) .unwrap(); - self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap(); + self.save_send_queue_request( + room_id, + low0_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev0.into(), + 2, + ) + .await + .unwrap(); // Saving one request with higher priority should work. let high_txn = TransactionId::new(); let ev1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into()) .unwrap(); - self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap(); + self.save_send_queue_request( + room_id, + high_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev1.into(), + 10, + ) + .await + .unwrap(); // Saving another request with the low priority should work. let low1_txn = TransactionId::new(); let ev2 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into()) .unwrap(); - self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap(); + self.save_send_queue_request( + room_id, + low1_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev2.into(), + 2, + ) + .await + .unwrap(); // The requests should be ordered from higher priority to lower, and when equal, // should use the insertion order instead. @@ -1453,7 +1518,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn0.clone(), + MilliSecondsSinceUnixEpoch::now(), + event0.into(), + 0, + ) + .await + .unwrap(); // No dependents, to start with. assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); @@ -1464,6 +1537,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn0, child_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await @@ -1515,12 +1589,21 @@ impl StateStoreIntegrationTests for DynStateStore { let event1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) .unwrap(); - self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn1.clone(), + MilliSecondsSinceUnixEpoch::now(), + event1.into(), + 0, + ) + .await + .unwrap(); self.save_dependent_queued_request( room_id, &txn0, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await @@ -1531,6 +1614,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn1, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), @@ -1563,6 +1647,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn, child_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 9148c9b34da..5c9c98d2de1 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -30,8 +30,8 @@ use ruma::{ }, serde::Raw, time::Instant, - CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, - OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, + OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, }; use tracing::{debug, instrument, warn}; @@ -750,16 +750,19 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, kind: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { - self.inner - .write() - .unwrap() - .send_queue_events - .entry(room_id.to_owned()) - .or_default() - .push(QueuedRequest { kind, transaction_id, error: None, priority }); + self.inner.write().unwrap().send_queue_events.entry(room_id.to_owned()).or_default().push( + QueuedRequest { + kind, + transaction_id, + error: None, + priority, + created_at: Some(created_at), + }, + ); Ok(()) } @@ -858,6 +861,7 @@ impl StateStore for MemoryStore { room: &RoomId, parent_transaction_id: &TransactionId, own_transaction_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.inner @@ -871,6 +875,7 @@ impl StateStore for MemoryStore { parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, parent_key: None, + created_at: Some(created_at), }); Ok(()) } diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index ece50344e9f..289def806e6 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -23,7 +23,8 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, RawExt as _, }, serde::Raw, - OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt, + MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, + TransactionId, UInt, }; use serde::{Deserialize, Serialize}; @@ -131,6 +132,9 @@ pub struct QueuedRequest { /// The bigger the value, the higher the priority at which this request /// should be handled. pub priority: usize, + + /// The time that the request was original attempted. + pub created_at: Option, } impl QueuedRequest { @@ -371,6 +375,10 @@ pub struct DependentQueuedRequest { /// If the parent request has been sent, the parent's request identifier /// returned by the server once the local echo has been sent out. pub parent_key: Option, + + /// The time that the request was original attempted. + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option, } impl DependentQueuedRequest { diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 5f651483f5b..446bc9fbddd 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -35,8 +35,8 @@ use ruma::{ }, serde::Raw, time::SystemTime, - EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, - TransactionId, UserId, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId, + OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, }; use serde::{Deserialize, Serialize}; @@ -359,6 +359,7 @@ pub trait StateStore: AsyncTraitDeps { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, request: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error>; @@ -421,6 +422,7 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error>; @@ -657,11 +659,12 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { self.0 - .save_send_queue_request(room_id, transaction_id, content, priority) + .save_send_queue_request(room_id, transaction_id, created_at, content, priority) .await .map_err(Into::into) } @@ -711,10 +714,11 @@ impl StateStore for EraseStateStoreError { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.0 - .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content) + .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content) .await .map_err(Into::into) } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 01d386f354f..c3fff22647e 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -44,8 +44,8 @@ use ruma::{ GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent, }, serde::Raw, - CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, - OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, + OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tracing::{debug, warn}; @@ -442,6 +442,10 @@ struct PersistedQueuedRequest { priority: Option, + /// The time the original message was first attempted to be sent at. + #[serde(skip_serializing_if = "Option::is_none")] + created_at: Option, + // Migrated fields: keep these private, they're not used anymore elsewhere in the code base. /// Deprecated (from old format), now replaced with error field. is_wedged: Option, @@ -467,7 +471,13 @@ impl PersistedQueuedRequest { // By default, events without a priority have a priority of 0. let priority = self.priority.unwrap_or(0); - Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority }) + Some(QueuedRequest { + kind, + transaction_id: self.transaction_id, + error, + priority, + created_at: self.created_at, + }) } } @@ -1370,6 +1380,7 @@ impl_state_store!({ &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, kind: QueuedRequestKind, priority: usize, ) -> Result<()> { @@ -1391,7 +1402,6 @@ impl_state_store!({ || Ok(Vec::new()), |val| self.deserialize_value::>(&val), )?; - // Push the new request. prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), @@ -1401,6 +1411,7 @@ impl_state_store!({ is_wedged: None, event: None, priority: Some(priority), + created_at: Some(created_at), }); // Save the new vector into db. @@ -1570,6 +1581,7 @@ impl_state_store!({ room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<()> { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); @@ -1596,6 +1608,7 @@ impl_state_store!({ parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, parent_key: None, + created_at: Some(created_at), }); // Save the new vector into db. diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql b/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql new file mode 100644 index 00000000000..7dcf3bf6d04 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql @@ -0,0 +1,6 @@ +-- Migration script to add the created_at column to the send_queue_events table +ALTER TABLE "send_queue_events" +ADD COLUMN "created_at" INTEGER DEFAULT NULL; + +ALTER TABLE "dependent_send_queue_events" +ADD COLUMN "created_at" INTEGER DEFAULT NULL; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index adfd9d5b5a3..c0e58dcbbe5 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -32,8 +32,8 @@ use ruma::{ GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, }, serde::Raw, - CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, - RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, + OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId, }; use rusqlite::{OptionalExtension, Transaction}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -69,7 +69,7 @@ mod keys { /// This is used to figure whether the sqlite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function.. -const DATABASE_VERSION: u8 = 10; +const DATABASE_VERSION: u8 = 11; /// A sqlite based cryptostore. #[derive(Clone)] @@ -318,6 +318,17 @@ impl SqliteStateStore { .await?; } + if from < 11 && to >= 11 { + conn.with_transaction(move |txn| { + // Run the migration. + txn.execute_batch(include_str!( + "../migrations/state_store/010_send_queue_enqueue_time.sql" + ))?; + txn.set_db_version(11) + }) + .await?; + } + Ok(()) } @@ -1757,6 +1768,7 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { @@ -1764,16 +1776,16 @@ impl StateStore for SqliteStateStore { let room_id_value = self.serialize_value(&room_id.to_owned())?; let content = self.serialize_json(&content)?; - // The transaction id is used both as a key (in remove/update) and a value (as // it's useful for the callers), so we keep it as is, and neither hash // it (with encode_key) or encrypt it (through serialize_value). After // all, it carries no personal information, so this is considered fine. + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?; + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?; Ok(()) }) .await @@ -1835,14 +1847,14 @@ impl StateStore for SqliteStateStore { // Note: ROWID is always present and is an auto-incremented integer counter. We // want to maintain the insertion order, so we can sort using it. // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, Vec, Option>, usize)> = self + let res: Vec<(String, Vec, Option>, usize, u64)> = self .acquire() .await? .prepare( - "SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID", + "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID", |mut stmt| { stmt.query((room_id,))? - .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))) .collect() }, ) @@ -1850,11 +1862,13 @@ impl StateStore for SqliteStateStore { let mut requests = Vec::with_capacity(res.len()); for entry in res { + let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); requests.push(QueuedRequest { transaction_id: entry.0.into(), kind: self.deserialize_json(&entry.1)?, error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, priority: entry.3, + created_at, }); } @@ -1912,6 +1926,7 @@ impl StateStore for SqliteStateStore { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<()> { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); @@ -1921,15 +1936,22 @@ impl StateStore for SqliteStateStore { let parent_txn_id = parent_txn_id.to_string(); let own_txn_id = own_txn_id.to_string(); + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { txn.prepare_cached( r#"INSERT INTO dependent_send_queue_events - (room_id, parent_transaction_id, own_transaction_id, content) - VALUES (?, ?, ?, ?)"#, + (room_id, parent_transaction_id, own_transaction_id, content, created_at) + VALUES (?, ?, ?, ?, ?)"#, )? - .execute((room_id, parent_txn_id, own_txn_id, content))?; + .execute(( + room_id, + parent_txn_id, + own_txn_id, + content, + created_at_ts, + ))?; Ok(()) }) .await @@ -2022,14 +2044,14 @@ impl StateStore for SqliteStateStore { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); // Note: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, String, Option>, Vec)> = self + let res: Vec<(String, String, Option>, Vec, u64)> = self .acquire() .await? .prepare( - "SELECT own_transaction_id, parent_transaction_id, parent_key, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", + "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", |mut stmt| { stmt.query((room_id,))? - .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))) .collect() }, ) @@ -2037,11 +2059,13 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { + let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, kind: self.deserialize_json(&entry.3)?, + created_at, }); } @@ -2395,16 +2419,15 @@ mod migration_tests { let wedge_tx = wedged_event_transaction_id.clone(); let local_tx = local_event_transaction_id.clone(); - db.save_dependent_queued_request( - room_id, - &local_tx, - ChildTransactionId::new(), - DependentQueuedRequestKind::RedactEvent, - ) - .await - .unwrap(); - conn.with_transaction(move |txn| { + add_dependent_send_queue_event_v7( + &db, + txn, + room_id, + &local_tx, + ChildTransactionId::new(), + DependentQueuedRequestKind::RedactEvent, + )?; add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?; add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?; Result::<_, Error>::Ok(()) @@ -2442,6 +2465,29 @@ mod migration_tests { txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")? .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?; + Ok(()) + } + fn add_dependent_send_queue_event_v7( + this: &SqliteStateStore, + txn: &Transaction<'_>, + room_id: &RoomId, + parent_txn_id: &TransactionId, + own_txn_id: ChildTransactionId, + content: DependentQueuedRequestKind, + ) -> Result<(), Error> { + let room_id_value = this.serialize_value(&room_id.to_owned())?; + + let parent_txn_id = parent_txn_id.to_string(); + let own_txn_id = own_txn_id.to_string(); + let content = this.serialize_json(&content)?; + + txn.prepare_cached( + "INSERT INTO dependent_send_queue_events + (room_id, parent_transaction_id, own_transaction_id, content) + VALUES (?, ?, ?, ?)", + )? + .execute((room_id_value, parent_txn_id, own_txn_id, content))?; + Ok(()) } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 8cdcf1256ee..f25b65dbe29 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1045,6 +1045,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { send_state: EventSendState::NotSentYet, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), + created_at: send_handle.clone().and_then(|h| h.created_at), } .into(), diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 2890b6eab8e..3f5cd48c953 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use as_variant::as_variant; use matrix_sdk::{send_queue::SendHandle, Error}; -use ruma::{EventId, OwnedEventId, OwnedTransactionId}; +use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -30,6 +30,8 @@ pub(in crate::timeline) struct LocalEventTimelineItem { pub transaction_id: OwnedTransactionId, /// A handle to manipulate this event before it is sent, if possible. pub send_handle: Option, + /// The time that the event was created locally + pub created_at: Option, } impl LocalEventTimelineItem { diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index a10f9fc95a6..f632e9371a7 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -268,6 +268,11 @@ impl EventTimelineItem { as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.send_state) } + /// Get the local time that the event was enqueued at. + pub fn local_created_at(&self) -> Option { + as_variant!(&self.kind, EventTimelineItemKind::Local(local) => local.created_at).flatten() + } + /// Get the unique identifier of this item. /// /// Returns the transaction ID for a local echo item that has not been sent diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index f5a85de4495..cdb76d387a6 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -309,7 +309,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet{ .. })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index 2d715963ab4..8c83a5f80cb 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -174,7 +174,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, because the error is a transient one that's recoverable, @@ -320,7 +320,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index d9d27689180..9318c058aee 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -273,7 +273,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // Local echoes are updated with the failed send state as soon as the error // response has been received. assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await); - let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable } => (error, is_recoverable)); + let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable, .. } => (error, is_recoverable)); // The error is not recoverable. assert!(!is_recoverable); @@ -292,7 +292,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { assert_eq!(initial.len(), 1); assert_eq!(initial[0].content().as_message().unwrap().body(), "wall of text"); assert_let!( - Some(EventSendState::SendingFailed { error, is_recoverable }) = initial[0].send_state() + Some(EventSendState::SendingFailed { error, is_recoverable, .. }) = initial[0].send_state() ); // Same recoverable status as above. diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 749917b19f5..cec15f9dc53 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -162,7 +162,7 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, Mentions, }, serde::Raw, - OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -444,7 +444,8 @@ impl RoomSendQueue { let content = SerializableEventContent::from_raw(content, event_type); - let transaction_id = self.inner.queue.push(content.clone().into()).await?; + let created_at = MilliSecondsSinceUnixEpoch::now(); + let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?; trace!(%transaction_id, "manager sends a raw event to the background task"); self.inner.notifier.notify_one(); @@ -453,6 +454,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: transaction_id.clone(), media_handles: None, + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -949,6 +951,7 @@ impl QueueStorage { async fn push( &self, request: QueuedRequestKind, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result { let transaction_id = TransactionId::new(); @@ -960,6 +963,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, transaction_id.clone(), + created_at, request, Self::LOW_PRIORITY, ) @@ -1115,6 +1119,7 @@ impl QueueStorage { &self.room_id, transaction_id, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await?; @@ -1155,6 +1160,7 @@ impl QueueStorage { &self.room_id, transaction_id, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: serializable }, ) .await?; @@ -1174,11 +1180,13 @@ impl QueueStorage { /// Push requests (and dependents) to upload a media. /// /// See the module-level description for details of the whole processus. + #[allow(clippy::too_many_arguments)] async fn push_media( &self, event: RoomMessageEventContent, content_type: Mime, send_event_txn: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, @@ -1186,7 +1194,6 @@ impl QueueStorage { let guard = self.store.lock().await; let client = guard.client()?; let store = client.store(); - let thumbnail_info = if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail @@ -1198,6 +1205,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, upload_thumbnail_txn.clone(), + created_at, QueuedRequestKind::MediaUpload { content_type: thumbnail_content_type.to_string(), cache_key: thumbnail_media_request, @@ -1214,6 +1222,7 @@ impl QueueStorage { &self.room_id, &upload_thumbnail_txn, upload_file_txn.clone().into(), + created_at, DependentQueuedRequestKind::UploadFileWithThumbnail { content_type: content_type.to_string(), cache_key: file_media_request, @@ -1229,6 +1238,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, upload_file_txn.clone(), + created_at, QueuedRequestKind::MediaUpload { content_type: content_type.to_string(), cache_key: file_media_request, @@ -1248,6 +1258,7 @@ impl QueueStorage { &self.room_id, &upload_file_txn, send_event_txn.into(), + created_at, DependentQueuedRequestKind::FinishUpload { local_echo: event, file_upload: upload_file_txn.clone(), @@ -1265,6 +1276,7 @@ impl QueueStorage { &self, transaction_id: &TransactionId, key: String, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result, RoomSendQueueStorageError> { let guard = self.store.lock().await; let client = guard.client()?; @@ -1294,6 +1306,7 @@ impl QueueStorage { &self.room_id, transaction_id, reaction_txn_id.clone(), + created_at, DependentQueuedRequestKind::ReactEvent { key }, ) .await?; @@ -1322,6 +1335,7 @@ impl QueueStorage { room: room.clone(), transaction_id: queued.transaction_id, media_handles: None, + created_at: queued.created_at, }, send_error: queued.error, }, @@ -1381,6 +1395,7 @@ impl QueueStorage { upload_thumbnail_txn: thumbnail_info.map(|info| info.txn), upload_file_txn: file_upload, }), + created_at: dep.created_at, }, send_error: None, }, @@ -1404,6 +1419,7 @@ impl QueueStorage { client: &Client, dependent_request: DependentQueuedRequest, new_updates: &mut Vec, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result { let store = client.store(); @@ -1470,6 +1486,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), + created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1557,6 +1574,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), + created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1659,7 +1677,15 @@ impl QueueStorage { for dependent in canonicalized_dependent_requests { let dependent_id = dependent.own_transaction_id.clone(); - match self.try_apply_single_dependent_request(&client, dependent, new_updates).await { + match self + .try_apply_single_dependent_request( + &client, + dependent, + new_updates, + MilliSecondsSinceUnixEpoch::now(), + ) + .await + { Ok(should_remove) => { if should_remove { // The dependent request has been successfully applied, forget about it. @@ -1887,6 +1913,9 @@ pub struct SendHandle { /// Additional handles for a media upload. media_handles: Option, + + /// The time that this send handle was first created + pub created_at: Option, } impl SendHandle { @@ -2073,8 +2102,9 @@ impl SendHandle { ) -> Result, RoomSendQueueStorageError> { trace!("received an intent to react"); + let created_at = MilliSecondsSinceUnixEpoch::now(); if let Some(reaction_txn_id) = - self.room.inner.queue.react(&self.transaction_id, key.clone()).await? + self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await? { trace!("successfully queued react"); @@ -2138,6 +2168,7 @@ impl SendReactionHandle { room: self.room.clone(), transaction_id: self.transaction_id.clone().into(), media_handles: None, + created_at: Some(MilliSecondsSinceUnixEpoch::now()), }; handle.abort().await @@ -2209,12 +2240,44 @@ mod tests { use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder}; use ruma::{ events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent}, - room_id, TransactionId, + room_id, MilliSecondsSinceUnixEpoch, TransactionId, }; use super::canonicalize_dependent_requests; use crate::{client::WeakClient, test_utils::logged_in_client}; + #[test] + fn test_canonicalize_dependent_events_created_at() { + // Test to ensure the created_at field is being serialized and retrieved + // correctly. + let txn = TransactionId::new(); + let created_at = MilliSecondsSinceUnixEpoch::now(); + + let edit = DependentQueuedRequest { + own_transaction_id: ChildTransactionId::new(), + parent_transaction_id: txn.clone(), + kind: DependentQueuedRequestKind::EditEvent { + new_content: SerializableEventContent::new( + &RoomMessageEventContent::text_plain("edit").into(), + ) + .unwrap(), + }, + parent_key: None, + created_at: Some(created_at), + }; + + let res = canonicalize_dependent_requests(&[edit]); + + assert_eq!(res.len(), 1); + assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind); + assert_let!( + AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap() + ); + assert_eq!(msg.body(), "edit"); + assert_eq!(res[0].parent_transaction_id, txn); + assert_eq!(res[0].created_at, Some(created_at)); + } + #[async_test] async fn test_client_no_cycle_with_send_queue() { for enabled in [true, false] { @@ -2275,6 +2338,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }; let res = canonicalize_dependent_requests(&[edit]); @@ -2295,6 +2359,7 @@ mod tests { parent_transaction_id: txn.clone(), kind: DependentQueuedRequestKind::RedactEvent, parent_key: None, + created_at: None, }; let edit = DependentQueuedRequest { @@ -2307,6 +2372,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }; inputs.push({ @@ -2346,6 +2412,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }) .collect::>(); @@ -2377,6 +2444,7 @@ mod tests { kind: DependentQueuedRequestKind::RedactEvent, parent_transaction_id: txn1.clone(), parent_key: None, + created_at: None, }, // This one pertains to txn2. DependentQueuedRequest { @@ -2389,6 +2457,7 @@ mod tests { }, parent_transaction_id: txn2.clone(), parent_key: None, + created_at: None, }, ]; @@ -2419,6 +2488,7 @@ mod tests { kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() }, parent_transaction_id: txn.clone(), parent_key: None, + created_at: None, }; let edit_id = ChildTransactionId::new(); @@ -2432,6 +2502,7 @@ mod tests { }, parent_transaction_id: txn, parent_key: None, + created_at: None, }; let res = canonicalize_dependent_requests(&[react, edit]); diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 1d951b7caf2..98de76324bf 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -28,7 +28,7 @@ use ruma::{ room::message::{FormattedBody, MessageType, RoomMessageEventContent}, AnyMessageLikeEventContent, Mentions, }, - OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId, }; use tracing::{debug, error, instrument, trace, warn, Span}; @@ -185,6 +185,7 @@ impl RoomSendQueue { config.mentions, ); + let created_at = MilliSecondsSinceUnixEpoch::now(); // Save requests in the queue storage. self.inner .queue @@ -192,6 +193,7 @@ impl RoomSendQueue { event_content.clone(), content_type, send_event_txn.clone().into(), + created_at, upload_file_txn.clone(), file_media_request, queue_thumbnail_info, @@ -206,6 +208,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: send_event_txn.clone().into(), media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }), + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -306,6 +309,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, event_txn, + MilliSecondsSinceUnixEpoch::now(), new_content.into(), Self::HIGH_PRIORITY, ) @@ -350,7 +354,13 @@ impl QueueStorage { client .store() - .save_send_queue_request(&self.room_id, next_upload_txn, request, Self::HIGH_PRIORITY) + .save_send_queue_request( + &self.room_id, + next_upload_txn, + MilliSecondsSinceUnixEpoch::now(), + request, + Self::HIGH_PRIORITY, + ) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; @@ -579,6 +589,7 @@ impl QueueStorage { &self.room_id, txn, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: new_serialized }, ) .await?; From 1a996e8b92a797ce6a110b26fad22ebaa13af3ba Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Fri, 20 Dec 2024 08:39:50 -0600 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Benjamin Bouvier Signed-off-by: Daniel Salinas --- .../matrix-sdk-base/src/store/memory_store.rs | 18 ++++++------- .../matrix-sdk-base/src/store/send_queue.rs | 9 +++---- .../src/state_store/mod.rs | 5 ++-- crates/matrix-sdk-sqlite/src/state_store.rs | 7 +++-- .../src/timeline/event_handler.rs | 2 +- .../src/timeline/event_item/mod.rs | 2 +- .../matrix-sdk-ui/src/timeline/tests/echo.rs | 2 +- .../tests/integration/timeline/echo.rs | 4 +-- .../tests/integration/timeline/queue.rs | 4 +-- crates/matrix-sdk/src/send_queue/mod.rs | 26 +++++++++---------- crates/matrix-sdk/src/send_queue/upload.rs | 2 +- 11 files changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 5c9c98d2de1..94c10c95f8f 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -754,15 +754,13 @@ impl StateStore for MemoryStore { kind: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { - self.inner.write().unwrap().send_queue_events.entry(room_id.to_owned()).or_default().push( - QueuedRequest { - kind, - transaction_id, - error: None, - priority, - created_at: Some(created_at), - }, - ); + self.inner + .write() + .unwrap() + .send_queue_events + .entry(room_id.to_owned()) + .or_default() + .push(QueuedRequest { kind, transaction_id, error: None, priority, created_at }); Ok(()) } @@ -875,7 +873,7 @@ impl StateStore for MemoryStore { parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, parent_key: None, - created_at: Some(created_at), + created_at, }); Ok(()) } diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 289def806e6..8c87b6c5ebb 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -133,8 +133,8 @@ pub struct QueuedRequest { /// should be handled. pub priority: usize, - /// The time that the request was original attempted. - pub created_at: Option, + /// The time that the request was originally attempted. + pub created_at: MilliSecondsSinceUnixEpoch, } impl QueuedRequest { @@ -376,9 +376,8 @@ pub struct DependentQueuedRequest { /// returned by the server once the local echo has been sent out. pub parent_key: Option, - /// The time that the request was original attempted. - #[serde(skip_serializing_if = "Option::is_none")] - pub created_at: Option, + /// The time that the request was originally attempted. + pub created_at: MilliSecondsSinceUnixEpoch, } impl DependentQueuedRequest { diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index c3fff22647e..c625c5b4999 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -476,7 +476,7 @@ impl PersistedQueuedRequest { transaction_id: self.transaction_id, error, priority, - created_at: self.created_at, + created_at: self.created_at.unwrap_or(MilliSecondsSinceUnixEpoch::now()), }) } } @@ -1402,6 +1402,7 @@ impl_state_store!({ || Ok(Vec::new()), |val| self.deserialize_value::>(&val), )?; + // Push the new request. prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), @@ -1608,7 +1609,7 @@ impl_state_store!({ parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, parent_key: None, - created_at: Some(created_at), + created_at, }); // Save the new vector into db. diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index c0e58dcbbe5..abf2fcb2300 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1862,7 +1862,8 @@ impl StateStore for SqliteStateStore { let mut requests = Vec::with_capacity(res.len()); for entry in res { - let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); + let created_at = UInt::new(entry.4) + .map_or_else(|| MilliSecondsSinceUnixEpoch::now(), MilliSecondsSinceUnixEpoch); requests.push(QueuedRequest { transaction_id: entry.0.into(), kind: self.deserialize_json(&entry.1)?, @@ -2059,7 +2060,8 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { - let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); + let created_at = UInt::new(entry.4) + .map_or_else(|| MilliSecondsSinceUnixEpoch::now(), MilliSecondsSinceUnixEpoch); dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), @@ -2467,6 +2469,7 @@ mod migration_tests { Ok(()) } + fn add_dependent_send_queue_event_v7( this: &SqliteStateStore, txn: &Transaction<'_>, diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index f25b65dbe29..11a0ea32432 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1045,7 +1045,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { send_state: EventSendState::NotSentYet, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), - created_at: send_handle.clone().and_then(|h| h.created_at), + created_at: send_handle.clone().map(|h| h.created_at), } .into(), diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index f632e9371a7..607f3efcfe0 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -268,7 +268,7 @@ impl EventTimelineItem { as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.send_state) } - /// Get the local time that the event was enqueued at. + /// Get the time that the local event was pushed in the send queue at. pub fn local_created_at(&self) -> Option { as_variant!(&self.kind, EventTimelineItemKind::Local(local) => local.created_at).flatten() } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index cdb76d387a6..f5a85de4495 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -309,7 +309,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet{ .. })); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index 8c83a5f80cb..2d715963ab4 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -174,7 +174,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); }); // Sending fails, because the error is a transient one that's recoverable, @@ -320,7 +320,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 9318c058aee..d9d27689180 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -273,7 +273,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // Local echoes are updated with the failed send state as soon as the error // response has been received. assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await); - let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable, .. } => (error, is_recoverable)); + let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable } => (error, is_recoverable)); // The error is not recoverable. assert!(!is_recoverable); @@ -292,7 +292,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { assert_eq!(initial.len(), 1); assert_eq!(initial[0].content().as_message().unwrap().body(), "wall of text"); assert_let!( - Some(EventSendState::SendingFailed { error, is_recoverable, .. }) = initial[0].send_state() + Some(EventSendState::SendingFailed { error, is_recoverable }) = initial[0].send_state() ); // Same recoverable status as above. diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index cec15f9dc53..a526fa48efa 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -454,7 +454,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: transaction_id.clone(), media_handles: None, - created_at: Some(created_at), + created_at, }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -1915,7 +1915,7 @@ pub struct SendHandle { media_handles: Option, /// The time that this send handle was first created - pub created_at: Option, + pub created_at: MilliSecondsSinceUnixEpoch, } impl SendHandle { @@ -2168,7 +2168,7 @@ impl SendReactionHandle { room: self.room.clone(), transaction_id: self.transaction_id.clone().into(), media_handles: None, - created_at: Some(MilliSecondsSinceUnixEpoch::now()), + created_at: MilliSecondsSinceUnixEpoch::now(), }; handle.abort().await @@ -2263,7 +2263,7 @@ mod tests { .unwrap(), }, parent_key: None, - created_at: Some(created_at), + created_at, }; let res = canonicalize_dependent_requests(&[edit]); @@ -2275,7 +2275,7 @@ mod tests { ); assert_eq!(msg.body(), "edit"); assert_eq!(res[0].parent_transaction_id, txn); - assert_eq!(res[0].created_at, Some(created_at)); + assert_eq!(res[0].created_at, created_at); } #[async_test] @@ -2338,7 +2338,7 @@ mod tests { .unwrap(), }, parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }; let res = canonicalize_dependent_requests(&[edit]); @@ -2359,7 +2359,7 @@ mod tests { parent_transaction_id: txn.clone(), kind: DependentQueuedRequestKind::RedactEvent, parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }; let edit = DependentQueuedRequest { @@ -2372,7 +2372,7 @@ mod tests { .unwrap(), }, parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }; inputs.push({ @@ -2412,7 +2412,7 @@ mod tests { .unwrap(), }, parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }) .collect::>(); @@ -2444,7 +2444,7 @@ mod tests { kind: DependentQueuedRequestKind::RedactEvent, parent_transaction_id: txn1.clone(), parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }, // This one pertains to txn2. DependentQueuedRequest { @@ -2457,7 +2457,7 @@ mod tests { }, parent_transaction_id: txn2.clone(), parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }, ]; @@ -2488,7 +2488,7 @@ mod tests { kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() }, parent_transaction_id: txn.clone(), parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }; let edit_id = ChildTransactionId::new(); @@ -2502,7 +2502,7 @@ mod tests { }, parent_transaction_id: txn, parent_key: None, - created_at: None, + created_at: MilliSecondsSinceUnixEpoch::now(), }; let res = canonicalize_dependent_requests(&[react, edit]); diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 98de76324bf..bf5a5f88a1b 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -208,7 +208,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: send_event_txn.clone().into(), media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }), - created_at: Some(created_at), + created_at, }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { From eb594cc79cbaef89a1a48c7ddc9bde84da96d8c8 Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Fri, 20 Dec 2024 09:26:31 -0600 Subject: [PATCH 3/4] Expose created_at from SendHandle directly instead of unpacking --- crates/matrix-sdk-sqlite/src/state_store.rs | 4 ++-- crates/matrix-sdk-ui/src/timeline/event_handler.rs | 1 - crates/matrix-sdk-ui/src/timeline/event_item/local.rs | 4 +--- crates/matrix-sdk-ui/src/timeline/event_item/mod.rs | 5 ++++- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index abf2fcb2300..8df4e75fcc8 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1863,7 +1863,7 @@ impl StateStore for SqliteStateStore { let mut requests = Vec::with_capacity(res.len()); for entry in res { let created_at = UInt::new(entry.4) - .map_or_else(|| MilliSecondsSinceUnixEpoch::now(), MilliSecondsSinceUnixEpoch); + .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch); requests.push(QueuedRequest { transaction_id: entry.0.into(), kind: self.deserialize_json(&entry.1)?, @@ -2061,7 +2061,7 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { let created_at = UInt::new(entry.4) - .map_or_else(|| MilliSecondsSinceUnixEpoch::now(), MilliSecondsSinceUnixEpoch); + .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch); dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 11a0ea32432..8cdcf1256ee 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1045,7 +1045,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { send_state: EventSendState::NotSentYet, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), - created_at: send_handle.clone().map(|h| h.created_at), } .into(), diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 3f5cd48c953..2890b6eab8e 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use as_variant::as_variant; use matrix_sdk::{send_queue::SendHandle, Error}; -use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId}; +use ruma::{EventId, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -30,8 +30,6 @@ pub(in crate::timeline) struct LocalEventTimelineItem { pub transaction_id: OwnedTransactionId, /// A handle to manipulate this event before it is sent, if possible. pub send_handle: Option, - /// The time that the event was created locally - pub created_at: Option, } impl LocalEventTimelineItem { diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 607f3efcfe0..47918eba1e1 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -270,7 +270,10 @@ impl EventTimelineItem { /// Get the time that the local event was pushed in the send queue at. pub fn local_created_at(&self) -> Option { - as_variant!(&self.kind, EventTimelineItemKind::Local(local) => local.created_at).flatten() + match &self.kind { + EventTimelineItemKind::Local(local) => local.send_handle.as_ref().map(|s| s.created_at), + EventTimelineItemKind::Remote(_) => None, + } } /// Get the unique identifier of this item. From 3da29a418f303458afb1bbbb9cdb160b32f4aa5b Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 13 Jan 2025 15:26:09 +0100 Subject: [PATCH 4/4] stylistic changes --- .../matrix-sdk-indexeddb/src/state_store/mod.rs | 12 ++++++++---- crates/matrix-sdk-sqlite/src/state_store.rs | 12 ++++++++---- crates/matrix-sdk/src/send_queue/mod.rs | 17 ++++------------- crates/matrix-sdk/src/send_queue/upload.rs | 1 + 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index c625c5b4999..ff8142104fb 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -443,8 +443,8 @@ struct PersistedQueuedRequest { priority: Option, /// The time the original message was first attempted to be sent at. - #[serde(skip_serializing_if = "Option::is_none")] - created_at: Option, + #[serde(default = "created_now")] + created_at: MilliSecondsSinceUnixEpoch, // Migrated fields: keep these private, they're not used anymore elsewhere in the code base. /// Deprecated (from old format), now replaced with error field. @@ -453,6 +453,10 @@ struct PersistedQueuedRequest { event: Option, } +fn created_now() -> MilliSecondsSinceUnixEpoch { + MilliSecondsSinceUnixEpoch::now() +} + impl PersistedQueuedRequest { fn into_queued_request(self) -> Option { let kind = @@ -476,7 +480,7 @@ impl PersistedQueuedRequest { transaction_id: self.transaction_id, error, priority, - created_at: self.created_at.unwrap_or(MilliSecondsSinceUnixEpoch::now()), + created_at: self.created_at, }) } } @@ -1412,7 +1416,7 @@ impl_state_store!({ is_wedged: None, event: None, priority: Some(priority), - created_at: Some(created_at), + created_at, }); // Save the new vector into db. diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 8df4e75fcc8..e47fca481e1 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1847,7 +1847,7 @@ impl StateStore for SqliteStateStore { // Note: ROWID is always present and is an auto-incremented integer counter. We // want to maintain the insertion order, so we can sort using it. // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, Vec, Option>, usize, u64)> = self + let res: Vec<(String, Vec, Option>, usize, Option)> = self .acquire() .await? .prepare( @@ -1862,7 +1862,9 @@ impl StateStore for SqliteStateStore { let mut requests = Vec::with_capacity(res.len()); for entry in res { - let created_at = UInt::new(entry.4) + let created_at = entry + .4 + .and_then(UInt::new) .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch); requests.push(QueuedRequest { transaction_id: entry.0.into(), @@ -2045,7 +2047,7 @@ impl StateStore for SqliteStateStore { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); // Note: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, String, Option>, Vec, u64)> = self + let res: Vec<(String, String, Option>, Vec, Option)> = self .acquire() .await? .prepare( @@ -2060,7 +2062,9 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { - let created_at = UInt::new(entry.4) + let created_at = entry + .4 + .and_then(UInt::new) .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch); dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index a526fa48efa..be839ec39a7 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -1419,7 +1419,6 @@ impl QueueStorage { client: &Client, dependent_request: DependentQueuedRequest, new_updates: &mut Vec, - created_at: MilliSecondsSinceUnixEpoch, ) -> Result { let store = client.store(); @@ -1486,7 +1485,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), - created_at, + dependent_request.created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1574,7 +1573,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), - created_at, + dependent_request.created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1677,15 +1676,7 @@ impl QueueStorage { for dependent in canonicalized_dependent_requests { let dependent_id = dependent.own_transaction_id.clone(); - match self - .try_apply_single_dependent_request( - &client, - dependent, - new_updates, - MilliSecondsSinceUnixEpoch::now(), - ) - .await - { + match self.try_apply_single_dependent_request(&client, dependent, new_updates).await { Ok(should_remove) => { if should_remove { // The dependent request has been successfully applied, forget about it. @@ -1914,7 +1905,7 @@ pub struct SendHandle { /// Additional handles for a media upload. media_handles: Option, - /// The time that this send handle was first created + /// The time at which the event to be sent has been created. pub created_at: MilliSecondsSinceUnixEpoch, } diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index bf5a5f88a1b..96a0df45858 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -186,6 +186,7 @@ impl RoomSendQueue { ); let created_at = MilliSecondsSinceUnixEpoch::now(); + // Save requests in the queue storage. self.inner .queue