From 8676f0421b7f280227f27709c37bcecb6d5a7229 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 20 Mar 2024 21:45:28 -0700 Subject: [PATCH] add corks to coalesce writes for several heavy calltrees. Signed-off-by: Jason Volk --- src/api/client_server/sync.rs | 4 ++++ src/service/rooms/timeline/mod.rs | 3 +++ src/service/sending/mod.rs | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 22a4c49c8..e0aa6965c 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -197,6 +197,10 @@ async fn sync_helper( .extend(services().users.keys_changed(sender_user.as_ref(), since, None).filter_map(std::result::Result::ok)); let all_joined_rooms = services().rooms.state_cache.rooms_joined(&sender_user).collect::>(); + + // Coalesce database writes for the remainder of this scope. + let _cork = services().globals.db.cork_and_flush()?; + for room_id in all_joined_rooms { let room_id = room_id?; if let Ok(joined_room) = load_joined_room( diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 0d9ed3beb..b91843879 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -221,6 +221,9 @@ impl Service { leaves: Vec, state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { + // Coalesce database writes for the remainder of this scope. + let _cork = services().globals.db.cork_and_flush()?; + let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists"); // Make unsigned fields correct. This is not properly documented in the spec, diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 8c8d0243a..fc5ce5863 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -148,6 +148,7 @@ impl Service { Some(response) = futures.next() => { match response { Ok(outgoing_kind) => { + let _cork = services().globals.db.cork(); self.db.delete_all_active_requests_for(&outgoing_kind)?; // Find events that have been added since starting the last request @@ -202,6 +203,7 @@ impl Service { let mut retry = false; let mut allow = true; + let _cork = services().globals.db.cork(); let entry = current_transaction_status.entry(outgoing_kind.clone()); entry @@ -384,6 +386,7 @@ impl Service { pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let event = SendingEventType::Pdu(pdu_id.to_owned()); + let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); @@ -396,6 +399,7 @@ impl Service { .into_iter() .map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) .collect::>(); + let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&requests.iter().map(|(o, e)| (o, e.clone())).collect::>())?; for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) { self.sender.send((outgoing_kind.clone(), event, key)).unwrap(); @@ -408,6 +412,7 @@ impl Service { pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec, id: u64) -> Result<()> { let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let event = SendingEventType::Edu(serialized); + let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); @@ -418,6 +423,7 @@ impl Service { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { let outgoing_kind = OutgoingKind::Appservice(appservice_id); let event = SendingEventType::Pdu(pdu_id); + let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();