diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index da34339d8b..ac47719a61 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -78,7 +78,7 @@ private val log = LoggerFactory.getLogger(Orchestrator::class.java) * It creates jobs for the single processing steps and passes them to the corresponding workers. It collects the results * produced by the workers until the complete ORT result is available or the run has failed. */ -@Suppress("LongParameterList", "TooManyFunctions") +@Suppress("TooManyFunctions") class Orchestrator( private val db: Database, private val workerJobRepositories: WorkerJobRepositories, @@ -99,9 +99,8 @@ class Orchestrator( "Repository '${ortRun.repositoryId}' not found." } - val context = WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, emptyMap()) - context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = true) } - }.scheduleNextJobs { + scheduleConfigWorkerJob(ortRun, header, updateRun = true) + }.onFailure { log.warn("Failed to handle 'CreateOrtRun' message.", it) } } @@ -111,10 +110,12 @@ class Orchestrator( */ fun handleConfigWorkerResult(header: MessageHeader, configWorkerResult: ConfigWorkerResult) { db.blockingQueryCatching(transactionIsolation = isolationLevel) { - val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId) + val ortRun = getOrtRun(configWorkerResult.ortRunId) - nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap()) - }.scheduleNextJobs { + createWorkerScheduleContext(ortRun, header) + }.onSuccess { context -> + scheduleNextJobs(context) + }.onFailure { log.warn("Failed to handle 'ConfigWorkerResult' message.", it) } } @@ -248,11 +249,12 @@ class Orchestrator( "ORT run '$ortRunId' not found." } - repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let { - nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) - } - } ?: (createWorkerSchedulerContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) - }.scheduleNextJobs { + repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED) + createWorkerScheduleContext(getOrtRun(ortRunId), header) + } ?: createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) + }.onSuccess { context -> + scheduleNextJobs(context) + }.onFailure { log.warn("Failed to handle 'WorkerError' message.", it) } } @@ -265,15 +267,18 @@ class Orchestrator( log.info("Handling a lost schedule for ORT run {}.", lostSchedule.ortRunId) db.blockingQueryCatching(transactionIsolation = isolationLevel) { - val ortRun = getCurrentOrtRun(lostSchedule.ortRunId) - val context = createWorkerSchedulerContext(ortRun, header) + val ortRun = getOrtRun(lostSchedule.ortRunId) + val context = createWorkerScheduleContext(ortRun, header) - if (context.jobs.isNotEmpty()) { - fetchNextJobs(context) + if (context.jobs.isEmpty()) { + scheduleConfigWorkerJob(ortRun, header, updateRun = false) + null } else { - context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = false) } + context } - }.scheduleNextJobs { + }.onSuccess { context -> + context?.let { scheduleNextJobs(context) } + }.onFailure { log.warn("Failed to handle 'LostSchedule' message.", it) } } @@ -281,7 +286,7 @@ class Orchestrator( /** * Obtain the [OrtRun] with the given [ortRunId] of fail with an exception if it does not exist. */ - private fun getCurrentOrtRun(ortRunId: Long): OrtRun = + private fun getOrtRun(ortRunId: Long): OrtRun = requireNotNull(ortRunRepository.get(ortRunId)) { "ORT run '$ortRunId' not found." } @@ -332,52 +337,20 @@ class Orchestrator( val job = workerJobRepositories.updateJobStatus(endpoint, message.jobId, status) if (issues.isNotEmpty()) ortRunRepository.update(job.ortRunId, issues = issues.asPresent()) - nextJobsToSchedule(endpoint, job.ortRunId, header) - }.scheduleNextJobs { + createWorkerScheduleContext(getOrtRun(job.ortRunId), header) + }.onSuccess { context -> + scheduleNextJobs(context) + }.onFailure { log.warn("Failed to handle '{}' message.", message::class.java.simpleName, it) } } - /** - * Determine the next jobs that can be scheduled after a job for the given [endpoint] for the run with the given - * [ortRunId] has completed. Use the given [header] to send messages to the worker endpoints. Optionally, - * accept a map with the [jobs] that have been run. Return a list with the new jobs to schedule and the current - * [WorkerScheduleContext]. - */ - private fun nextJobsToSchedule( - endpoint: Endpoint<*>, - ortRunId: Long, - header: MessageHeader, - jobs: Map<String, WorkerJob>? = null - ): Pair<WorkerScheduleContext, List<JobScheduleFunc>> { - log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId) - - val ortRun = getCurrentOrtRun(ortRunId) - val scheduleContext = createWorkerSchedulerContext(ortRun, header, workerJobs = jobs) - - return fetchNextJobs(scheduleContext) - } - - /** - * Convenience function to evaluate and process this [Result] with information about the next jobs to be scheduled. - * If the result is successful, actually trigger the jobs. Otherwise, call the given [onFailure] function with the - * exception that occurred. - */ - private fun Result<Pair<WorkerScheduleContext, List<JobScheduleFunc>>>.scheduleNextJobs( - onFailure: (Throwable) -> Unit - ) { - onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) - } - this@scheduleNextJobs.onFailure { onFailure(it) } - } - /** * Create a [WorkerScheduleContext] for the given [ortRun] and message [header] with the given [failed] flag. * The context is initialized with the status of all jobs for this run, either from the given [workerJobs] * parameter or by loading the job status from the database. */ - private fun createWorkerSchedulerContext( + private fun createWorkerScheduleContext( ortRun: OrtRun, header: MessageHeader, failed: Boolean = false, @@ -390,23 +363,41 @@ class Orchestrator( return WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, jobs, failed) } - /** - * Trigger the scheduling of the given new [createdJobs] for the ORT run contained in the given [context]. This - * also includes sending corresponding messages to the worker endpoints. - */ - private fun scheduleCreatedJobs(context: WorkerScheduleContext, createdJobs: CreatedJobs) { - // TODO: Handle errors during job scheduling. + /** Schedule the next jobs for the current ORT run based on the current state of the run. */ + private fun scheduleNextJobs(context: WorkerScheduleContext) { + val configuredJobs = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { + it.isConfigured(context.jobConfigs()) + } + + val jobInfos = configuredJobs.mapNotNull { + context.jobs[it.endpoint.configPrefix]?.let { job -> + it to WorkerJobInfo(job.id, job.status) + } + }.toMap() + + val ortRunInfo = OrtRunInfo(context.ortRun.id, context.failed, configuredJobs, jobInfos) - createdJobs.forEach { it() } + val nextJobs = ortRunInfo.getNextJobs() - if (createdJobs.isEmpty() && !context.hasRunningJobs()) { + nextJobs.forEach { info -> + info.createJob(context)?.let { job -> + // TODO: Handle errors during job scheduling. + info.publishJob(context, job) + context.workerJobRepositories.updateJobStatus( + info.endpoint, + job.id, + JobStatus.SCHEDULED, + finished = false + ) + } + } + + if (nextJobs.isEmpty() && !context.hasRunningJobs()) { cleanupJobs(context.ortRun.id) val ortRunStatus = when { context.isFailed() -> OrtRunStatus.FAILED - context.isFinishedWithIssues() -> OrtRunStatus.FINISHED_WITH_ISSUES - else -> OrtRunStatus.FINISHED } @@ -466,11 +457,6 @@ class Orchestrator( ) } -/** - * Type definition to represent a list of jobs that have been created and must be scheduled. - */ -typealias CreatedJobs = List<JobScheduleFunc> - /** * Create an [Issue] object representing an error that occurred in any [Endpoint]. */ @@ -480,12 +466,3 @@ fun <T : Any> Endpoint<T>.createErrorIssue(): Issue = Issue( message = "The $configPrefix worker failed due to an unexpected error.", severity = Severity.ERROR ) - -/** - * Return a [Pair] with the given [scheduleContext] and the list of jobs that can be scheduled in the current phase - * of the affected ORT run. - */ -private fun fetchNextJobs( - scheduleContext: WorkerScheduleContext -): Pair<WorkerScheduleContext, List<JobScheduleFunc>> = - scheduleContext to WorkerScheduleInfo.entries.mapNotNull { it.createAndScheduleJobIfPossible(scheduleContext) } diff --git a/orchestrator/src/main/kotlin/OrtRunInfo.kt b/orchestrator/src/main/kotlin/OrtRunInfo.kt new file mode 100644 index 0000000000..7c3fbb2687 --- /dev/null +++ b/orchestrator/src/main/kotlin/OrtRunInfo.kt @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package org.eclipse.apoapsis.ortserver.orchestrator + +import org.eclipse.apoapsis.ortserver.model.JobStatus + +/** A class to store the required information to determine which jobs can be run. */ +internal class OrtRunInfo( + /** The ORT run ID. */ + val id: Long, + + /** Whether the config worker has failed. */ + val configWorkerFailed: Boolean, + + /** The jobs configured to run in this ORT run. */ + val configuredJobs: Set<WorkerScheduleInfo>, + + /** Status information for already created jobs. */ + val jobInfos: Map<WorkerScheduleInfo, WorkerJobInfo> +) { + /** Get the next jobs that can be run. */ + fun getNextJobs(): Set<WorkerScheduleInfo> = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { canRun(it) } + + /** Return true if the job can be run. */ + private fun canRun(info: WorkerScheduleInfo): Boolean = + isConfigured(info) && + !wasScheduled(info) && + canRunIfPreviousJobFailed(info) && + info.dependsOn.all { isCompleted(it) } && + info.runsAfterTransitively.none { isPending(it) } + + /** Return true if no previous job has failed or if the job is configured to run after a failure. */ + private fun canRunIfPreviousJobFailed(info: WorkerScheduleInfo): Boolean = info.runAfterFailure || !isFailed() + + /** Return true if the job has been completed. */ + private fun isCompleted(info: WorkerScheduleInfo): Boolean = jobInfos[info]?.status?.final == true + + /** Return true if the job is configured to run. */ + private fun isConfigured(info: WorkerScheduleInfo): Boolean = info in configuredJobs + + /** Return true if any job has failed. */ + private fun isFailed(): Boolean = configWorkerFailed || jobInfos.any { it.value.status == JobStatus.FAILED } + + /** Return true if the job is pending execution. */ + private fun isPending(info: WorkerScheduleInfo): Boolean = + isConfigured(info) && + !isCompleted(info) && + canRunIfPreviousJobFailed(info) && + info.dependsOn.all { wasScheduled(it) || isPending(it) } + + /** Return true if the job has been scheduled. */ + private fun wasScheduled(info: WorkerScheduleInfo): Boolean = jobInfos.containsKey(info) +} + +/** A class to store information of a worker job required by [OrtRunInfo]. */ +internal class WorkerJobInfo( + /** The job ID. */ + val id: Long, + + /** The job status. */ + val status: JobStatus +) diff --git a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt index 1c8ec57d05..6aaaa140ed 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt @@ -60,7 +60,7 @@ internal class WorkerScheduleContext( * jobs that have been run. With this flag, this mechanism can be overridden, which is necessary for workers that * do not spawn jobs like the Config worker. */ - private val failed: Boolean = false + val failed: Boolean = false ) { /** * Return the [JobConfigurations] object for the current run. Prefer the resolved configurations if available; @@ -87,20 +87,7 @@ internal class WorkerScheduleContext( * Return a flag whether the current [OrtRun] has at least one running job. */ fun hasRunningJobs(): Boolean = - jobs.values.any { !it.isCompleted() } - - /** - * Return a flag whether the worker job for the given [endpoint] was scheduled for the current ORT run. It may - * still be running or have finished already. - */ - fun wasScheduled(endpoint: Endpoint<*>): Boolean = - endpoint.configPrefix in jobs - - /** - * Return a flag whether the worker job for the given [endpoint] has already completed. - */ - fun isJobCompleted(endpoint: Endpoint<*>): Boolean = - jobs[endpoint.configPrefix]?.isCompleted() ?: false + jobs.values.any { !it.status.final } /** * Return a flag whether this [OrtRun] has failed, i.e. it has at least one job in failed state. @@ -114,9 +101,3 @@ internal class WorkerScheduleContext( fun isFinishedWithIssues(): Boolean = !isFailed() && jobs.values.any { it.status == JobStatus.FINISHED_WITH_ISSUES } } - -/** - * Return a flag whether this [WorkerJob] is already completed. - */ -private fun WorkerJob.isCompleted(): Boolean = - status == JobStatus.FINISHED || status == JobStatus.FAILED || status == JobStatus.FINISHED_WITH_ISSUES diff --git a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt index 5c3b544ec3..a930a7a8fc 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt @@ -20,7 +20,6 @@ package org.eclipse.apoapsis.ortserver.orchestrator import org.eclipse.apoapsis.ortserver.model.JobConfigurations -import org.eclipse.apoapsis.ortserver.model.JobStatus import org.eclipse.apoapsis.ortserver.model.WorkerJob import org.eclipse.apoapsis.ortserver.model.orchestrator.AdvisorRequest import org.eclipse.apoapsis.ortserver.model.orchestrator.AnalyzerRequest @@ -36,11 +35,6 @@ import org.eclipse.apoapsis.ortserver.transport.NotifierEndpoint import org.eclipse.apoapsis.ortserver.transport.ReporterEndpoint import org.eclipse.apoapsis.ortserver.transport.ScannerEndpoint -/** - * Type definition for a function that schedules another worker job. - */ -typealias JobScheduleFunc = () -> Unit - /** * An enumeration class with constants that describe if and when a job for a specific worker should be scheduled. * @@ -50,24 +44,24 @@ typealias JobScheduleFunc = () -> Unit */ internal enum class WorkerScheduleInfo( /** The endpoint of the worker represented by this schedule info. */ - private val endpoint: Endpoint<*>, + val endpoint: Endpoint<*>, /** * A list defining the worker jobs that this job depends on. This job will only be executed after all the * dependencies have been successfully completed. */ - private val dependsOn: List<Endpoint<*>> = emptyList(), + val dependsOn: List<WorkerScheduleInfo> = emptyList(), /** * A list defining the worker jobs that must run before this job. The difference to [dependsOn] is that this job * can also run if these other jobs will not be executed. It is only guaranteed that it runs after all of them. */ - private val runsAfter: List<Endpoint<*>> = emptyList(), + val runsAfter: List<WorkerScheduleInfo> = emptyList(), /** * A flag determining whether the represented worker should be run even if previous workers have already failed. */ - private val runAfterFailure: Boolean = false + val runAfterFailure: Boolean = false ) { ANALYZER(AnalyzerEndpoint) { override fun createJob(context: WorkerScheduleContext): WorkerJob = @@ -83,7 +77,7 @@ internal enum class WorkerScheduleInfo( override fun isConfigured(configs: JobConfigurations): Boolean = true }, - ADVISOR(AdvisorEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { + ADVISOR(AdvisorEndpoint, dependsOn = listOf(ANALYZER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().advisor?.let { config -> context.workerJobRepositories.advisorJobRepository.create(context.ortRun.id, config) @@ -97,7 +91,7 @@ internal enum class WorkerScheduleInfo( configs.advisor != null }, - SCANNER(ScannerEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { + SCANNER(ScannerEndpoint, dependsOn = listOf(ANALYZER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().scanner?.let { config -> context.workerJobRepositories.scannerJobRepository.create(context.ortRun.id, config) @@ -111,7 +105,7 @@ internal enum class WorkerScheduleInfo( configs.scanner != null }, - EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(AdvisorEndpoint, ScannerEndpoint)) { + EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(ADVISOR, SCANNER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().evaluator?.let { config -> context.workerJobRepositories.evaluatorJobRepository.create(context.ortRun.id, config) @@ -125,7 +119,7 @@ internal enum class WorkerScheduleInfo( configs.evaluator != null }, - REPORTER(ReporterEndpoint, runsAfter = listOf(EvaluatorEndpoint), runAfterFailure = true) { + REPORTER(ReporterEndpoint, dependsOn = listOf(ANALYZER), runsAfter = listOf(EVALUATOR), runAfterFailure = true) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().reporter?.let { config -> context.workerJobRepositories.reporterJobRepository.create(context.ortRun.id, config) @@ -139,7 +133,7 @@ internal enum class WorkerScheduleInfo( configs.reporter != null }, - NOTIFIER(NotifierEndpoint, dependsOn = listOf(ReporterEndpoint), runAfterFailure = true) { + NOTIFIER(NotifierEndpoint, dependsOn = listOf(REPORTER), runAfterFailure = true) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().notifier?.let { config -> context.workerJobRepositories.notifierJobRepository.create(context.ortRun.id, config) @@ -153,82 +147,28 @@ internal enum class WorkerScheduleInfo( configs.notifier != null }; - companion object { - private val entriesByPrefix = entries.associateBy { it.endpoint.configPrefix } - - operator fun get(endpoint: Endpoint<*>): WorkerScheduleInfo = entriesByPrefix.getValue(endpoint.configPrefix) - } - /** * Return the transitive set of the workers that must complete before this one can run. This is necessary to * determine whether this worker can be started in the current phase of an ORT run. Note that it is assumed that * no cycles exist in the dependency graph of workers; otherwise, the scheduler algorithm would have a severe * problem. */ - private val runsAfterTransitively: Set<Endpoint<*>> - get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { WorkerScheduleInfo[it].runsAfterTransitively + it } - - /** - * Check whether a job for the represented worker can be scheduled now based on the given [context]. If so, create - * the job in the database and return a function that schedules the job. - */ - fun createAndScheduleJobIfPossible(context: WorkerScheduleContext): JobScheduleFunc? { - if (!canRun(context)) return null - - return createJob(context)?.let { job -> - { - publishJob(context, job) - - context.workerJobRepositories.updateJobStatus(endpoint, job.id, JobStatus.SCHEDULED, finished = false) - } - } - } + val runsAfterTransitively: Set<WorkerScheduleInfo> + get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { it.runsAfterTransitively + it } /** * Create a new job for this worker based on the information in the given [context]. */ - protected abstract fun createJob(context: WorkerScheduleContext): WorkerJob? + abstract fun createJob(context: WorkerScheduleContext): WorkerJob? /** * Publish a message to the worker endpoint to schedule the given [job] based on the information in the given * [context]. */ - protected abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) + abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) /** * Return a flag whether this worker is configured to run for the current ORT run based on the given [configs]. */ - protected abstract fun isConfigured(configs: JobConfigurations): Boolean - - /** - * Return a flag whether a job for the represented worker can be started now based on the given [context]. - * This function checks whether this worker is configured to run and whether the jobs it depends on have been - * completed. - */ - private fun canRun(context: WorkerScheduleContext): Boolean = - isConfigured(context.jobConfigs()) && - !context.wasScheduled(endpoint) && - canRunWithFailureState(context) && - dependsOn.all { context.isJobCompleted(it) } && - runsAfterTransitively.none { WorkerScheduleInfo[it].isPending(context) } - - /** - * Check whether the represented worker is pending for the current ORT run based on the given [context]. This - * means that the worker has not yet run, but - given the current state - is supposed to run later. - */ - private fun isPending(context: WorkerScheduleContext): Boolean = - isConfigured(context.jobConfigs()) && - !context.isJobCompleted(endpoint) && - canRunWithFailureState(context) && - dependsOn.all { - context.wasScheduled(it) || - WorkerScheduleInfo[it].isPending(context) - } - - /** - * Check whether the represented worker can be executed for the failure state stored in the given [context]. Here - * a worker can decide whether it can always run or only if all previous workers were successful. - */ - private fun canRunWithFailureState(context: WorkerScheduleContext) = - runAfterFailure || !context.isFailed() + abstract fun isConfigured(configs: JobConfigurations): Boolean } diff --git a/orchestrator/src/test/kotlin/OrtRunInfoTest.kt b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt new file mode 100644 index 0000000000..d8f1f96596 --- /dev/null +++ b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt @@ -0,0 +1,346 @@ +/* + * Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package org.eclipse.apoapsis.ortserver.orchestrator + +import io.kotest.core.spec.style.WordSpec +import io.kotest.matchers.collections.beEmpty +import io.kotest.matchers.collections.containExactly +import io.kotest.matchers.should + +import org.eclipse.apoapsis.ortserver.model.JobStatus + +class OrtRunInfoTest : WordSpec({ + "getNextJobs() with all jobs configured" should { + val configuredJobs = WorkerScheduleInfo.entries.toSet() + + "return ANALYZER if no job was created yet" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) + } + + "return nothing if ANALYZER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return ADVISOR and SCANNER if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ADVISOR, WorkerScheduleInfo.SCANNER) + } + + "return nothing if ADVISOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if SCANNER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return EVALUATOR if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) + } + + "return nothing if EVALUATOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return REPORTER if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return nothing if config worker failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = true, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return REPORTER if ANALYZER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if ADVISOR failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if SCANNER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if EVALUATOR failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return NOTIFIER if REPORTER finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) + } + + "return NOTIFIER if REPORTER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) + } + + "return nothing if all jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = WorkerScheduleInfo.entries.associateWith { + WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + } + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + } + + "getNextJobs() with only ANALYZER and EVALUATOR configured" should { + val configuredJobs = setOf(WorkerScheduleInfo.ANALYZER, WorkerScheduleInfo.EVALUATOR) + + "return ANALYZER if no job was created yet" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) + } + + "return nothing if ANALYZER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if ANALYZER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return EVALUATOR if ANALYZER finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) + } + + "return nothing if EVALUATOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if all jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + } +})