Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send calling message in order [WPB-10051] #2920

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.feature.call.scenario.CallingMessageSender
import com.wire.kalium.logic.feature.call.scenario.OnActiveSpeakers
import com.wire.kalium.logic.feature.call.scenario.OnAnsweredCall
import com.wire.kalium.logic.feature.call.scenario.OnClientsRequest
Expand Down Expand Up @@ -95,9 +96,9 @@ class CallManagerImpl internal constructor(
private val callRepository: CallRepository,
private val userRepository: UserRepository,
private val currentClientIdProvider: CurrentClientIdProvider,
private val selfConversationIdProvider: SelfConversationIdProvider,
selfConversationIdProvider: SelfConversationIdProvider,
private val conversationRepository: ConversationRepository,
private val messageSender: MessageSender,
messageSender: MessageSender,
private val callMapper: CallMapper,
private val federatedIdMapper: FederatedIdMapper,
private val qualifiedIdMapper: QualifiedIdMapper,
Expand All @@ -114,6 +115,14 @@ class CallManagerImpl internal constructor(
private val scope = CoroutineScope(job + kaliumDispatchers.io)
private val deferredHandle: Deferred<Handle> = startHandleAsync()

private val callingMessageSender = CallingMessageSender(
deferredHandle,
calling,
messageSender,
scope,
selfConversationIdProvider
)

private val strongReferences = Collections.synchronizedList(mutableListOf<Any>())
private fun <T : Any> T.keepingStrongReference(): T {
strongReferences.add(this)
Expand Down Expand Up @@ -167,15 +176,11 @@ class CallManagerImpl internal constructor(
}.keepingStrongReference(),
// TODO(refactor): inject all of these CallbackHandlers in class constructor
sendHandler = OnSendOTR(
deferredHandle,
calling,
qualifiedIdMapper,
selfUserId,
selfClientId,
messageSender,
selfConversationIdProvider,
scope,
callMapper
qualifiedIdMapper = qualifiedIdMapper,
selfUserId = selfUserId,
selfClientId = selfClientId,
callMapper = callMapper,
callingMessageSender = callingMessageSender,
).keepingStrongReference(),
sftRequestHandler = OnSFTRequest(deferredHandle, calling, callRepository, scope)
.keepingStrongReference(),
Expand Down Expand Up @@ -442,6 +447,13 @@ class CallManagerImpl internal constructor(
initActiveSpeakersHandler()
initRequestNewEpochHandler()
initSelfUserMuteHandler()
initCallingMessageSender()
}

private fun initCallingMessageSender() {
scope.launch {
callingMessageSender.processQueue()
}
}

private fun initParticipantsHandler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.call.scenario

import com.sun.jna.Pointer
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageTarget
import com.wire.kalium.logic.data.user.UserId

/**
* Represents the instructions for sending a calling message.
*
* @property context The pointer context for the message (optional).
* @property callHostConversationId The ID of the conversation where the call is taking place.
* @property messageString The content of the message.
* @property avsSelfUserId The self user ID used by AVS.
* @property avsSelfClientId The self client ID used by AVS.
* @property messageTarget The target for sending the message.
*/
data class CallingMessageInstructions(
val context: Pointer?,
val callHostConversationId: ConversationId,
val messageString: String,
val avsSelfUserId: UserId,
val avsSelfClientId: ClientId,
val messageTarget: CallingMessageTarget
)

sealed interface CallingMessageTarget {
val specificTarget: MessageTarget

/**
* Send the message only to other devices of self-user.
*/
data object Self : CallingMessageTarget {
override val specificTarget: MessageTarget
get() = MessageTarget.Conversation()
}

/**
* Send the message to the host conversation.
* Supports ignoring users through the [specificTarget].
*/
data class HostConversation(
override val specificTarget: MessageTarget = MessageTarget.Conversation()
) : CallingMessageTarget
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.call.scenario

import com.benasher44.uuid.uuid4
import com.sun.jna.Pointer
import com.wire.kalium.calling.Calling
import com.wire.kalium.calling.types.Handle
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.callingLogger
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.MessageTarget
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.util.DateTimeUtil
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch

internal interface CallingMessageSender {

suspend fun processQueue()

@Suppress("LongParameterList")
fun enqueueSendingOfCallingMessage(
context: Pointer?,
callHostConversationId: ConversationId,
messageString: String?,
avsSelfUserId: UserId,
avsSelfClientId: ClientId,
messageTarget: CallingMessageTarget,
)
}

@Suppress("FunctionNaming")
internal fun CallingMessageSender(
handle: Deferred<Handle>,
calling: Calling,
messageSender: MessageSender,
callingScope: CoroutineScope,
selfConversationIdProvider: SelfConversationIdProvider
) = object : CallingMessageSender {

private val logger = callingLogger.withTextTag("CallingMessageSender")

private val queue = Channel<CallingMessageInstructions>(
capacity = Channel.UNLIMITED,
)

@Suppress("LongParameterList")
override fun enqueueSendingOfCallingMessage(
context: Pointer?,
callHostConversationId: ConversationId,
messageString: String?,
avsSelfUserId: UserId,
avsSelfClientId: ClientId,
messageTarget: CallingMessageTarget,
) {
if (messageString == null) return
callingScope.launch {
queue.send(
CallingMessageInstructions(
context,
callHostConversationId,
messageString,
avsSelfUserId,
avsSelfClientId,
messageTarget,
)
)
}
}

override suspend fun processQueue() {
queue.consumeAsFlow().collect { messageInstructions ->
processInstruction(messageInstructions, selfConversationIdProvider)
}
}

private suspend fun processInstruction(
messageInstructions: CallingMessageInstructions,
selfConversationIdProvider: SelfConversationIdProvider
) {
val target = messageInstructions.messageTarget

val transportConversationIds = when (target) {
is CallingMessageTarget.Self -> {
selfConversationIdProvider()
}

is CallingMessageTarget.HostConversation -> {
Either.Right(listOf(messageInstructions.callHostConversationId))
}
}

val result = transportConversationIds.flatMap { conversations ->
conversations.foldToEitherWhileRight(Unit) { transportConversationId, _ ->
sendCallingMessage(
messageInstructions.callHostConversationId,
messageInstructions.avsSelfUserId,
messageInstructions.avsSelfClientId,
messageInstructions.messageString,
target.specificTarget,
transportConversationId
)
}
}

val (code, message) = when (result) {
is Either.Right -> {
logger.i("Notifying AVS - Success sending message")
HttpStatusCode.OK.value to ""
}

is Either.Left -> {
logger.i("Notifying AVS - Error sending message")
HttpStatusCode.BadRequest.value to "Couldn't send Calling Message"
}
}
calling.wcall_resp(
inst = handle.await(),
status = code,
reason = message,
arg = messageInstructions.context
)
}

@Suppress("LongParameterList")
private suspend fun sendCallingMessage(
callHostConversationId: ConversationId,
userId: UserId,
clientId: ClientId,
data: String,
messageTarget: MessageTarget,
transportConversationId: ConversationId
): Either<CoreFailure, Unit> {
val messageContent = MessageContent.Calling(data, callHostConversationId)
val message = Message.Signaling(
id = uuid4().toString(),
content = messageContent,
conversationId = transportConversationId,
date = DateTimeUtil.currentIsoDateTimeString(),
senderUserId = userId,
senderClientId = clientId,
status = Message.Status.Sent,
isSelfMessage = true,
expirationData = null
)
return messageSender.sendMessage(message, messageTarget)
}
}
Loading
Loading