Skip to content

Commit

Permalink
perf: improve in-memory cache [WPB-8645] (#2891)
Browse files Browse the repository at this point in the history
* perf: improve in-memory cache

Replaced the LRUCache with a new FlowCache implementation to optimize flow handling in memory caching. Updated corresponding cache usages across DAOs and tests to ensure compatibility and improved efficiency.

* test: verify that cached flow is cancelled

* docs: improve FlowCache documentation

* style: remove unused import

* ci: enable kvm

* test: use same dispatcher for backup tests
  • Loading branch information
vitorhugods authored Jul 24, 2024
1 parent f534d36 commit 69d3227
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 239 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/gradle-android-instrumented-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ jobs:
with:
api-level: ${{ matrix.api-level }}

- name: Enable KVM group perms
run: |
echo 'KERNEL=="kvm", GROUP="kvm", MODE="0666", OPTIONS+="static_node=kvm"' | sudo tee /etc/udev/rules.d/99-kvm4all.rules
sudo udevadm control --reload-rules
sudo udevadm trigger --name-match=kvm
- name: Android Instrumentation Tests
uses: reactivecircus/android-emulator-runner@v2
env:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.persistence.cache

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* In-memory cache for sharing flows.
* This aims to bundle all interested collectors
* and read from a single upstream source, reducing IO reading.
*
* New collectors will get the latest value immediately.
* It converts produced flows into shared flows with a replay cache of 1.
*
* Each individual call to [get] will get its own buffer (size of 1),
* and oldest values are dropped if the collector is slow.
*
* Once the cached flows have no more collectors, the flows are removed from memory after [flowTimeoutDuration].
*
* Once the [cacheScope] is canceled, the whole cache stops.
*/
internal class FlowCache<Key : Any, Value>(
private val cacheScope: CoroutineScope,
private val flowTimeoutDuration: Duration = FLOW_OBSERVING_TIMEOUT_IN_MILLIS.milliseconds,
) {

private val mutex = Mutex()
private val storage = hashMapOf<Key, Flow<Value>>()

suspend fun get(
key: Key,
flowProducer: suspend (key: Key) -> Flow<Value>
): Flow<Value> {
suspend fun createFlow() = flowProducer(key)
.onCompletion {
remove(key)
}
.shareIn(
scope = cacheScope,
started = SharingStarted.WhileSubscribed(
stopTimeoutMillis = flowTimeoutDuration.inWholeMilliseconds
),
replay = 1
)

return mutex.withLock {
val result = storage.getOrPut(key) {
createFlow()
}
result
}.distinctUntilChanged()
.buffer(
capacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
}

suspend fun remove(key: Key) = mutex.withLock {
storage.remove(key)
}

companion object {
const val FLOW_OBSERVING_TIMEOUT_IN_MILLIS = 5_000L
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package com.wire.kalium.persistence.dao

import app.cash.sqldelight.coroutines.asFlow
import com.wire.kalium.persistence.MetadataQueries
import com.wire.kalium.persistence.cache.Cache
import com.wire.kalium.persistence.cache.FlowCache
import com.wire.kalium.persistence.util.JsonSerializer
import com.wire.kalium.persistence.util.mapToOneOrNull
import kotlinx.coroutines.CoroutineScope
Expand All @@ -35,7 +35,7 @@ import kotlin.coroutines.CoroutineContext

class MetadataDAOImpl internal constructor(
private val metadataQueries: MetadataQueries,
private val metadataCache: Cache<String, Flow<String?>>,
private val metadataCache: FlowCache<String, String?>,
private val databaseScope: CoroutineScope,
private val queriesContext: CoroutineContext
) : MetadataDAO {
Expand All @@ -48,12 +48,12 @@ class MetadataDAOImpl internal constructor(
metadataQueries.deleteValue(key)
}

override suspend fun valueByKeyFlow(key: String): Flow<String?> = metadataCache.get(key) {
override suspend fun valueByKeyFlow(
key: String
): Flow<String?> = metadataCache.get(key) {
metadataQueries.selectValueByKey(key)
.asFlow()
.mapToOneOrNull()
.distinctUntilChanged()
.shareIn(databaseScope, SharingStarted.Eagerly, 1)
}

override suspend fun valueByKey(key: String): String? = withContext(queriesContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ package com.wire.kalium.persistence.dao

import app.cash.sqldelight.coroutines.asFlow
import com.wire.kalium.persistence.UsersQueries
import com.wire.kalium.persistence.cache.Cache
import com.wire.kalium.persistence.cache.FlowCache
import com.wire.kalium.persistence.util.mapToList
import com.wire.kalium.persistence.util.mapToOneOrNull
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted.Companion.Lazily
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withContext
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
Expand Down Expand Up @@ -160,8 +157,7 @@ class UserMapper {
@Suppress("TooManyFunctions")
class UserDAOImpl internal constructor(
private val userQueries: UsersQueries,
private val userCache: Cache<UserIDEntity, Flow<UserDetailsEntity?>>,
private val databaseScope: CoroutineScope,
private val userCache: FlowCache<UserIDEntity, UserDetailsEntity?>,
private val queriesContext: CoroutineContext
) : UserDAO {

Expand Down Expand Up @@ -276,7 +272,6 @@ class UserDAOImpl internal constructor(
.asFlow()
.mapToOneOrNull()
.map { it?.let { mapper.toDetailsModel(it) } }
.shareIn(databaseScope, Lazily, 1)
}

override suspend fun getUserDetailsWithTeamByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<Pair<UserDetailsEntity, TeamEntity?>?> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import app.cash.sqldelight.coroutines.asFlow
import com.wire.kalium.persistence.ConversationsQueries
import com.wire.kalium.persistence.MembersQueries
import com.wire.kalium.persistence.UnreadEventsQueries
import com.wire.kalium.persistence.cache.Cache
import com.wire.kalium.persistence.cache.FlowCache
import com.wire.kalium.persistence.dao.ConversationIDEntity
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.UserIDEntity
Expand All @@ -32,12 +32,9 @@ import com.wire.kalium.persistence.util.mapToOneOrDefault
import com.wire.kalium.persistence.util.mapToOneOrNull
import com.wire.kalium.util.DateTimeUtil
import com.wire.kalium.util.DateTimeUtil.toIsoDateTimeString
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted.Companion.Lazily
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withContext
import kotlinx.datetime.Instant
import kotlinx.datetime.toInstant
Expand All @@ -53,12 +50,12 @@ internal val MLS_DEFAULT_CIPHER_SUITE = ConversationEntity.CipherSuite.MLS_128_D
// Even if they operate on the same table underneath, these DAOs can represent/do different things.
@Suppress("TooManyFunctions")
internal class ConversationDAOImpl internal constructor(
private val conversationCache: Cache<ConversationIDEntity, Flow<ConversationViewEntity?>>,
private val conversationDetailsCache: FlowCache<ConversationIDEntity, ConversationViewEntity?>,
private val conversationCache: FlowCache<ConversationIDEntity, ConversationEntity?>,
private val conversationQueries: ConversationsQueries,
private val memberQueries: MembersQueries,
private val unreadEventsQueries: UnreadEventsQueries,
private val coroutineContext: CoroutineContext,
private val databaseScope: CoroutineScope
) : ConversationDAO {

private val conversationMapper = ConversationMapper()
Expand Down Expand Up @@ -206,15 +203,18 @@ internal class ConversationDAOImpl internal constructor(
}

override suspend fun observeGetConversationByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<ConversationViewEntity?> {
return conversationQueries.selectByQualifiedId(qualifiedID)
.asFlow()
.mapToOneOrNull()
.flowOn(coroutineContext)
.map { it?.let { conversationMapper.toModel(it) } }
return conversationDetailsCache.get(qualifiedID) {
conversationQueries.selectByQualifiedId(qualifiedID)
.asFlow()
.mapToOneOrNull()
.map { it?.let { conversationMapper.toModel(it) } }
}
}

override suspend fun observeGetConversationBaseInfoByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<ConversationEntity?> {
return conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::toModel)
override suspend fun observeGetConversationBaseInfoByQualifiedID(
qualifiedID: QualifiedIDEntity
): Flow<ConversationEntity?> = conversationCache.get(qualifiedID) {
conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::toModel)
.asFlow()
.mapToOneOrNull()
.flowOn(coroutineContext)
Expand Down Expand Up @@ -492,13 +492,12 @@ internal class ConversationDAOImpl internal constructor(
}
}

override suspend fun observeConversationDetailsById(conversationId: QualifiedIDEntity): Flow<ConversationViewEntity?> =
conversationCache.get(conversationId) {
conversationQueries.selectByQualifiedId(conversationId)
.asFlow()
.mapToOneOrNull()
.flowOn(coroutineContext)
.map { it?.let { conversationMapper.toModel(it) } }
.shareIn(databaseScope, Lazily, 1)
}
override suspend fun observeConversationDetailsById(
conversationId: QualifiedIDEntity
): Flow<ConversationViewEntity?> = conversationDetailsCache.get(conversationId) {
conversationQueries.selectByQualifiedId(conversationId)
.asFlow()
.mapToOneOrNull()
.map { it?.let { conversationMapper.toModel(it) } }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import app.cash.sqldelight.coroutines.asFlow
import com.wire.kalium.persistence.ConversationsQueries
import com.wire.kalium.persistence.MembersQueries
import com.wire.kalium.persistence.UsersQueries
import com.wire.kalium.persistence.cache.FlowCache
import com.wire.kalium.persistence.dao.ConversationIDEntity
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.UserIDEntity
Expand Down Expand Up @@ -68,6 +69,7 @@ interface MemberDAO {

@Suppress("TooManyFunctions")
internal class MemberDAOImpl internal constructor(
private val membersCache: FlowCache<ConversationIDEntity, List<MemberEntity>>,
private val memberQueries: MembersQueries,
private val userQueries: UsersQueries,
private val conversationsQueries: ConversationsQueries,
Expand Down Expand Up @@ -140,8 +142,10 @@ internal class MemberDAOImpl internal constructor(
}
}

override suspend fun observeConversationMembers(qualifiedID: QualifiedIDEntity): Flow<List<MemberEntity>> {
return memberQueries.selectAllMembersByConversation(qualifiedID)
override suspend fun observeConversationMembers(
qualifiedID: QualifiedIDEntity
): Flow<List<MemberEntity>> = membersCache.get(qualifiedID) {
memberQueries.selectAllMembersByConversation(qualifiedID)
.asFlow()
.flowOn(coroutineContext)
.mapToList()
Expand Down
Loading

0 comments on commit 69d3227

Please sign in to comment.