Skip to content

Commit

Permalink
Save test results to cache concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Chelombitko committed Dec 17, 2024
1 parent 732b77e commit fb30a86
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,45 +1,39 @@
package com.malinskiy.marathon.cache.test

import com.malinskiy.marathon.actor.unboundedChannel
import com.malinskiy.marathon.cache.test.key.TestCacheKeyFactory
import com.malinskiy.marathon.device.DevicePoolId
import com.malinskiy.marathon.execution.TestResult
import com.malinskiy.marathon.log.MarathonLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

class TestCacheSaver(
private val cache: TestResultsCache,
private val testCacheKeyProvider: TestCacheKeyFactory
) {
) : AutoCloseable {

private val logger = MarathonLogging.getLogger(TestCacheSaver::class.java)

private val tasks: Channel<SaveTask> = unboundedChannel()
private lateinit var completableDeferred: Deferred<Unit>

fun initialize(scope: CoroutineScope) {
completableDeferred = scope.async {
for (task in tasks) {
val cacheKey = testCacheKeyProvider.getCacheKey(task.poolId, task.result.test)
cache.store(cacheKey, task.result)
}
private val job = SupervisorJob()
private val dispatcher = Dispatchers.IO.limitedParallelism(16, "Cache saver")
private val scope = CoroutineScope(job + dispatcher)

fun saveTestResult(poolId: DevicePoolId, result: TestResult) {
scope.launch {
val cacheKey = testCacheKeyProvider.getCacheKey(poolId, result.test)
cache.store(cacheKey, result)
}
}

fun saveTestResult(poolId: DevicePoolId, result: TestResult) = runBlocking {
// channel is unbounded, so it will return immediately
tasks.send(SaveTask(poolId, result))
}

suspend fun terminate() {
tasks.close()
completableDeferred.await()
job.complete()
job.join()
logger.debug("Cache saver is terminated")
}

private class SaveTask(val poolId: DevicePoolId, val result: TestResult)
override fun close() {
scope.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Scheduler(
override fun close() {
deviceProvider.close()
cacheLoader.close()
cacheSaver.close()
cacheService.close()
}

Expand All @@ -128,9 +129,6 @@ class Scheduler(
if (configuration.cache.isEnabled) {
cacheLoader.start(scope, ::onCacheResult)
}
if (configuration.cache.isPushEnabled) {
cacheSaver.initialize(scope)
}
}

private suspend fun initializeDeviceProvider(scope: CoroutineScope) {
Expand Down

0 comments on commit fb30a86

Please sign in to comment.