Skip to content

Commit

Permalink
refactor: fetch pending receipt ids within a time window (#2864)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorhugods authored Jul 5, 2024
1 parent c01866c commit e497778
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ internal interface MessageRepository {
suspend fun getAllPendingMessagesFromUser(senderUserId: UserId): Either<CoreFailure, List<Message>>
suspend fun getPendingConfirmationMessagesByConversationAfterDate(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant,
visibility: List<Message.Visibility> = Message.Visibility.entries
): Either<CoreFailure, List<String>>

Expand Down Expand Up @@ -533,10 +535,14 @@ internal class MessageDataSource internal constructor(

override suspend fun getPendingConfirmationMessagesByConversationAfterDate(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant,
visibility: List<Message.Visibility>
): Either<CoreFailure, List<String>> = wrapStorageRequest {
messageDAO.getPendingToConfirmMessagesByConversationAndVisibilityAfterDate(
messageDAO.getMessageIdsThatExpectReadConfirmationWithinDates(
conversationId.toDao(),
afterDateTime,
untilDateTime,
visibility.map { it.toEntityVisibility() }
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ class ConversationScope internal constructor(
selfUserId,
selfConversationIdProvider,
sendConfirmation,
scope
scope,
kaliumLogger
)

val updateConversationAccess: UpdateConversationAccessRoleUseCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@
package com.wire.kalium.logic.feature.conversation

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.feature.message.SendConfirmationUseCase
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
Expand All @@ -52,7 +58,8 @@ class UpdateConversationReadDateUseCase internal constructor(
private val selfUserId: UserId,
private val selfConversationIdProvider: SelfConversationIdProvider,
private val sendConfirmation: SendConfirmationUseCase,
private val scope: CoroutineScope
private val scope: CoroutineScope,
private val logger: KaliumLogger = kaliumLogger
) {

/**
Expand All @@ -61,11 +68,23 @@ class UpdateConversationReadDateUseCase internal constructor(
*/
operator fun invoke(conversationId: QualifiedID, time: Instant) {
scope.launch {
sendConfirmation(conversationId)
conversationRepository.updateConversationReadDate(conversationId, time)
selfConversationIdProvider().flatMap { selfConversationIds ->
selfConversationIds.foldToEitherWhileRight(Unit) { selfConversationId, _ ->
sendLastReadMessageToOtherClients(conversationId, selfConversationId, time)
conversationRepository.observeCacheDetailsById(conversationId).flatMap {
it.first()?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}.onFailure {
logger.w("Failed to update conversation read date; StorageFailure $it")
}.onSuccess { conversation ->
launch {
sendConfirmation(conversationId, conversation.lastReadDate, time)
}
launch {
conversationRepository.updateConversationReadDate(conversationId, time)
}
launch {
selfConversationIdProvider().flatMap { selfConversationIds ->
selfConversationIds.foldToEitherWhileRight(Unit) { selfConversationId, _ ->
sendLastReadMessageToOtherClients(conversationId, selfConversationId, time)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.receipt.ReceiptType
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
Expand All @@ -41,6 +41,7 @@ import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.util.serialization.toJsonElement
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

Expand Down Expand Up @@ -69,12 +70,19 @@ internal class SendConfirmationUseCase internal constructor(

private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.MESSAGES) }

suspend operator fun invoke(conversationId: ConversationId): Either<CoreFailure, Unit> {
suspend operator fun invoke(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant
): Either<CoreFailure, Unit> {
syncManager.waitUntilLive()

val messageIds = getPendingUnreadMessagesIds(conversationId)
val messageIds = getPendingUnreadMessagesIds(
conversationId,
afterDateTime,
untilDateTime
)
if (messageIds.isEmpty()) {

logConfirmationNothingToSend(conversationId)
return Either.Right(Unit)
}
Expand All @@ -100,13 +108,20 @@ internal class SendConfirmationUseCase internal constructor(
}
}

private suspend fun getPendingUnreadMessagesIds(conversationId: ConversationId): List<String> =
private suspend fun getPendingUnreadMessagesIds(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant
): List<String> =
if (isReceiptsEnabledForConversation(conversationId)) {
messageRepository.getPendingConfirmationMessagesByConversationAfterDate(conversationId)
.fold({
logger.e("$TAG There was an unknown error trying to get messages pending read confirmation $it")
emptyList()
}, { it })
messageRepository.getPendingConfirmationMessagesByConversationAfterDate(
conversationId,
afterDateTime,
untilDateTime,
).fold({
logger.e("$TAG There was an unknown error trying to get messages pending read confirmation $it")
emptyList()
}, { it })
} else emptyList()

private suspend fun isReceiptsEnabledForConversation(conversationId: ConversationId) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,12 @@ class MessageRepositoryTest {
messageDAO.getMessagesByConversationAndVisibility(any(), any(), any(), any())
}.returns(flowOf(messages))
coEvery {
messageDAO.getPendingToConfirmMessagesByConversationAndVisibilityAfterDate(any(), any())
messageDAO.getMessageIdsThatExpectReadConfirmationWithinDates(
any(),
any(),
any(),
any()
)
}.returns(messages.map { it.id })
return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package com.wire.kalium.logic.feature.message

import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.framework.TestClient
import com.wire.kalium.logic.framework.TestConversation
import com.wire.kalium.logic.framework.TestMessage
Expand All @@ -31,20 +31,16 @@ import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.util.shouldSucceed
import io.mockative.Mock
import io.mockative.any

import io.mockative.coEvery
import io.mockative.every
import io.mockative.coVerify
import io.mockative.coEvery
import io.mockative.coEvery
import io.mockative.eq
import io.mockative.mock
import io.mockative.once
import io.mockative.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalCoroutinesApi::class)
class SendConfirmationUseCaseTest {

@Test
Expand All @@ -57,7 +53,10 @@ class SendConfirmationUseCaseTest {
.withSendMessageSuccess()
.arrange()

val result = sendConfirmation(TestConversation.ID)
val after = Instant.DISTANT_PAST
val until = after + 10.seconds

val result = sendConfirmation(TestConversation.ID, after, until)

result.shouldSucceed()
coVerify {
Expand All @@ -75,15 +74,23 @@ class SendConfirmationUseCaseTest {
.withSendMessageSuccess()
.arrange()

val result = sendConfirmation(TestConversation.ID)
val after = Instant.DISTANT_PAST
val until = after + 10.seconds

val result = sendConfirmation(TestConversation.ID, after, until)

result.shouldSucceed()
coVerify {
arrangement.messageSender.sendMessage(any(), any())
}.wasNotInvoked()

coVerify {
arrangement.messageRepository.getPendingConfirmationMessagesByConversationAfterDate(any(), any())
arrangement.messageRepository.getPendingConfirmationMessagesByConversationAfterDate(
any(),
eq(after),
eq(until),
any()
)
}.wasNotInvoked()
}

Expand Down Expand Up @@ -133,7 +140,7 @@ class SendConfirmationUseCaseTest {

suspend fun withPendingMessagesResponse() = apply {
coEvery {
messageRepository.getPendingConfirmationMessagesByConversationAfterDate(any(), any())
messageRepository.getPendingConfirmationMessagesByConversationAfterDate(any(), any(), any(), any())
}.returns(Either.Right(listOf(TestMessage.TEXT_MESSAGE.id)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ SELECT * FROM MessageDetailsView WHERE conversationId = ? AND senderUserId = ? A
selectByConversationIdAndSenderIdAndType:
SELECT * FROM MessageDetailsView WHERE conversationId = ? AND senderUserId = ? AND contentType = ?;

selectPendingMessagesIdsByConversationIdAndVisibilityAfterDate:
selectMessageIdsThatExpectReadConfirmationWithinDates:
SELECT Message.id
FROM Message
LEFT JOIN SelfUser
Expand All @@ -527,7 +527,8 @@ WHERE
Message.conversation_id = ?
AND Message.visibility IN ?
AND (Message.expects_read_confirmation = 1)
AND Message.creation_date > Conversation.last_read_date
AND Message.creation_date > ?
AND Message.creation_date <= ?
AND (Message.sender_user_id != SelfUser.id)
ORDER BY
Message.creation_date DESC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ interface MessageDAO {
clientId: String,
)

suspend fun getPendingToConfirmMessagesByConversationAndVisibilityAfterDate(
suspend fun getMessageIdsThatExpectReadConfirmationWithinDates(
conversationId: QualifiedIDEntity,
afterDate: Instant,
untilDate: Instant,
visibility: List<MessageEntity.Visibility> = MessageEntity.Visibility.entries
): List<String>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,17 @@ internal class MessageDAOImpl internal constructor(
queries.markMessagesAsDecryptionResolved(userId, clientId)
}

override suspend fun getPendingToConfirmMessagesByConversationAndVisibilityAfterDate(
override suspend fun getMessageIdsThatExpectReadConfirmationWithinDates(
conversationId: QualifiedIDEntity,
afterDate: Instant,
untilDate: Instant,
visibility: List<MessageEntity.Visibility>
): List<String> = withContext(coroutineContext) {
queries.selectPendingMessagesIdsByConversationIdAndVisibilityAfterDate(
conversationId, visibility
queries.selectMessageIdsThatExpectReadConfirmationWithinDates(
conversation_id = conversationId,
visibility = visibility,
creation_date = afterDate,
creation_date_ = untilDate
).executeAsList()
}

Expand Down
Loading

0 comments on commit e497778

Please sign in to comment.