diff --git a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/CallManagerImpl.kt b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/CallManagerImpl.kt index ada0a5a6b68..11aff9f124f 100644 --- a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/CallManagerImpl.kt +++ b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/CallManagerImpl.kt @@ -51,6 +51,7 @@ import com.wire.kalium.logic.data.message.Message import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.data.user.UserRepository +import com.wire.kalium.logic.feature.call.scenario.CallingMessageSender import com.wire.kalium.logic.feature.call.scenario.OnActiveSpeakers import com.wire.kalium.logic.feature.call.scenario.OnAnsweredCall import com.wire.kalium.logic.feature.call.scenario.OnClientsRequest @@ -95,9 +96,9 @@ class CallManagerImpl internal constructor( private val callRepository: CallRepository, private val userRepository: UserRepository, private val currentClientIdProvider: CurrentClientIdProvider, - private val selfConversationIdProvider: SelfConversationIdProvider, + selfConversationIdProvider: SelfConversationIdProvider, private val conversationRepository: ConversationRepository, - private val messageSender: MessageSender, + messageSender: MessageSender, private val callMapper: CallMapper, private val federatedIdMapper: FederatedIdMapper, private val qualifiedIdMapper: QualifiedIdMapper, @@ -114,6 +115,14 @@ class CallManagerImpl internal constructor( private val scope = CoroutineScope(job + kaliumDispatchers.io) private val deferredHandle: Deferred = startHandleAsync() + private val callingMessageSender = CallingMessageSender( + deferredHandle, + calling, + messageSender, + scope, + selfConversationIdProvider + ) + private val strongReferences = Collections.synchronizedList(mutableListOf()) private fun T.keepingStrongReference(): T { strongReferences.add(this) @@ -167,15 +176,11 @@ class CallManagerImpl internal constructor( }.keepingStrongReference(), // TODO(refactor): inject all of these CallbackHandlers in class constructor sendHandler = OnSendOTR( - deferredHandle, - calling, - qualifiedIdMapper, - selfUserId, - selfClientId, - messageSender, - selfConversationIdProvider, - scope, - callMapper + qualifiedIdMapper = qualifiedIdMapper, + selfUserId = selfUserId, + selfClientId = selfClientId, + callMapper = callMapper, + callingMessageSender = callingMessageSender, ).keepingStrongReference(), sftRequestHandler = OnSFTRequest(deferredHandle, calling, callRepository, scope) .keepingStrongReference(), @@ -442,6 +447,13 @@ class CallManagerImpl internal constructor( initActiveSpeakersHandler() initRequestNewEpochHandler() initSelfUserMuteHandler() + initCallingMessageSender() + } + + private fun initCallingMessageSender() { + scope.launch { + callingMessageSender.processQueue() + } } private fun initParticipantsHandler() { diff --git a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageInstructions.kt b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageInstructions.kt new file mode 100644 index 00000000000..15eb39dc347 --- /dev/null +++ b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageInstructions.kt @@ -0,0 +1,63 @@ +/* + * Wire + * Copyright (C) 2024 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.call.scenario + +import com.sun.jna.Pointer +import com.wire.kalium.logic.data.conversation.ClientId +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.message.MessageTarget +import com.wire.kalium.logic.data.user.UserId + +/** + * Represents the instructions for sending a calling message. + * + * @property context The pointer context for the message (optional). + * @property callHostConversationId The ID of the conversation where the call is taking place. + * @property messageString The content of the message. + * @property avsSelfUserId The self user ID used by AVS. + * @property avsSelfClientId The self client ID used by AVS. + * @property messageTarget The target for sending the message. + */ +data class CallingMessageInstructions( + val context: Pointer?, + val callHostConversationId: ConversationId, + val messageString: String, + val avsSelfUserId: UserId, + val avsSelfClientId: ClientId, + val messageTarget: CallingMessageTarget +) + +sealed interface CallingMessageTarget { + val specificTarget: MessageTarget + + /** + * Send the message only to other devices of self-user. + */ + data object Self : CallingMessageTarget { + override val specificTarget: MessageTarget + get() = MessageTarget.Conversation() + } + + /** + * Send the message to the host conversation. + * Supports ignoring users through the [specificTarget]. + */ + data class HostConversation( + override val specificTarget: MessageTarget = MessageTarget.Conversation() + ) : CallingMessageTarget +} diff --git a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSender.kt b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSender.kt new file mode 100644 index 00000000000..cb94e430fb9 --- /dev/null +++ b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSender.kt @@ -0,0 +1,177 @@ +/* + * Wire + * Copyright (C) 2024 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ + +package com.wire.kalium.logic.feature.call.scenario + +import com.benasher44.uuid.uuid4 +import com.sun.jna.Pointer +import com.wire.kalium.calling.Calling +import com.wire.kalium.calling.types.Handle +import com.wire.kalium.logic.CoreFailure +import com.wire.kalium.logic.cache.SelfConversationIdProvider +import com.wire.kalium.logic.callingLogger +import com.wire.kalium.logic.data.conversation.ClientId +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.message.Message +import com.wire.kalium.logic.data.message.MessageContent +import com.wire.kalium.logic.data.message.MessageTarget +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.feature.message.MessageSender +import com.wire.kalium.logic.functional.Either +import com.wire.kalium.logic.functional.flatMap +import com.wire.kalium.logic.functional.foldToEitherWhileRight +import com.wire.kalium.util.DateTimeUtil +import io.ktor.http.HttpStatusCode +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.launch + +internal interface CallingMessageSender { + + suspend fun processQueue() + + @Suppress("LongParameterList") + fun enqueueSendingOfCallingMessage( + context: Pointer?, + callHostConversationId: ConversationId, + messageString: String?, + avsSelfUserId: UserId, + avsSelfClientId: ClientId, + messageTarget: CallingMessageTarget, + ) +} + +@Suppress("FunctionNaming") +internal fun CallingMessageSender( + handle: Deferred, + calling: Calling, + messageSender: MessageSender, + callingScope: CoroutineScope, + selfConversationIdProvider: SelfConversationIdProvider +) = object : CallingMessageSender { + + private val logger = callingLogger.withTextTag("CallingMessageSender") + + private val queue = Channel( + capacity = Channel.UNLIMITED, + ) + + @Suppress("LongParameterList") + override fun enqueueSendingOfCallingMessage( + context: Pointer?, + callHostConversationId: ConversationId, + messageString: String?, + avsSelfUserId: UserId, + avsSelfClientId: ClientId, + messageTarget: CallingMessageTarget, + ) { + if (messageString == null) return + callingScope.launch { + queue.send( + CallingMessageInstructions( + context, + callHostConversationId, + messageString, + avsSelfUserId, + avsSelfClientId, + messageTarget, + ) + ) + } + } + + override suspend fun processQueue() { + queue.consumeAsFlow().collect { messageInstructions -> + processInstruction(messageInstructions, selfConversationIdProvider) + } + } + + private suspend fun processInstruction( + messageInstructions: CallingMessageInstructions, + selfConversationIdProvider: SelfConversationIdProvider + ) { + val target = messageInstructions.messageTarget + + val transportConversationIds = when (target) { + is CallingMessageTarget.Self -> { + selfConversationIdProvider() + } + + is CallingMessageTarget.HostConversation -> { + Either.Right(listOf(messageInstructions.callHostConversationId)) + } + } + + val result = transportConversationIds.flatMap { conversations -> + conversations.foldToEitherWhileRight(Unit) { transportConversationId, _ -> + sendCallingMessage( + messageInstructions.callHostConversationId, + messageInstructions.avsSelfUserId, + messageInstructions.avsSelfClientId, + messageInstructions.messageString, + target.specificTarget, + transportConversationId + ) + } + } + + val (code, message) = when (result) { + is Either.Right -> { + logger.i("Notifying AVS - Success sending message") + HttpStatusCode.OK.value to "" + } + + is Either.Left -> { + logger.i("Notifying AVS - Error sending message") + HttpStatusCode.BadRequest.value to "Couldn't send Calling Message" + } + } + calling.wcall_resp( + inst = handle.await(), + status = code, + reason = message, + arg = messageInstructions.context + ) + } + + @Suppress("LongParameterList") + private suspend fun sendCallingMessage( + callHostConversationId: ConversationId, + userId: UserId, + clientId: ClientId, + data: String, + messageTarget: MessageTarget, + transportConversationId: ConversationId + ): Either { + val messageContent = MessageContent.Calling(data, callHostConversationId) + val message = Message.Signaling( + id = uuid4().toString(), + content = messageContent, + conversationId = transportConversationId, + date = DateTimeUtil.currentIsoDateTimeString(), + senderUserId = userId, + senderClientId = clientId, + status = Message.Status.Sent, + isSelfMessage = true, + expirationData = null + ) + return messageSender.sendMessage(message, messageTarget) + } +} diff --git a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnHttpRequest.kt b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnHttpRequest.kt deleted file mode 100644 index 608f1d24f60..00000000000 --- a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnHttpRequest.kt +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Wire - * Copyright (C) 2024 Wire Swiss GmbH - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ - -package com.wire.kalium.logic.feature.call.scenario - -import com.benasher44.uuid.uuid4 -import com.sun.jna.Pointer -import com.wire.kalium.calling.Calling -import com.wire.kalium.calling.types.Handle -import com.wire.kalium.logic.CoreFailure -import com.wire.kalium.logic.cache.SelfConversationIdProvider -import com.wire.kalium.logic.callingLogger -import com.wire.kalium.logic.data.conversation.ClientId -import com.wire.kalium.logic.data.id.ConversationId -import com.wire.kalium.logic.data.message.Message -import com.wire.kalium.logic.data.message.MessageContent -import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.feature.message.MessageSender -import com.wire.kalium.logic.data.message.MessageTarget -import com.wire.kalium.logic.functional.Either -import com.wire.kalium.logic.functional.flatMap -import com.wire.kalium.logic.functional.foldToEitherWhileRight -import com.wire.kalium.util.DateTimeUtil -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.launch - -internal class OnHttpRequest( - private val handle: Deferred, - private val calling: Calling, - private val messageSender: MessageSender, - private val callingScope: CoroutineScope, - private val selfConversationIdProvider: SelfConversationIdProvider -) { - @Suppress("LongParameterList") - fun sendHandlerSuccess( - context: Pointer?, - messageString: String?, - conversationId: ConversationId, - avsSelfUserId: UserId, - avsSelfClientId: ClientId, - messageTarget: MessageTarget, - sendInSelfConversation: Boolean - ) { - callingScope.launch { - messageString?.let { message -> - - val result = if (sendInSelfConversation) { - selfConversationIdProvider().flatMap { selfConversationIds -> - selfConversationIds.foldToEitherWhileRight(Unit) { selfConversationId, _ -> - sendCallingMessage( - conversationId, - avsSelfUserId, - avsSelfClientId, - message, - messageTarget, - selfConversationId - ) - } - } - } else { - sendCallingMessage(conversationId, avsSelfUserId, avsSelfClientId, message, messageTarget) - } - - when (result) { - is Either.Right -> { - callingLogger.i("[OnHttpRequest] -> Success") - calling.wcall_resp( - inst = handle.await(), - status = 200, - reason = "", - arg = context - ) - } - is Either.Left -> { - callingLogger.i("[OnHttpRequest] -> Error") - calling.wcall_resp( - inst = handle.await(), - status = 400, // TODO(calling): Handle the errorCode from CoreFailure - reason = "Couldn't send Calling Message", - arg = context - ) - } - } - } - } - } - - @Suppress("LongParameterList") - private suspend fun sendCallingMessage( - conversationId: ConversationId, - userId: UserId, - clientId: ClientId, - data: String, - messageTarget: MessageTarget, - selfConversationId: ConversationId? = null - ): Either { - val messageContent = MessageContent.Calling(data, conversationId) - val date = DateTimeUtil.currentIsoDateTimeString() - val message = Message.Signaling( - id = uuid4().toString(), - content = messageContent, - conversationId = selfConversationId ?: conversationId, - date = date, - senderUserId = userId, - senderClientId = clientId, - status = Message.Status.Sent, - isSelfMessage = true, - expirationData = null - ) - - return messageSender.sendMessage(message, messageTarget) - } -} diff --git a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTR.kt b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTR.kt index b0b11723d82..6ed0649be68 100644 --- a/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTR.kt +++ b/logic/src/commonJvmAndroid/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTR.kt @@ -19,36 +19,26 @@ package com.wire.kalium.logic.feature.call.scenario import com.sun.jna.Pointer -import com.wire.kalium.calling.Calling import com.wire.kalium.calling.callbacks.SendHandler -import com.wire.kalium.calling.types.Handle import com.wire.kalium.calling.types.Size_t -import com.wire.kalium.logic.cache.SelfConversationIdProvider import com.wire.kalium.logic.callingLogger import com.wire.kalium.logic.data.call.CallClientList import com.wire.kalium.logic.data.call.mapper.CallMapper import com.wire.kalium.logic.data.conversation.ClientId import com.wire.kalium.logic.data.id.QualifiedIdMapper +import com.wire.kalium.logic.data.message.MessageTarget import com.wire.kalium.logic.feature.call.AvsCallBackError import com.wire.kalium.logic.feature.call.CallManagerImpl -import com.wire.kalium.logic.feature.message.MessageSender -import com.wire.kalium.logic.data.message.MessageTarget -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred import kotlinx.serialization.json.Json // TODO(testing): create unit test @Suppress("LongParameterList") internal class OnSendOTR( - private val handle: Deferred, - private val calling: Calling, private val qualifiedIdMapper: QualifiedIdMapper, private val selfUserId: String, private val selfClientId: String, - private val messageSender: MessageSender, - private val selfConversationIdProvider: SelfConversationIdProvider, - private val callingScope: CoroutineScope, - private val callMapper: CallMapper + private val callMapper: CallMapper, + private val callingMessageSender: CallingMessageSender, ) : SendHandler { @Suppress("TooGenericExceptionCaught", "NestedBlockDepth") override fun onSend( @@ -73,26 +63,26 @@ internal class OnSendOTR( try { val messageTarget = if (myClientsOnly) { callingLogger.i("[OnSendOTR] -> Route calling message via self conversation") - MessageTarget.Conversation() + CallingMessageTarget.Self } else { callingLogger.i("[OnSendOTR] -> Decoding Recipients") - targetRecipientsJson?.let { recipientsJson -> + val specificTarget = targetRecipientsJson?.let { recipientsJson -> val callClientList = Json.decodeFromString(recipientsJson) callingLogger.i("[OnSendOTR] -> Mapping Recipients") callMapper.toClientMessageTarget(callClientList = callClientList) } ?: MessageTarget.Conversation() + CallingMessageTarget.HostConversation(specificTarget) } callingLogger.i("[OnSendOTR] -> Success") - OnHttpRequest(handle, calling, messageSender, callingScope, selfConversationIdProvider).sendHandlerSuccess( + callingMessageSender.enqueueSendingOfCallingMessage( context = context, + callHostConversationId = qualifiedIdMapper.fromStringToQualifiedID(remoteConversationId), messageString = data?.getString(0, CallManagerImpl.UTF8_ENCODING), - conversationId = qualifiedIdMapper.fromStringToQualifiedID(remoteConversationId), avsSelfUserId = qualifiedIdMapper.fromStringToQualifiedID(remoteSelfUserId), avsSelfClientId = ClientId(remoteClientIdSelf), - messageTarget = messageTarget, - sendInSelfConversation = myClientsOnly + messageTarget = messageTarget ) AvsCallBackError.NONE.value } catch (exception: Exception) { diff --git a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSenderTest.kt b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSenderTest.kt new file mode 100644 index 00000000000..72497421643 --- /dev/null +++ b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/CallingMessageSenderTest.kt @@ -0,0 +1,315 @@ +/* + * Wire + * Copyright (C) 2024 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.feature.call.scenario + +import com.sun.jna.Pointer +import com.wire.kalium.calling.Calling +import com.wire.kalium.calling.types.Handle +import com.wire.kalium.logic.CoreFailure +import com.wire.kalium.logic.StorageFailure +import com.wire.kalium.logic.cache.SelfConversationIdProvider +import com.wire.kalium.logic.data.conversation.ClientId +import com.wire.kalium.logic.data.id.ConversationId +import com.wire.kalium.logic.data.message.MessageContent +import com.wire.kalium.logic.data.message.MessageTarget +import com.wire.kalium.logic.feature.message.MessageSender +import com.wire.kalium.logic.framework.TestConversation +import com.wire.kalium.logic.framework.TestUser +import com.wire.kalium.logic.functional.Either +import io.mockative.Mock +import io.mockative.any +import io.mockative.anyInstanceOf +import io.mockative.anything +import io.mockative.eq +import io.mockative.given +import io.mockative.matching +import io.mockative.mock +import io.mockative.once +import io.mockative.verify +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals + +class CallingMessageSenderTest { + + @Test + fun givenSendInSelfConversation_whenSending_messageIsSentInSelfConversations() = runTest { + val (arrangement, sender) = Arrangement(this) + .givenSelfConversationIdProviderReturns(Either.Right(listOf(Arrangement.selfConversationId))) + .givenSendMessageSuccessful() + .arrange() + + val processingJob = launch { + sender.processQueue() + } + + sender.enqueueSendingOfCallingMessage( + context = null, + messageString = "message", + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.Self + ) + advanceUntilIdle() + + verify(arrangement.messageSender) + .suspendFunction(arrangement.messageSender::sendMessage) + .with( + matching { it.conversationId == Arrangement.selfConversationId }, + matching { it is MessageTarget.Conversation } + ) + .wasInvoked(exactly = once) + processingJob.cancel() + } + + @Test + fun givenSendFails_whenSending_thenAvsIsInformedAboutItWithCode400() = runTest { + val (arrangement, sender) = Arrangement(this) + .givenSelfConversationIdProviderReturns(Either.Right(listOf(Arrangement.selfConversationId))) + .givenSendMessageFails() + .arrange() + + val processingJob = launch { + sender.processQueue() + } + + val contextPointer = Pointer.createConstant(24) + + sender.enqueueSendingOfCallingMessage( + context = contextPointer, + messageString = "message", + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.Self + ) + advanceUntilIdle() + + verify(arrangement.calling) + .function(arrangement.calling::wcall_resp) + .with( + eq(arrangement.handle), + eq(400), + any(), + eq(contextPointer), + ).wasInvoked(exactly = once) + processingJob.cancel() + } + + @Test + fun givenSendSucceeds_whenSending_thenAvsIsInformedAboutItWithCode200() = runTest { + val (arrangement, sender) = Arrangement(this) + .givenSelfConversationIdProviderReturns(Either.Right(listOf(Arrangement.selfConversationId))) + .givenSendMessageSuccessful() + .arrange() + + val processingJob = launch { + sender.processQueue() + } + + val contextPointer = Pointer.createConstant(123) + + sender.enqueueSendingOfCallingMessage( + context = contextPointer, + messageString = "message", + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.Self + ) + advanceUntilIdle() + + verify(arrangement.calling) + .function(arrangement.calling::wcall_resp) + .with( + eq(arrangement.handle), + eq(200), + any(), + eq(contextPointer), + ).wasInvoked(exactly = once) + processingJob.cancel() + } + + @Test + fun givenSendInHostConversation_whenSending_messageIsSentInTargetConversation() = runTest { + val (arrangement, sender) = Arrangement(this) + .givenSendMessageSuccessful() + .arrange() + + val processingJob = launch { + sender.processQueue() + } + + sender.enqueueSendingOfCallingMessage( + context = null, + messageString = "message", + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.HostConversation() + ) + advanceUntilIdle() + + verify(arrangement.messageSender) + .suspendFunction(arrangement.messageSender::sendMessage) + .with( + matching { it.conversationId == Arrangement.conversationId }, + matching { it is MessageTarget.Conversation }, + ).wasInvoked(exactly = once) + processingJob.cancel() + } + + @Test + fun givenMultipleMessagesAreEnqueued_whenSending_messagesAreSentInOrder() = runTest { + var invokeCount = 0 + val firstMessageLock = Job() + + val (arrangement, sender) = Arrangement(this) + .givenSendMessageInvokes { + invokeCount++ + if (invokeCount == 1) { // Delay the FIRST sending + firstMessageLock.join() + } + Either.Right(Unit) + } + .arrange() + + val processingJob = launch { + sender.processQueue() + } + + val firstMessageText = "message" + sender.enqueueSendingOfCallingMessage( + context = null, + messageString = firstMessageText, + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.HostConversation() + ) + val secondMessageText = "SECOND message" + sender.enqueueSendingOfCallingMessage( + context = null, + messageString = secondMessageText, + callHostConversationId = Arrangement.conversationId, + avsSelfUserId = Arrangement.selfUserId, + avsSelfClientId = Arrangement.selfUserCLientId, + messageTarget = CallingMessageTarget.HostConversation() + ) + + advanceUntilIdle() + + assertEquals(1, invokeCount) + verify(arrangement.messageSender) + .suspendFunction(arrangement.messageSender::sendMessage) + .with( + matching { + val content = it.content + secondMessageText == (content as? MessageContent.Calling)?.value && + it.conversationId == Arrangement.conversationId + }, + anyInstanceOf(MessageTarget.Conversation::class) + ) + .wasNotInvoked() + + firstMessageLock.cancel() + advanceUntilIdle() + verify(arrangement.messageSender) + .suspendFunction(arrangement.messageSender::sendMessage) + .with( + matching { + val content = it.content + secondMessageText == (content as? MessageContent.Calling)?.value && + it.conversationId == Arrangement.conversationId + }, + anyInstanceOf(MessageTarget.Conversation::class) + ) + .wasInvoked(exactly = once) + processingJob.cancel() + } + + internal class Arrangement(private val testScope: CoroutineScope) { + + @Mock + val calling = mock(Calling::class) + + @Mock + var messageSender = mock(MessageSender::class) + + @Mock + val selfConversationIdProvider = mock(SelfConversationIdProvider::class) + + val handle = Handle(42) + + init { + given(calling) + .function(calling::wcall_resp) + .whenInvokedWith(anything(), anything(), anything(), anything()) + .thenReturn(0) + } + + fun arrange() = this to CallingMessageSender( + testScope.async { handle }, + calling, + messageSender, + testScope, + selfConversationIdProvider + ) + + companion object { + val conversationId = TestConversation.GROUP().id + val selfConversationId = TestConversation.SELF().id + val selfUserId = TestUser.SELF.id + val selfUserCLientId = ClientId("self_client") + } + + suspend fun givenSelfConversationIdProviderReturns(result: Either>) = apply { + given(selfConversationIdProvider) + .suspendFunction(selfConversationIdProvider::invoke) + .whenInvoked() + .thenReturn(result) + } + + suspend fun givenSendMessageSuccessful() = apply { + given(messageSender) + .suspendFunction(messageSender::sendMessage) + .whenInvokedWith(any(), any()) + .thenReturn(Either.Right(Unit)) + } + + suspend fun givenSendMessageFails() = apply { + given(messageSender) + .suspendFunction(messageSender::sendMessage) + .whenInvokedWith(any(), any()) + .thenReturn(Either.Left(StorageFailure.DataNotFound)) + } + + suspend fun givenSendMessageInvokes(block: suspend () -> Either) = apply { + given(messageSender) + .suspendFunction(messageSender::sendMessage) + .whenInvokedWith(any(), any()) + .thenInvoke(block) + } + } +} diff --git a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnCloseCallTest.kt b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnCloseCallTest.kt similarity index 99% rename from logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnCloseCallTest.kt rename to logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnCloseCallTest.kt index 9904e2b1a23..04f33452189 100644 --- a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnCloseCallTest.kt +++ b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnCloseCallTest.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see http://www.gnu.org/licenses/. */ -package com.wire.kalium.logic.feature.scenario +package com.wire.kalium.logic.feature.call.scenario import com.wire.kalium.calling.CallClosedReason import com.wire.kalium.calling.types.Uint32_t diff --git a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnMuteStateForSelfUserChangedTest.kt b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnMuteStateForSelfUserChangedTest.kt similarity index 98% rename from logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnMuteStateForSelfUserChangedTest.kt rename to logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnMuteStateForSelfUserChangedTest.kt index 769bedf2181..06a8715db62 100644 --- a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnMuteStateForSelfUserChangedTest.kt +++ b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnMuteStateForSelfUserChangedTest.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see http://www.gnu.org/licenses/. */ -package com.wire.kalium.logic.feature.scenario +package com.wire.kalium.logic.feature.call.scenario import com.wire.kalium.logic.data.call.CallRepository import com.wire.kalium.logic.data.conversation.Conversation diff --git a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnSendOTRTest.kt b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTRTest.kt similarity index 73% rename from logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnSendOTRTest.kt rename to logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTRTest.kt index b48a443d0d0..edc10521338 100644 --- a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnSendOTRTest.kt +++ b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/call/scenario/OnSendOTRTest.kt @@ -15,9 +15,10 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see http://www.gnu.org/licenses/. */ -package com.wire.kalium.logic.feature.scenario +package com.wire.kalium.logic.feature.call.scenario import com.sun.jna.Memory +import com.sun.jna.Pointer import com.wire.kalium.calling.Calling import com.wire.kalium.calling.types.Size_t import com.wire.kalium.logic.StorageFailure @@ -26,24 +27,21 @@ import com.wire.kalium.logic.data.call.mapper.CallMapperImpl import com.wire.kalium.logic.data.conversation.ClientId import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.id.QualifiedIdMapperImpl +import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.call.CallManagerImpl -import com.wire.kalium.logic.feature.call.scenario.OnSendOTR -import com.wire.kalium.logic.feature.message.MessageSender -import com.wire.kalium.logic.data.message.MessageTarget import com.wire.kalium.logic.framework.TestConversation import com.wire.kalium.logic.framework.TestUser import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.test_util.TestKaliumDispatcher import io.mockative.Mock import io.mockative.any -import io.mockative.classOf +import io.mockative.anyInstanceOf +import io.mockative.anything +import io.mockative.eq import io.mockative.given -import io.mockative.matching import io.mockative.mock import io.mockative.once import io.mockative.verify -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.test.runTest import kotlinx.coroutines.yield import org.junit.Test @@ -77,10 +75,14 @@ class OnSendOTRTest { yield() verify(arrangement.messageSender) - .suspendFunction(arrangement.messageSender::sendMessage) + .suspendFunction(arrangement.messageSender::enqueueSendingOfCallingMessage) .with( - matching { it.conversationId == Arrangement.selfConversationId }, - matching { it is MessageTarget.Conversation }, + anything(), + eq(Arrangement.conversationId), + anything(), + anything(), + anything(), + anyInstanceOf(CallingMessageTarget.Self::class), ) .wasInvoked(exactly = once) } @@ -111,10 +113,14 @@ class OnSendOTRTest { yield() verify(arrangement.messageSender) - .suspendFunction(arrangement.messageSender::sendMessage) + .suspendFunction(arrangement.messageSender::enqueueSendingOfCallingMessage) .with( - matching { it.conversationId == Arrangement.conversationId }, - matching { it is MessageTarget.Conversation }, + anything(), + eq(Arrangement.conversationId), + any(), + any(), + any(), + anyInstanceOf(CallingMessageTarget.HostConversation::class), ) .wasInvoked(exactly = once) } @@ -122,28 +128,24 @@ class OnSendOTRTest { internal class Arrangement { @Mock - val calling = mock(classOf()) + val calling = mock(Calling::class) @Mock - val messageSender = mock(classOf()) + val selfConversationIdProvider = mock(SelfConversationIdProvider::class) @Mock - val selfConversationIdProvider = mock(classOf()) + val messageSender = mock(CallingMessageSender::class) val qualifiedIdMapper = QualifiedIdMapperImpl(TestUser.SELF.id) val callMapper = CallMapperImpl(qualifiedIdMapper) fun arrange() = this to OnSendOTR( - CompletableDeferred(), - calling, qualifiedIdMapper, TestUser.SELF.id.toString(), - "self_client_id", + "self_client_id", + callMapper, messageSender, - selfConversationIdProvider, - CoroutineScope(TestKaliumDispatcher.main), - callMapper ) companion object { @@ -153,18 +155,24 @@ class OnSendOTRTest { val selfUserClientId = ClientId("self_client") } - fun givenSelfConversationIdProviderReturns(result: Either>) = apply { + suspend fun givenSelfConversationIdProviderReturns(result: Either>) = apply { given(selfConversationIdProvider) .suspendFunction(selfConversationIdProvider::invoke) .whenInvoked() .thenReturn(result) } - fun givenSendMessageSuccessful() = apply { + suspend fun givenSendMessageSuccessful() = apply { given(messageSender) - .suspendFunction(messageSender::sendMessage) - .whenInvokedWith(any(), any()) - .thenReturn(Either.Right(Unit)) + .suspendFunction(messageSender::enqueueSendingOfCallingMessage) + .whenInvokedWith( + any(), + any(), + any(), + any(), + any(), + any(), + ).thenReturn(Unit) } } diff --git a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnHttpRequestTest.kt b/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnHttpRequestTest.kt deleted file mode 100644 index a431a4c1ba3..00000000000 --- a/logic/src/jvmTest/kotlin/com/wire/kalium/logic/feature/scenario/OnHttpRequestTest.kt +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Wire - * Copyright (C) 2024 Wire Swiss GmbH - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see http://www.gnu.org/licenses/. - */ -package com.wire.kalium.logic.feature.scenario - -import com.wire.kalium.calling.Calling -import com.wire.kalium.logic.StorageFailure -import com.wire.kalium.logic.cache.SelfConversationIdProvider -import com.wire.kalium.logic.data.conversation.ClientId -import com.wire.kalium.logic.data.id.ConversationId -import com.wire.kalium.logic.feature.call.scenario.OnHttpRequest -import com.wire.kalium.logic.feature.message.MessageSender -import com.wire.kalium.logic.data.message.MessageTarget -import com.wire.kalium.logic.framework.TestConversation -import com.wire.kalium.logic.framework.TestUser -import com.wire.kalium.logic.functional.Either -import com.wire.kalium.logic.test_util.TestKaliumDispatcher -import io.mockative.Mock -import io.mockative.any -import io.mockative.classOf -import io.mockative.given -import io.mockative.matching -import io.mockative.mock -import io.mockative.once -import io.mockative.verify -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.yield -import org.junit.Test - -class OnHttpRequestTest { - - @Test - fun givenSendInSelfConversationIsTrue_whenSending_messageIsSentInSelfConversations() = runTest(TestKaliumDispatcher.main) { - val (arrangement, onHttpRequest) = Arrangement() - .givenSelfConversationIdProviderReturns(Either.Right(listOf(Arrangement.selfConversationId))) - .givenSendMessageSuccessful() - .arrange() - - onHttpRequest.sendHandlerSuccess( - context = null, - messageString = "message", - conversationId = Arrangement.conversationId, - avsSelfUserId = Arrangement.selfUserId, - avsSelfClientId = Arrangement.selfUserCLientId, - messageTarget = MessageTarget.Conversation(), - sendInSelfConversation = true - ) - yield() - - verify(arrangement.messageSender) - .suspendFunction(arrangement.messageSender::sendMessage) - .with( - matching { it.conversationId == Arrangement.selfConversationId }, - matching { it is MessageTarget.Conversation }, - ) - .wasInvoked(exactly = once) - } - - @Test - fun givenSendInSelfConversationIsFalse_whenSending_messageIsSentInTargetConversation() = runTest(TestKaliumDispatcher.main) { - val (arrangement, onHttpRequest) = Arrangement() - .givenSendMessageSuccessful() - .arrange() - - onHttpRequest.sendHandlerSuccess( - context = null, - messageString = "message", - conversationId = Arrangement.conversationId, - avsSelfUserId = Arrangement.selfUserId, - avsSelfClientId = Arrangement.selfUserCLientId, - messageTarget = MessageTarget.Conversation(), - sendInSelfConversation = false - ) - yield() - - verify(arrangement.messageSender) - .suspendFunction(arrangement.messageSender::sendMessage) - .with( - matching { it.conversationId == Arrangement.conversationId }, - matching { it is MessageTarget.Conversation }, - ) - .wasInvoked(exactly = once) - } - - internal class Arrangement { - - @Mock - val calling = mock(classOf()) - - @Mock - val messageSender = mock(classOf()) - - @Mock - val selfConversationIdProvider = mock(classOf()) - - fun arrange() = this to OnHttpRequest( - CompletableDeferred(), - calling, - messageSender, - CoroutineScope(TestKaliumDispatcher.main), - selfConversationIdProvider - ) - - companion object { - val conversationId = TestConversation.GROUP().id - val selfConversationId = TestConversation.SELF().id - val selfUserId = TestUser.SELF.id - val selfUserCLientId = ClientId("self_client") - } - - fun givenSelfConversationIdProviderReturns(result: Either>) = apply { - given(selfConversationIdProvider) - .suspendFunction(selfConversationIdProvider::invoke) - .whenInvoked() - .thenReturn(result) - } - - fun givenSendMessageSuccessful() = apply { - given(messageSender) - .suspendFunction(messageSender::sendMessage) - .whenInvokedWith(any(), any()) - .thenReturn(Either.Right(Unit)) - } - } -}