Skip to content

Commit

Permalink
feat(monkeys): Add support for multi-clients in the replay mode [WPB-…
Browse files Browse the repository at this point in the history
…5764]
  • Loading branch information
Augusto César Dias committed Dec 14, 2023
1 parent 4cf1578 commit bf6506f
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ CREATE TABLE IF NOT EXISTS Event (
event_time TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(),
monkey_index INTEGER NOT NULL,
team VARCHAR(100) NOT NULL,
client_id INTEGER NOT NULL,
event_data JSONB AS EventType NOT NULL,
PRIMARY KEY(id),
CONSTRAINT fk_execution FOREIGN KEY(execution_id) REFERENCES Execution(id)
);

selectByExecutionId:
SELECT id, execution_id, event_time, monkey_index, team, CAST(event_data AS TEXT) event_data
SELECT id, execution_id, event_time, monkey_index, team, client_id, CAST(event_data AS TEXT) event_data
FROM Event WHERE execution_id = ?;

insertEvent:
INSERT INTO Event(execution_id, monkey_index, team, event_data)
VALUES (?, ?, ?, CAST(? AS JSONB));
INSERT INTO Event(execution_id, monkey_index, team, client_id, event_data)
VALUES (?, ?, ?, ?, CAST(? AS JSONB));
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import sun.misc.Signal
import sun.misc.SignalHandler
import java.io.File
import kotlin.system.exitProcess

fun CoroutineScope.stopIM() {
Expand Down Expand Up @@ -102,12 +103,14 @@ class MonkeyApplication : CliktCommand(allowMultipleSubcommands = true) {
null -> DummyEventStorage()
}
eventProcessor.storeBackends(testData.backends)
val kaliumCacheFolders = testData.testCases.map { it.name.replace(' ', '_') }
try {
runMonkeys(testData, eventProcessor)
} catch (e: Throwable) {
logger.e("Error running Infinite Monkeys", e)
} finally {
eventProcessor.releaseResources()
kaliumCacheFolders.forEach{ File(it).deleteRecursively() }
exitProcess(0)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.github.ajalt.clikt.parameters.types.int
import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.CoreLogger
import com.wire.kalium.logic.CoreLogic
import com.wire.kalium.monkeys.actions.Action
import com.wire.kalium.monkeys.model.Event
import com.wire.kalium.monkeys.model.TestDataImporter
Expand Down Expand Up @@ -103,11 +104,14 @@ class ReplayApplication : CliktCommand(allowMultipleSubcommands = true) {
@OptIn(InternalSerializationApi::class, ExperimentalSerializationApi::class)
@Suppress("TooGenericExceptionCaught")
private suspend fun processEvents(users: List<UserData>, events: ReceiveChannel<Event>) {
val logicClients = mutableMapOf<Int, CoreLogic>()
events.consumeEach { config ->
val actionName = config.eventType::class.serializer().descriptor.serialName
try {
val monkeyPool = MonkeyPool(users, "Replayer")
val coreLogic = coreLogic("${homeDirectory()}/.kalium/replayer-${this.executionId}")
val coreLogic = logicClients.getOrPut(config.monkeyOrigin.clientId) {
coreLogic("${homeDirectory()}/.kalium/replayer-${config.monkeyOrigin.clientId}")
}
logger.i("Running action $actionName")
val startTime = System.currentTimeMillis()
Action.eventFromConfig(config.monkeyOrigin, config.eventType).execute(coreLogic, monkeyPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.wire.kalium.logic.feature.client.RegisterClientUseCase
import com.wire.kalium.logic.feature.conversation.CreateConversationResult
import com.wire.kalium.logic.feature.conversation.CreateGroupConversationUseCase
import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.monkeys.logger
import com.wire.kalium.monkeys.model.Backend
import com.wire.kalium.monkeys.model.MonkeyId
Expand Down Expand Up @@ -76,7 +77,7 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) {
// this means there are users within the team not managed by IM
// We can still send messages and add them to groups but not act on their behalf
// MonkeyId is irrelevant for external users as we will never be able to act on their behalf
fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, ""))
fun external(userId: UserId) = Monkey(MonkeyType.External(userId), MonkeyId(-1, "", -1))
fun internal(user: UserData, monkeyId: MonkeyId) = Monkey(MonkeyType.Internal(user), monkeyId)
}

Expand Down Expand Up @@ -279,7 +280,10 @@ class Monkey(val monkeyType: MonkeyType, val internalId: MonkeyId) {

suspend fun sendMessageTo(conversationId: ConversationId, message: String) {
this.monkeyState.readyThen {
messages.sendTextMessage(conversationId, message)
val result = messages.sendTextMessage(conversationId, message)
if (result is Either.Left) {
error("Error sending message: ${result.value}")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ data class MonkeyId(
@SerialName("index")
val index: Int,
@SerialName("team")
val team: String
val team: String,
@SerialName("client")
val clientId: Int
)

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MonkeyPool(users: List<UserData>, testCase: String) {

init {
users.forEachIndexed { index, userData ->
val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name))
val monkey = Monkey.internal(userData, MonkeyId(index, userData.team.name, testCase.hashCode()))
this.pool.getOrPut(userData.team.name) { mutableListOf() }.add(monkey)
this.poolLoggedOut.getOrPut(userData.team.name) { ConcurrentHashMap() }[userData.userId] = monkey
this.poolById[userData.userId] = monkey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul
execution_id = execution.id,
monkey_index = event.monkeyOrigin.index,
team = event.monkeyOrigin.team,
client_id = event.monkeyOrigin.clientId,
event_data = event.eventType
)
}
Expand All @@ -99,7 +100,7 @@ class PostgresStorage(pgConfig: EventConfig, private val executionId: Int? = nul
override fun CoroutineScope.readEvents() = produce {
withDatabase { database, execution ->
for (event in database.executionEventQueries.selectByExecutionId(execution.id).awaitAsList()) {
send(Event(MonkeyId(event.monkey_index, event.team), Json.decodeFromString(event.event_data)))
send(Event(MonkeyId(event.monkey_index, event.team, event.client_id), Json.decodeFromString(event.event_data)))
}
}
}
Expand Down

0 comments on commit bf6506f

Please sign in to comment.