Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: typing indicator receiver events and observability (WPB-4591) #2069

Merged
merged 26 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6399438
feat: add boilerplate for typing indicator handling
yamilmedina Sep 15, 2023
72e324d
feat: add boilerplate for typing indicator handling
yamilmedina Sep 15, 2023
b1a7cdb
feat: add boilerplate for typing indicator handling
yamilmedina Sep 15, 2023
de5bac1
feat: add boilerplate for typing indicator handling + storage
yamilmedina Sep 15, 2023
46344d2
feat: add boilerplate for worker
yamilmedina Sep 18, 2023
46982e5
feat: remove out of scope in this pr
yamilmedina Sep 18, 2023
060a8d3
feat: cleanup
yamilmedina Sep 18, 2023
666cecf
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 18, 2023
2df92e9
feat: cleanup
yamilmedina Sep 18, 2023
0bfe0d7
chore: pr comments
yamilmedina Sep 18, 2023
e2e5b95
feat: add coverage to repo
yamilmedina Sep 18, 2023
1f45f08
feat: add coverage to event receiver
yamilmedina Sep 18, 2023
37a2268
feat: add coverage to typing handler
yamilmedina Sep 18, 2023
091d1b6
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 18, 2023
aad9216
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 21, 2023
82851f5
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 21, 2023
7a567fd
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 21, 2023
8aaa732
Merge branch 'develop' into epic/typing-indicator-receiver
yamilmedina Sep 22, 2023
b7e8f96
fix: pr comments
yamilmedina Sep 21, 2023
2057634
fix: pr comments thread safe ops
yamilmedina Sep 22, 2023
21ed23b
fix: pr comments thread safe ops
yamilmedina Sep 22, 2023
e24e1d6
fix: pr comments map per user
yamilmedina Sep 22, 2023
289cf07
fix: pr comments map per user
yamilmedina Sep 22, 2023
ebb1cd5
feat: pr comments, threadsafe mutating collection value
yamilmedina Sep 24, 2023
44b9a91
feat: pr comments, threadsafe mutating collection value
yamilmedina Sep 24, 2023
fdb6f1d
feat: pr comments, threadsafe mutating collection value
yamilmedina Sep 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ data class Conversation(
}

enum class ReceiptMode { DISABLED, ENABLED }
enum class TypingIndicatorMode { STARTED, STOPPED }

@Suppress("MagicNumber")
enum class CipherSuite(val tag: Int) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.data.conversation

import co.touchlab.stately.collections.ConcurrentMutableMap
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

interface TypingIndicatorRepository {
yamilmedina marked this conversation as resolved.
Show resolved Hide resolved
fun addTypingUserTypingInConversation(conversationId: ConversationId, userId: UserId)
yamilmedina marked this conversation as resolved.
Show resolved Hide resolved
fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId)
suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>>
}

class TypingIndicatorRepositoryImpl : TypingIndicatorRepository {

private val userTypingDataSourceFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val userTypingCache = ConcurrentMutableMap<ConversationId, Set<ExpiringUserTyping>>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes it so this Repository needs to be "static" a.k.a. we need to create one for each user, can't just have getters like most repositories.

It would be nice to make this more explicit by passing the MutableMap<ConversationId, Set<ExpiringUserTyping>> as a parameter in the constructor.

Copy link
Contributor Author

@yamilmedina yamilmedina Sep 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done !


override fun addTypingUserTypingInConversation(conversationId: ConversationId, userId: UserId) {
val newTypingUser = ExpiringUserTyping(userId, Clock.System.now())
val newTypingUsers = userTypingCache[conversationId]?.toMutableSet() ?: mutableSetOf()
newTypingUsers.add(newTypingUser)
userTypingCache[conversationId] = newTypingUsers
vitorhugods marked this conversation as resolved.
Show resolved Hide resolved
userTypingDataSourceFlow.tryEmit(Unit)
}

override fun removeTypingUserInConversation(conversationId: ConversationId, userId: UserId) {
userTypingCache[conversationId] =
userTypingCache[conversationId]?.toMutableSet()?.apply {
this.removeAll { it.userId == userId }
} ?: mutableSetOf()
userTypingDataSourceFlow.tryEmit(Unit)
}

override suspend fun observeUsersTyping(conversationId: ConversationId): Flow<Set<UserId>> {
return userTypingDataSourceFlow
.map { userTypingCache.filterUsersIdTypingInConversation(conversationId) }
.onStart { emit(userTypingCache.filterUsersIdTypingInConversation(conversationId)) }
.distinctUntilChanged()
}
}

private fun ConcurrentMutableMap<ConversationId, Set<ExpiringUserTyping>>.filterUsersIdTypingInConversation(
conversationId: ConversationId
) = this[conversationId]?.map { it.userId }?.toSet().orEmpty()

// todo expire by worker
class ExpiringUserTyping(
val userId: UserId,
val date: Instant
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.data.client.Client
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.Conversation.Member
import com.wire.kalium.logic.data.conversation.Conversation.ReceiptMode
import com.wire.kalium.logic.data.conversation.Conversation.TypingIndicatorMode
import com.wire.kalium.logic.data.conversation.MutedConversationStatus
import com.wire.kalium.logic.data.featureConfig.ClassifiedDomainsModel
import com.wire.kalium.logic.data.featureConfig.ConferenceCallingModel
Expand Down Expand Up @@ -342,6 +344,23 @@ sealed class Event(open val id: String, open val transient: Boolean) {
) : Conversation(id, transient, conversationId) {
override fun toLogMap(): Map<String, Any?> = mapOf(typeKey to "Conversation.CodeDeleted")
}

data class TypingIndicator(
override val id: String,
override val conversationId: ConversationId,
override val transient: Boolean,
val senderUserId: UserId,
val timestampIso: String,
vitorhugods marked this conversation as resolved.
Show resolved Hide resolved
val typingIndicatorMode: TypingIndicatorMode,
) : Conversation(id, transient, conversationId) {
override fun toLogMap(): Map<String, Any?> = mapOf(
typeKey to "Conversation.TypingIndicator",
conversationIdKey to conversationId.toLogString(),
"typingIndicatorMode" to typingIndicatorMode.name,
senderUserIdKey to senderUserId.toLogString(),
timestampIsoKey to timestampIso
)
}
}

sealed class Team(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.data.id.SubconversationId
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.util.Base64
import com.wire.kalium.network.api.base.authenticated.conversation.TypingIndicatorStatus
import com.wire.kalium.network.api.base.authenticated.featureConfigs.FeatureConfigData
import com.wire.kalium.network.api.base.authenticated.notification.EventContentDTO
import com.wire.kalium.network.api.base.authenticated.notification.EventResponse
Expand Down Expand Up @@ -81,6 +82,7 @@ class EventMapper(
is EventContentDTO.Unknown -> unknown(id, transient, eventContentDTO)
is EventContentDTO.Conversation.AccessUpdate -> unknown(id, transient, eventContentDTO)
is EventContentDTO.Conversation.DeletedConversationDTO -> conversationDeleted(id, eventContentDTO, transient)

is EventContentDTO.Conversation.ConversationRenameDTO -> conversationRenamed(id, eventContentDTO, transient)
is EventContentDTO.Team.MemberJoin -> teamMemberJoined(id, eventContentDTO, transient)
is EventContentDTO.Team.MemberLeave -> teamMemberLeft(id, eventContentDTO, transient)
Expand All @@ -90,12 +92,32 @@ class EventMapper(
is EventContentDTO.UserProperty.PropertiesSetDTO -> updateUserProperties(id, eventContentDTO, transient)
is EventContentDTO.UserProperty.PropertiesDeleteDTO -> deleteUserProperties(id, eventContentDTO, transient)
is EventContentDTO.Conversation.ReceiptModeUpdate -> conversationReceiptModeUpdate(id, eventContentDTO, transient)

is EventContentDTO.Conversation.MessageTimerUpdate -> conversationMessageTimerUpdate(id, eventContentDTO, transient)

is EventContentDTO.Conversation.CodeDeleted -> conversationCodeDeleted(id, eventContentDTO, transient)
is EventContentDTO.Conversation.CodeUpdated -> conversationCodeUpdated(id, eventContentDTO, transient)
is EventContentDTO.Federation -> federationTerminated(id, eventContentDTO, transient)
is EventContentDTO.Conversation.ConversationTypingDTO -> conversationTyping(id, eventContentDTO, transient)
}

private fun conversationTyping(
id: String,
eventContentDTO: EventContentDTO.Conversation.ConversationTypingDTO,
transient: Boolean
): Event =
Event.Conversation.TypingIndicator(
id,
eventContentDTO.qualifiedConversation.toModel(),
transient,
eventContentDTO.qualifiedFrom.toModel(),
eventContentDTO.time,
when (eventContentDTO.status.status) {
TypingIndicatorStatus.STARTED -> Conversation.TypingIndicatorMode.STARTED
TypingIndicatorStatus.STOPPED -> Conversation.TypingIndicatorMode.STOPPED
}
)

private fun federationTerminated(id: String, eventContentDTO: EventContentDTO.Federation, transient: Boolean): Event =
when (eventContentDTO) {
is EventContentDTO.Federation.FederationConnectionRemovedDTO -> Event.Federation.ConnectionRemoved(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ import com.wire.kalium.logic.data.message.CompositeMessageRepository
import com.wire.kalium.logic.data.message.IsMessageSentInSelfConversationUseCase
import com.wire.kalium.logic.data.message.IsMessageSentInSelfConversationUseCaseImpl
import com.wire.kalium.logic.data.message.MessageDataSource
import com.wire.kalium.logic.data.message.MessageMetadataSource
import com.wire.kalium.logic.data.message.MessageMetadataRepository
import com.wire.kalium.logic.data.message.MessageMetadataSource
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.message.PersistMessageUseCaseImpl
Expand Down Expand Up @@ -160,8 +160,6 @@ import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManager
import com.wire.kalium.logic.feature.conversation.ConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.GetConversationVerificationStatusUseCase
import com.wire.kalium.logic.feature.conversation.GetConversationVerificationStatusUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCase
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationUseCaseImpl
import com.wire.kalium.logic.feature.conversation.JoinExistingMLSConversationsUseCase
Expand All @@ -172,6 +170,8 @@ import com.wire.kalium.logic.feature.conversation.LeaveSubconversationUseCase
import com.wire.kalium.logic.feature.conversation.LeaveSubconversationUseCaseImpl
import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManager
import com.wire.kalium.logic.feature.conversation.MLSConversationsRecoveryManagerImpl
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveOtherUserSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCase
import com.wire.kalium.logic.feature.conversation.ObserveSecurityClassificationLabelUseCaseImpl
import com.wire.kalium.logic.feature.conversation.RecoverMLSConversationsUseCase
Expand Down Expand Up @@ -261,7 +261,6 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.network.ApiMigrationManager
import com.wire.kalium.logic.network.ApiMigrationV3
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.logic.network.SessionManagerImpl
import com.wire.kalium.logic.sync.AvsSyncStateReporter
import com.wire.kalium.logic.sync.AvsSyncStateReporterImpl
Expand All @@ -281,8 +280,6 @@ import com.wire.kalium.logic.sync.incremental.IncrementalSyncManager
import com.wire.kalium.logic.sync.incremental.IncrementalSyncRecoveryHandlerImpl
import com.wire.kalium.logic.sync.incremental.IncrementalSyncWorker
import com.wire.kalium.logic.sync.incremental.IncrementalSyncWorkerImpl
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCase
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCaseImpl
import com.wire.kalium.logic.sync.receiver.ConversationEventReceiver
import com.wire.kalium.logic.sync.receiver.ConversationEventReceiverImpl
import com.wire.kalium.logic.sync.receiver.FeatureConfigEventReceiver
Expand Down Expand Up @@ -325,18 +322,22 @@ import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventH
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandlerImpl
import com.wire.kalium.logic.sync.receiver.conversation.message.ProteusMessageUnpacker
import com.wire.kalium.logic.sync.receiver.conversation.message.ProteusMessageUnpackerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.ButtonActionConfirmationHandler
import com.wire.kalium.logic.sync.receiver.handler.ButtonActionConfirmationHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.ClearConversationContentHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdateHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdatedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.MessageTextEditHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.DeleteForMeHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.DeleteMessageHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.LastReadContentHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.MessageTextEditHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.ReceiptMessageHandlerImpl
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandler
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandlerImpl
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCase
import com.wire.kalium.logic.sync.slow.RestartSlowSyncProcessForRecoveryUseCaseImpl
import com.wire.kalium.logic.sync.slow.SlowSlowSyncCriteriaProviderImpl
import com.wire.kalium.logic.sync.slow.SlowSyncCriteriaProvider
import com.wire.kalium.logic.sync.slow.SlowSyncManager
Expand All @@ -345,6 +346,7 @@ import com.wire.kalium.logic.sync.slow.SlowSyncRecoveryHandlerImpl
import com.wire.kalium.logic.sync.slow.SlowSyncWorker
import com.wire.kalium.logic.sync.slow.SlowSyncWorkerImpl
import com.wire.kalium.logic.util.MessageContentEncoder
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.network.networkContainer.AuthenticatedNetworkContainer
import com.wire.kalium.network.session.SessionManager
import com.wire.kalium.persistence.client.ClientRegistrationStorage
Expand Down Expand Up @@ -1154,6 +1156,9 @@ class UserSessionScope internal constructor(
conversationDAO = userStorage.database.conversationDAO
)

private val typingIndicatorHandler: TypingIndicatorHandler
get() = TypingIndicatorHandlerImpl(conversations.typingIndicatorRepository)

private val conversationEventReceiver: ConversationEventReceiver by lazy {
ConversationEventReceiverImpl(
newMessageHandler,
Expand All @@ -1167,7 +1172,8 @@ class UserSessionScope internal constructor(
receiptModeUpdateEventHandler,
conversationMessageTimerEventHandler,
conversationCodeUpdateHandler,
conversationCodeDeletedHandler
conversationCodeDeletedHandler,
typingIndicatorHandler
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreatorImpl
import com.wire.kalium.logic.data.conversation.TypingIndicatorRepositoryImpl
import com.wire.kalium.logic.data.conversation.UpdateKeyingMaterialThresholdProvider
import com.wire.kalium.logic.data.id.QualifiedIdMapper
import com.wire.kalium.logic.data.message.PersistMessageUseCase
Expand Down Expand Up @@ -255,4 +256,10 @@ class ConversationScope internal constructor(
serverConfigRepository,
selfUserId
)

val typingIndicatorRepository = TypingIndicatorRepositoryImpl()

val observeUsersTyping: ObserveUsersTypingUseCase
get() = ObserveUsersTypingUseCaseImpl(typingIndicatorRepository)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Wire
* Copyright (C) 2023 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.conversation

import com.wire.kalium.logic.data.conversation.TypingIndicatorRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.UserId
import kotlinx.coroutines.flow.Flow

/**
* Use case for observing current users typing in a given conversation.
*/
interface ObserveUsersTypingUseCase {
suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>>
}

internal class ObserveUsersTypingUseCaseImpl(
private val typingIndicatorRepository: TypingIndicatorRepository
) : ObserveUsersTypingUseCase {
override suspend fun invoke(conversationId: ConversationId): Flow<Set<UserId>> =
typingIndicatorRepository.observeUsersTyping(conversationId)

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.sync.receiver.conversation.RenamedConversationEvent
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeDeletedHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdatedHandler
import com.wire.kalium.logic.sync.receiver.handler.TypingIndicatorHandler

internal interface ConversationEventReceiver : EventReceiver<Event.Conversation>

Expand All @@ -51,7 +52,8 @@ internal class ConversationEventReceiverImpl(
private val receiptModeUpdateEventHandler: ReceiptModeUpdateEventHandler,
private val conversationMessageTimerEventHandler: ConversationMessageTimerEventHandler,
private val codeUpdatedHandler: CodeUpdatedHandler,
private val codeDeletedHandler: CodeDeletedHandler
private val codeDeletedHandler: CodeDeletedHandler,
private val typingIndicatorHandler: TypingIndicatorHandler
) : ConversationEventReceiver {
override suspend fun onEvent(event: Event.Conversation): Either<CoreFailure, Unit> {
// TODO: Make sure errors are accounted for by each handler.
Expand Down Expand Up @@ -111,6 +113,7 @@ internal class ConversationEventReceiverImpl(
is Event.Conversation.ConversationMessageTimer -> conversationMessageTimerEventHandler.handle(event)
is Event.Conversation.CodeDeleted -> codeDeletedHandler.handle(event)
is Event.Conversation.CodeUpdated -> codeUpdatedHandler.handle(event)
is Event.Conversation.TypingIndicator -> typingIndicatorHandler.handle(event)
}
}
}
Loading
Loading