Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(monkeys): Add support for multi-clients in the replay mode [WPB-5764] #2309

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ CREATE TABLE IF NOT EXISTS Event (
event_time TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(),
monkey_index INTEGER NOT NULL,
team VARCHAR(100) NOT NULL,
client_id INTEGER NOT NULL,
event_data JSONB AS EventType NOT NULL,
PRIMARY KEY(id),
CONSTRAINT fk_execution FOREIGN KEY(execution_id) REFERENCES Execution(id)
);

selectByExecutionId:
SELECT id, execution_id, event_time, monkey_index, team, CAST(event_data AS TEXT) event_data
SELECT id, execution_id, event_time, monkey_index, team, client_id, CAST(event_data AS TEXT) event_data
FROM Event WHERE execution_id = ?;

insertEvent:
INSERT INTO Event(execution_id, monkey_index, team, event_data)
VALUES (?, ?, ?, CAST(? AS JSONB));
INSERT INTO Event(execution_id, monkey_index, team, client_id, event_data)
VALUES (?, ?, ?, ?, CAST(? AS JSONB));
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import sun.misc.Signal
import sun.misc.SignalHandler
import java.io.File
import kotlin.system.exitProcess

fun CoroutineScope.stopIM() {
Expand Down Expand Up @@ -102,12 +103,14 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) {
null -> DummyEventStorage()
}
eventProcessor.storeBackends(testData.backends)
val kaliumCacheFolders = testData.testCases.map { it.name.replace(' ', '_') }
try {
runMonkeys(testData, eventProcessor)
} catch (e: Throwable) {
logger.e("Error running Infinite Monkeys", e)
} finally {
eventProcessor.releaseResources()
kaliumCacheFolders.forEach { File(it).deleteRecursively() }
exitProcess(0)
}
}
Expand All @@ -125,12 +128,7 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) {
logger.i("Creating prefixed groups")
testData.conversationDistribution.forEach { (prefix, config) ->
ConversationPool.createPrefixedConversations(
coreLogic,
prefix,
config.groupCount,
config.userCount,
config.protocol,
monkeyPool
coreLogic, prefix, config.groupCount, config.userCount, config.protocol, monkeyPool
).forEach {
eventChannel.send(Event(it.owner, EventType.CreateConversation(it)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.github.ajalt.clikt.parameters.types.int
import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.CoreLogger
import com.wire.kalium.logic.CoreLogic
import com.wire.kalium.monkeys.actions.Action
import com.wire.kalium.monkeys.model.Event
import com.wire.kalium.monkeys.model.TestDataImporter
Expand Down Expand Up @@ -103,11 +104,14 @@ class ReplayApplication : CliktCommand(allowMultipleSubcommands = true) {
@OptIn(InternalSerializationApi::class, ExperimentalSerializationApi::class)
@Suppress("TooGenericExceptionCaught")
private suspend fun processEvents(users: List<UserData>, events: ReceiveChannel<Event>) {
val logicClients = mutableMapOf<Int, CoreLogic>()
events.consumeEach { config ->
val actionName = config.eventType::class.serializer().descriptor.serialName
try {
val monkeyPool = MonkeyPool(users, "Replayer")
val coreLogic = coreLogic("${homeDirectory()}/.kalium/replayer-${this.executionId}")
val coreLogic = logicClients.getOrPut(config.monkeyOrigin.clientId) {
coreLogic("${homeDirectory()}/.kalium/replayer-${config.monkeyOrigin.clientId}")
}
logger.i("Running action $actionName")
val startTime = System.currentTimeMillis()
Action.eventFromConfig(config.monkeyOrigin, config.eventType).execute(coreLogic, monkeyPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.wire.kalium.logic.feature.client.RegisterClientUseCase
import com.wire.kalium.logic.feature.conversation.CreateConversationResult
import com.wire.kalium.logic.feature.conversation.CreateGroupConversationUseCase
import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.monkeys.logger
import com.wire.kalium.monkeys.model.Backend
import com.wire.kalium.monkeys.model.MonkeyId
Expand Down Expand Up @@ -76,7 +77,7 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) {
// this means there are users within the team not managed by IM
// We can still send messages and add them to groups but not act on their behalf
// MonkeyId is irrelevant for external users as we will never be able to act on their behalf
fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, ""))
fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, "", -1))
fun internal(user: UserData, monkeyId: MonkeyId) = Monkey(MonkeyType.Internal(user), monkeyId)
}

Expand Down Expand Up @@ -279,7 +280,10 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) {

suspend fun sendMessageTo(conversationId: ConversationId, message: String) {
this.monkeyState.readyThen {
messages.sendTextMessage(conversationId, message)
val result = messages.sendTextMessage(conversationId, message)
if (result is Either.Left) {
error("Error sending message: ${result.value}")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ data class MonkeyId(
@SerialName("index")
val index: Int,
@SerialName("team")
val team: String
val team: String,
@SerialName("client")
val clientId: Int
)

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MonkeyPool(users: List<UserData>, testCase: String) {

init {
users.forEachIndexed { index, userData ->
val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name))
val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name, testCase.hashCode()))
this.pool.getOrPut(userData.team.name) { mutableListOf() }.add(monkey)
this.poolLoggedOut.getOrPut(userData.team.name) { ConcurrentHashMap() }[userData.userId] = monkey
this.poolById[userData.userId] = monkey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul
execution_id = execution.id,
monkey_index = event.monkeyOrigin.index,
team = event.monkeyOrigin.team,
client_id = event.monkeyOrigin.clientId,
event_data = event.eventType
)
}
Expand All @@ -99,7 +100,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul
override fun CoroutineScope.readEvents() = produce {
withDatabase { database, execution ->
for (event in database.executionEventQueries.selectByExecutionId(execution.id).awaitAsList()) {
send(Event(MonkeyId(event.monkey_index, event.team), Json.decodeFromString(event.event_data)))
send(Event(MonkeyId(event.monkey_index, event.team, event.client_id), Json.decodeFromString(event.event_data)))
}
}
}
Expand Down
Loading