Skip to content

Commit

Permalink
feat: typing indicator receiver events and observability (WPB-4591) (#…
Browse files Browse the repository at this point in the history
…2069)

* feat: add boilerplate for typing indicator handling

* feat: add boilerplate for typing indicator handling

* feat: add boilerplate for typing indicator handling

* feat: add boilerplate for typing indicator handling + storage

* feat: add boilerplate for worker

* feat: remove out of scope in this pr

* feat: cleanup

* feat: cleanup

* chore: pr comments

* feat: add coverage to repo

* feat: add coverage to event receiver

* feat: add coverage to typing handler

* fix: pr comments

* fix: pr comments thread safe ops

* fix: pr comments thread safe ops

* fix: pr comments map per user

* fix: pr comments map per user

* feat: pr comments, threadsafe mutating collection value

* feat: pr comments, threadsafe mutating collection value

* feat: pr comments, threadsafe mutating collection value
  • Loading branch information
yamilmedina authored Sep 25, 2023
1 parent 47c310d commit 048b06f
Show file tree
Hide file tree
Showing 16 changed files with 537 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ data class Conversation(
}

enum class ReceiptMode { DISABLED, ENABLED }
enum class TypingIndicatorMode { STARTED, STOPPED }

@Suppress("MagicNumber")
enum class CipherSuite(val tag: Int) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.data.conversation

import co.touchlab.stately.collections.ConcurrentMutableMap
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.util.safeComputeAndMutateSetValue
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

internal interface TypingIndicatorRepository {
fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId)
fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId)
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>>
}

internal class TypingIndicatorRepositoryImpl(
private val userTypingCache: ConcurrentMutableMap<ConversationId, MutableSet<ExpiringUserTyping>>
) : TypingIndicatorRepository {

private val userTypingDataSourceFlow: MutableSharedFlow<Unit> =
MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

override fun addTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache.safeComputeAndMutateSetValue(conversationId) { ExpiringUserTyping(userId, Clock.System.now()) }
userTypingDataSourceFlow.tryEmit(Unit)
}

override fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache.block { entry ->
entry[conversationId]?.apply { this.removeAll { it.userId == userId } }
}
userTypingDataSourceFlow.tryEmit(Unit)
}

override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>> {
return userTypingDataSourceFlow
.map { userTypingCache.filterUsersIdTypingInConversation(conversationId) }
.onStart { emit(userTypingCache.filterUsersIdTypingInConversation(conversationId)) }
.distinctUntilChanged()
}
}

private fun ConcurrentMutableMap<ConversationId, MutableSet<ExpiringUserTyping>>.filterUsersIdTypingInConversation(
conversationId: ConversationId
) = this[conversationId]?.map { it.userId }?.toSet().orEmpty()

// todo expire by worker
class ExpiringUserTyping(
val userId: UserId,
val date: Instant
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.data.client.Client
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.Conversation.Member
import com.wire.kalium.logic.data.conversation.Conversation.ReceiptMode
import com.wire.kalium.logic.data.conversation.Conversation.TypingIndicatorMode
import com.wire.kalium.logic.data.conversation.MutedConversationStatus
import com.wire.kalium.logic.data.featureConfig.ClassifiedDomainsModel
import com.wire.kalium.logic.data.featureConfig.ConferenceCallingModel
Expand Down Expand Up @@ -361,6 +363,23 @@ sealed class Event(open val id: String, open val transient: Boolean) {
) : Conversation(id, transient, conversationId) {
override fun toLogMap(): Map<String, Any?> = mapOf(typeKey to "Conversation.CodeDeleted")
}

data class TypingIndicator(
override val id: String,
override val conversationId: ConversationId,
override val transient: Boolean,
val senderUserId: UserId,
val timestampIso: String,
val typingIndicatorMode: TypingIndicatorMode,
) : Conversation(id, transient, conversationId) {
override fun toLogMap(): Map<String, Any?> = mapOf(
typeKey to "Conversation.TypingIndicator",
conversationIdKey to conversationId.toLogString(),
"typingIndicatorMode" to typingIndicatorMode.name,
senderUserIdKey to senderUserId.toLogString(),
timestampIsoKey to timestampIso
)
}
}

sealed class Team(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.wire.kalium.logic.data.id.SubconversationId
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.util.Base64
import com.wire.kalium.network.api.base.authenticated.conversation.TypingIndicatorStatus
import com.wire.kalium.network.api.base.authenticated.featureConfigs.FeatureConfigData
import com.wire.kalium.network.api.base.authenticated.notification.EventContentDTO
import com.wire.kalium.network.api.base.authenticated.notification.EventResponse
Expand Down Expand Up @@ -83,6 +84,7 @@ class EventMapper(
is EventContentDTO.Unknown -> unknown(id, transient, eventContentDTO)
is EventContentDTO.Conversation.AccessUpdate -> unknown(id, transient, eventContentDTO)
is EventContentDTO.Conversation.DeletedConversationDTO -> conversationDeleted(id, eventContentDTO, transient)

is EventContentDTO.Conversation.ConversationRenameDTO -> conversationRenamed(id, eventContentDTO, transient)
is EventContentDTO.Team.MemberJoin -> teamMemberJoined(id, eventContentDTO, transient)
is EventContentDTO.Team.MemberLeave -> teamMemberLeft(id, eventContentDTO, transient)
Expand All @@ -92,12 +94,32 @@ class EventMapper(
is EventContentDTO.UserProperty.PropertiesSetDTO -> updateUserProperties(id, eventContentDTO, transient)
is EventContentDTO.UserProperty.PropertiesDeleteDTO -> deleteUserProperties(id, eventContentDTO, transient)
is EventContentDTO.Conversation.ReceiptModeUpdate -> conversationReceiptModeUpdate(id, eventContentDTO, transient)

is EventContentDTO.Conversation.MessageTimerUpdate -> conversationMessageTimerUpdate(id, eventContentDTO, transient)

is EventContentDTO.Conversation.CodeDeleted -> conversationCodeDeleted(id, eventContentDTO, transient)
is EventContentDTO.Conversation.CodeUpdated -> conversationCodeUpdated(id, eventContentDTO, transient)
is EventContentDTO.Federation -> federationTerminated(id, eventContentDTO, transient)
is EventContentDTO.Conversation.ConversationTypingDTO -> conversationTyping(id, eventContentDTO, transient)
}

private fun conversationTyping(
id: String,
eventContentDTO: EventContentDTO.Conversation.ConversationTypingDTO,
transient: Boolean
): Event =
Event.Conversation.TypingIndicator(
id,
eventContentDTO.qualifiedConversation.toModel(),
transient,
eventContentDTO.qualifiedFrom.toModel(),
eventContentDTO.time,
when (eventContentDTO.status.status) {
TypingIndicatorStatus.STARTED -> Conversation.TypingIndicatorMode.STARTED
TypingIndicatorStatus.STOPPED -> Conversation.TypingIndicatorMode.STOPPED
}
)

private fun federationTerminated(id: String, eventContentDTO: EventContentDTO.Federation, transient: Boolean): Event =
when (eventContentDTO) {
is EventContentDTO.Federation.FederationConnectionRemovedDTO -> Event.Federation.ConnectionRemoved(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ import com.wire.kalium.logic.data.message.CompositeMessageRepository
import com.wire.kalium.logic.data.message.IsMessageSentInSelfConversationUseCase
import com.wire.kalium.logic.data.message.IsMessageSentInSelfConversationUseCaseImpl
import com.wire.kalium.logic.data.message.MessageDataSource
import com.wire.kalium.logic.data.message.MessageMetadataSource
import com.wire.kalium.logic.data.message.MessageMetadataRepository
import com.wire.kalium.logic.data.message.MessageMetadataSource
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.message.PersistMessageUseCaseImpl
Expand Down Expand Up @@ -160,8 +160,6 @@ import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManager
import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.GetConversationVerificationStatusUseCase
import com.wire.kalium.logic.feature.conversation.GetConversationVerificationStatusUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCase
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationsUseCase
Expand All @@ -172,6 +170,8 @@ import com.wire.kalium.logic.feature.conversation.LeaveSubconversationUseCase
import com.wire.kalium.logic.feature.conversation.LeaveSubconversationUseCaseImpl
import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManager
import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCase
Expand Down Expand Up @@ -261,7 +261,6 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.network.ApiMigrationManager
import com.wire.kalium.logic.network.ApiMigrationV3
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.logic.network.SessionManagerImpl
import com.wire.kalium.logic.sync.AvsSyncStateReporter
import com.wire.kalium.logic.sync.AvsSyncStateReporterImpl
Expand All @@ -281,8 +280,6 @@ import com.wire.kalium.logic.sync.incremental.IncrementalSyncManager
import com.wire.kalium.logic.sync.incremental.IncrementalSyncRecoveryHandlerImpl
import com.wire.kalium.logic.sync.incremental.IncrementalSyncWorker
import com.wire.kalium.logic.sync.incremental.IncrementalSyncWorkerImpl
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCase
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCaseImpl
import com.wire.kalium.logic.sync.receiver.ConversationEventReceiver
import com.wire.kalium.logic.sync.receiver.ConversationEventReceiverImpl
import com.wire.kalium.logic.sync.receiver.FeatureConfigEventReceiver
Expand Down Expand Up @@ -325,18 +322,22 @@ import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventH
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandlerImpl
import com.wire.kalium.logic.sync.receiver.conversation.message.ProteusMessageUnpacker
import com.wire.kalium.logic.sync.receiver.conversation.message.ProteusMessageUnpackerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.ButtonActionConfirmationHandler
import com.wire.kalium.logic.sync.receiver.handler.ButtonActionConfirmationHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.ClearConversationContentHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdateHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdatedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.MessageTextEditHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.DeleteForMeHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.DeleteMessageHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.LastReadContentHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.MessageTextEditHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.ReceiptMessageHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandler
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandlerImpl
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCase
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCaseImpl
import com.wire.kalium.logic.sync.slow.SlowSlowSyncCriteriaProviderImpl
import com.wire.kalium.logic.sync.slow.SlowSyncCriteriaProvider
import com.wire.kalium.logic.sync.slow.SlowSyncManager
Expand All @@ -345,6 +346,7 @@ import com.wire.kalium.logic.sync.slow.SlowSyncRecoveryHandlerImpl
import com.wire.kalium.logic.sync.slow.SlowSyncWorker
import com.wire.kalium.logic.sync.slow.SlowSyncWorkerImpl
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.network.networkContainer.AuthenticatedNetworkContainer
import com.wire.kalium.network.session.SessionManager
import com.wire.kalium.persistence.client.ClientRegistrationStorage
Expand Down Expand Up @@ -1154,6 +1156,9 @@ class UserSessionScope internal constructor(
conversationDAO = userStorage.database.conversationDAO
)

private val typingIndicatorHandler: TypingIndicatorHandler
get() = TypingIndicatorHandlerImpl(conversations.typingIndicatorRepository)

private val conversationEventReceiver: ConversationEventReceiver by lazy {
ConversationEventReceiverImpl(
newMessageHandler,
Expand All @@ -1167,7 +1172,8 @@ class UserSessionScope internal constructor(
receiptModeUpdateEventHandler,
conversationMessageTimerEventHandler,
conversationCodeUpdateHandler,
conversationCodeDeletedHandler
conversationCodeDeletedHandler,
typingIndicatorHandler
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.wire.kalium.logic.feature.conversation

import co.touchlab.stately.collections.ConcurrentMutableMap
import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.configuration.server.ServerConfigRepository
import com.wire.kalium.logic.data.connection.ConnectionRepository
Expand All @@ -26,6 +27,7 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl
import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider
import com.wire.kalium.logic.data.id.QualifiedIdMapper
import com.wire.kalium.logic.data.message.PersistMessageUseCase
Expand Down Expand Up @@ -258,4 +260,10 @@ class ConversationScope internal constructor(
serverConfigRepository,
selfUserId
)

internal val typingIndicatorRepository = TypingIndicatorRepositoryImpl(ConcurrentMutableMap())

val observeUsersTyping: ObserveUsersTypingUseCase
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.conversation

import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import kotlinx.coroutines.flow.Flow

/**
* Use case for observing current users typing in a given conversation.
*/
interface ObserveUsersTypingUseCase {
suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>>
}

internal class ObserveUsersTypingUseCaseImpl(
private val typingIndicatorRepository: TypingIndicatorRepository
) : ObserveUsersTypingUseCase {
override suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>> =
typingIndicatorRepository.observeUsersTyping(conversationId)

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.sync.receiver.conversation.RenamedConversationEvent
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdatedHandler
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandler

internal interface ConversationEventReceiver : EventReceiver<Event.Conversation>

Expand All @@ -51,7 +52,8 @@ internal class ConversationEventReceiverImpl(
private val receiptModeUpdateEventHandler: ReceiptModeUpdateEventHandler,
private val conversationMessageTimerEventHandler: ConversationMessageTimerEventHandler,
private val codeUpdatedHandler: CodeUpdatedHandler,
private val codeDeletedHandler: CodeDeletedHandler
private val codeDeletedHandler: CodeDeletedHandler,
private val typingIndicatorHandler: TypingIndicatorHandler
) : ConversationEventReceiver {
override suspend fun onEvent(event: Event.Conversation): Either<CoreFailure, Unit> {
// TODO: Make sure errors are accounted for by each handler.
Expand Down Expand Up @@ -111,6 +113,7 @@ internal class ConversationEventReceiverImpl(
is Event.Conversation.ConversationMessageTimer -> conversationMessageTimerEventHandler.handle(event)
is Event.Conversation.CodeDeleted -> codeDeletedHandler.handle(event)
is Event.Conversation.CodeUpdated -> codeUpdatedHandler.handle(event)
is Event.Conversation.TypingIndicator -> typingIndicatorHandler.handle(event)
}
}
}
Loading

0 comments on commit 048b06f

Please sign in to comment.