From bf6506f41bffec3265d41195a75695fff1ed4eaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar=20Dias?= Date: Thu, 14 Dec 2023 13:00:32 +0100 Subject: [PATCH 1/2] feat(monkeys): Add support for multi-clients in the replay mode [WPB-5764] --- .../com/wire/kalium/monkeys/db/ExecutionEvent.sq | 7 ++++--- .../kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt | 3 +++ .../kotlin/com/wire/kalium/monkeys/ReplayApplication.kt | 6 +++++- .../kotlin/com/wire/kalium/monkeys/conversation/Monkey.kt | 8 ++++++-- .../kotlin/com/wire/kalium/monkeys/model/EventData.kt | 4 +++- .../kotlin/com/wire/kalium/monkeys/pool/MonkeyPool.kt | 2 +- .../com/wire/kalium/monkeys/storage/PostgresStorage.kt | 3 ++- 7 files changed, 24 insertions(+), 9 deletions(-) diff --git a/monkeys/src/main/db_monkeys/com/wire/kalium/monkeys/db/ExecutionEvent.sq b/monkeys/src/main/db_monkeys/com/wire/kalium/monkeys/db/ExecutionEvent.sq index 041f9390af6..15cbd0ee620 100644 --- a/monkeys/src/main/db_monkeys/com/wire/kalium/monkeys/db/ExecutionEvent.sq +++ b/monkeys/src/main/db_monkeys/com/wire/kalium/monkeys/db/ExecutionEvent.sq @@ -6,15 +6,16 @@ CREATE TABLE IF NOT EXISTS Event ( event_time TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(), monkey_index INTEGER NOT NULL, team VARCHAR(100) NOT NULL, + client_id INTEGER NOT NULL, event_data JSONB AS EventType NOT NULL, PRIMARY KEY(id), CONSTRAINT fk_execution FOREIGN KEY(execution_id) REFERENCES Execution(id) ); selectByExecutionId: -SELECT id, execution_id, event_time, monkey_index, team, CAST(event_data AS TEXT) event_data +SELECT id, execution_id, event_time, monkey_index, team, client_id, CAST(event_data AS TEXT) event_data FROM Event WHERE execution_id = ?; insertEvent: -INSERT INTO Event(execution_id, monkey_index, team, event_data) -VALUES (?, ?, ?, CAST(? AS JSONB)); +INSERT INTO Event(execution_id, monkey_index, team, client_id, event_data) +VALUES (?, ?, ?, ?, CAST(? AS JSONB)); diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt index 6e308872f20..4dacb1f4568 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt @@ -53,6 +53,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import sun.misc.Signal import sun.misc.SignalHandler +import java.io.File import kotlin.system.exitProcess fun CoroutineScope.stopIM() { @@ -102,12 +103,14 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) { null -> DummyEventStorage() } eventProcessor.storeBackends(testData.backends) + val kaliumCacheFolders = testData.testCases.map { it.name.replace(' ', '_') } try { runMonkeys(testData, eventProcessor) } catch (e: Throwable) { logger.e("Error running Infinite Monkeys", e) } finally { eventProcessor.releaseResources() + kaliumCacheFolders.forEach{ File(it).deleteRecursively() } exitProcess(0) } } diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/ReplayApplication.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/ReplayApplication.kt index 0a03a888201..897d6c31b64 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/ReplayApplication.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/ReplayApplication.kt @@ -32,6 +32,7 @@ import com.github.ajalt.clikt.parameters.types.int import com.wire.kalium.logger.KaliumLogLevel import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logic.CoreLogger +import com.wire.kalium.logic.CoreLogic import com.wire.kalium.monkeys.actions.Action import com.wire.kalium.monkeys.model.Event import com.wire.kalium.monkeys.model.TestDataImporter @@ -103,11 +104,14 @@ class ReplayApplication : CliktCommand(allowMultipleSubcommands = true) { @OptIn(InternalSerializationApi::class, ExperimentalSerializationApi::class) @Suppress("TooGenericExceptionCaught") private suspend fun processEvents(users: List, events: ReceiveChannel) { + val logicClients = mutableMapOf() events.consumeEach { config -> val actionName = config.eventType::class.serializer().descriptor.serialName try { val monkeyPool = MonkeyPool(users, "Replayer") - val coreLogic = coreLogic("${homeDirectory()}/.kalium/replayer-${this.executionId}") + val coreLogic = logicClients.getOrPut(config.monkeyOrigin.clientId) { + coreLogic("${homeDirectory()}/.kalium/replayer-${config.monkeyOrigin.clientId}") + } logger.i("Running action $actionName") val startTime = System.currentTimeMillis() Action.eventFromConfig(config.monkeyOrigin, config.eventType).execute(coreLogic, monkeyPool) diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/conversation/Monkey.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/conversation/Monkey.kt index 2a4ddc87321..44ee5667a3c 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/conversation/Monkey.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/conversation/Monkey.kt @@ -37,6 +37,7 @@ import com.wire.kalium.logic.feature.client.RegisterClientUseCase import com.wire.kalium.logic.feature.conversation.CreateConversationResult import com.wire.kalium.logic.feature.conversation.CreateGroupConversationUseCase import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult +import com.wire.kalium.logic.functional.Either import com.wire.kalium.monkeys.logger import com.wire.kalium.monkeys.model.Backend import com.wire.kalium.monkeys.model.MonkeyId @@ -76,7 +77,7 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) { // this means there are users within the team not managed by IM // We can still send messages and add them to groups but not act on their behalf // MonkeyId is irrelevant for external users as we will never be able to act on their behalf - fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, "")) + fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, "", -1)) fun internal(user: UserData, monkeyId: MonkeyId) = Monkey(MonkeyType.Internal(user), monkeyId) } @@ -279,7 +280,10 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) { suspend fun sendMessageTo(conversationId: ConversationId, message: String) { this.monkeyState.readyThen { - messages.sendTextMessage(conversationId, message) + val result = messages.sendTextMessage(conversationId, message) + if (result is Either.Left) { + error("Error sending message: ${result.value}") + } } } } diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/model/EventData.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/model/EventData.kt index cc55402dfa3..d7d52838ff8 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/model/EventData.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/model/EventData.kt @@ -27,7 +27,9 @@ data class MonkeyId( @SerialName("index") val index: Int, @SerialName("team") - val team: String + val team: String, + @SerialName("client") + val clientId: Int ) @Serializable diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/pool/MonkeyPool.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/pool/MonkeyPool.kt index c10d931fb14..e898897159b 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/pool/MonkeyPool.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/pool/MonkeyPool.kt @@ -52,7 +52,7 @@ class MonkeyPool(users: List, testCase: String) { init { users.forEachIndexed { index, userData -> - val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name)) + val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name, testCase.hashCode())) this.pool.getOrPut(userData.team.name) { mutableListOf() }.add(monkey) this.poolLoggedOut.getOrPut(userData.team.name) { ConcurrentHashMap() }[userData.userId] = monkey this.poolById[userData.userId] = monkey diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/storage/PostgresStorage.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/storage/PostgresStorage.kt index b75bee82462..1d46d8e32fc 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/storage/PostgresStorage.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/storage/PostgresStorage.kt @@ -80,6 +80,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul execution_id = execution.id, monkey_index = event.monkeyOrigin.index, team = event.monkeyOrigin.team, + client_id = event.monkeyOrigin.clientId, event_data = event.eventType ) } @@ -99,7 +100,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul override fun CoroutineScope.readEvents() = produce { withDatabase { database, execution -> for (event in database.executionEventQueries.selectByExecutionId(execution.id).awaitAsList()) { - send(Event(MonkeyId(event.monkey_index, event.team), Json.decodeFromString(event.event_data))) + send(Event(MonkeyId(event.monkey_index, event.team, event.client_id), Json.decodeFromString(event.event_data))) } } } From 4416ba7c9c024a48c219865e4053d446b2f5071f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar=20Dias?= Date: Thu, 14 Dec 2023 14:56:05 +0100 Subject: [PATCH 2/2] detekt --- .../kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt index 4dacb1f4568..c1fd440ad19 100644 --- a/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt +++ b/monkeys/src/main/kotlin/com/wire/kalium/monkeys/MonkeyApplication.kt @@ -110,7 +110,7 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) { logger.e("Error running Infinite Monkeys", e) } finally { eventProcessor.releaseResources() - kaliumCacheFolders.forEach{ File(it).deleteRecursively() } + kaliumCacheFolders.forEach { File(it).deleteRecursively() } exitProcess(0) } } @@ -128,12 +128,7 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) { logger.i("Creating prefixed groups") testData.conversationDistribution.forEach { (prefix, config) -> ConversationPool.createPrefixedConversations( - coreLogic, - prefix, - config.groupCount, - config.userCount, - config.protocol, - monkeyPool + coreLogic, prefix, config.groupCount, config.userCount, config.protocol, monkeyPool ).forEach { eventChannel.send(Event(it.owner, EventType.CreateConversation(it))) }