Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start untangling orchestrator #1739

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
137 changes: 57 additions & 80 deletions orchestrator/src/main/kotlin/Orchestrator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private val log = LoggerFactory.getLogger(Orchestrator::class.java)
* It creates jobs for the single processing steps and passes them to the corresponding workers. It collects the results
* produced by the workers until the complete ORT result is available or the run has failed.
*/
@Suppress("LongParameterList", "TooManyFunctions")
@Suppress("TooManyFunctions")
class Orchestrator(
private val db: Database,
private val workerJobRepositories: WorkerJobRepositories,
Expand All @@ -99,9 +99,8 @@ class Orchestrator(
"Repository '${ortRun.repositoryId}' not found."
}

val context = WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, emptyMap())
context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = true) }
}.scheduleNextJobs {
scheduleConfigWorkerJob(ortRun, header, updateRun = true)
}.onFailure {
log.warn("Failed to handle 'CreateOrtRun' message.", it)
}
}
Expand All @@ -111,10 +110,12 @@ class Orchestrator(
*/
fun handleConfigWorkerResult(header: MessageHeader, configWorkerResult: ConfigWorkerResult) {
db.blockingQueryCatching(transactionIsolation = isolationLevel) {
val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId)
val ortRun = getOrtRun(configWorkerResult.ortRunId)

nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap())
}.scheduleNextJobs {
createWorkerScheduleContext(ortRun, header)
}.onSuccess { context ->
scheduleNextJobs(context)
}.onFailure {
log.warn("Failed to handle 'ConfigWorkerResult' message.", it)
}
}
Expand Down Expand Up @@ -248,11 +249,12 @@ class Orchestrator(
"ORT run '$ortRunId' not found."
}

repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let {
nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header)
}
} ?: (createWorkerSchedulerContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList())
}.scheduleNextJobs {
repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)
createWorkerScheduleContext(getOrtRun(ortRunId), header)
} ?: createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true)
}.onSuccess { context ->
scheduleNextJobs(context)
}.onFailure {
log.warn("Failed to handle 'WorkerError' message.", it)
}
}
Expand All @@ -265,23 +267,26 @@ class Orchestrator(
log.info("Handling a lost schedule for ORT run {}.", lostSchedule.ortRunId)

db.blockingQueryCatching(transactionIsolation = isolationLevel) {
val ortRun = getCurrentOrtRun(lostSchedule.ortRunId)
val context = createWorkerSchedulerContext(ortRun, header)
val ortRun = getOrtRun(lostSchedule.ortRunId)
val context = createWorkerScheduleContext(ortRun, header)

if (context.jobs.isNotEmpty()) {
fetchNextJobs(context)
if (context.jobs.isEmpty()) {
scheduleConfigWorkerJob(ortRun, header, updateRun = false)
null
} else {
context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = false) }
context
}
}.scheduleNextJobs {
}.onSuccess { context ->
context?.let { scheduleNextJobs(context) }
}.onFailure {
log.warn("Failed to handle 'LostSchedule' message.", it)
}
}

/**
* Obtain the [OrtRun] with the given [ortRunId] or fail with an exception if it does not exist.
*/
private fun getCurrentOrtRun(ortRunId: Long): OrtRun =
private fun getOrtRun(ortRunId: Long): OrtRun =
requireNotNull(ortRunRepository.get(ortRunId)) {
"ORT run '$ortRunId' not found."
}
Expand Down Expand Up @@ -332,52 +337,20 @@ class Orchestrator(
val job = workerJobRepositories.updateJobStatus(endpoint, message.jobId, status)
if (issues.isNotEmpty()) ortRunRepository.update(job.ortRunId, issues = issues.asPresent())

nextJobsToSchedule(endpoint, job.ortRunId, header)
}.scheduleNextJobs {
createWorkerScheduleContext(getOrtRun(job.ortRunId), header)
}.onSuccess { context ->
scheduleNextJobs(context)
}.onFailure {
log.warn("Failed to handle '{}' message.", message::class.java.simpleName, it)
}
}

/**
 * Compute what can be scheduled next once a job for [endpoint] belonging to the ORT run [ortRunId] has completed.
 * The run is looked up via its ID, a [WorkerScheduleContext] is created from it using [header] and the optional
 * map of already known [jobs], and this context is handed over to [fetchNextJobs]. The result is the pair
 * returned by that function: the context together with the list of jobs to schedule.
 */
private fun nextJobsToSchedule(
    endpoint: Endpoint<*>,
    ortRunId: Long,
    header: MessageHeader,
    jobs: Map<String, WorkerJob>? = null
): Pair<WorkerScheduleContext, List<JobScheduleFunc>> {
    log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId)

    // Fails with an exception if the run does not exist.
    val currentRun = getCurrentOrtRun(ortRunId)

    return fetchNextJobs(createWorkerSchedulerContext(currentRun, header, workerJobs = jobs))
}

/**
 * Convenience function to evaluate and process this [Result] with information about the next jobs to be scheduled.
 * If the result is successful, actually trigger the jobs by passing the contained context and schedule functions
 * to [scheduleCreatedJobs]. Otherwise, call the given [onFailure] function with the exception that occurred.
 */
private fun Result<Pair<WorkerScheduleContext, List<JobScheduleFunc>>>.scheduleNextJobs(
    onFailure: (Throwable) -> Unit
) {
    // `Result.onSuccess` returns the receiver, so the failure handler can be chained directly. Calling it through
    // the dot notation also makes sure that the `Result.onFailure` extension is invoked and not the equally named
    // parameter.
    onSuccess { (context, schedules) ->
        scheduleCreatedJobs(context, schedules)
    }.onFailure(onFailure)
}

/**
* Create a [WorkerScheduleContext] for the given [ortRun] and message [header] with the given [failed] flag.
* The context is initialized with the status of all jobs for this run, either from the given [workerJobs]
* parameter or by loading the job status from the database.
*/
private fun createWorkerSchedulerContext(
private fun createWorkerScheduleContext(
ortRun: OrtRun,
header: MessageHeader,
failed: Boolean = false,
Expand All @@ -390,23 +363,41 @@ class Orchestrator(
return WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, jobs, failed)
}

/**
* Trigger the scheduling of the given new [createdJobs] for the ORT run contained in the given [context]. This
* also includes sending corresponding messages to the worker endpoints.
*/
private fun scheduleCreatedJobs(context: WorkerScheduleContext, createdJobs: CreatedJobs) {
// TODO: Handle errors during job scheduling.
/** Schedule the next jobs for the current ORT run based on the current state of the run. */
private fun scheduleNextJobs(context: WorkerScheduleContext) {
val configuredJobs = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) {
it.isConfigured(context.jobConfigs())
}

val jobInfos = configuredJobs.mapNotNull {
context.jobs[it.endpoint.configPrefix]?.let { job ->
it to WorkerJobInfo(job.id, job.status)
}
}.toMap()

val ortRunInfo = OrtRunInfo(context.ortRun.id, context.failed, configuredJobs, jobInfos)

createdJobs.forEach { it() }
val nextJobs = ortRunInfo.getNextJobs()

if (createdJobs.isEmpty() && !context.hasRunningJobs()) {
nextJobs.forEach { info ->
info.createJob(context)?.let { job ->
// TODO: Handle errors during job scheduling.
info.publishJob(context, job)
context.workerJobRepositories.updateJobStatus(
info.endpoint,
job.id,
JobStatus.SCHEDULED,
finished = false
)
}
}

if (nextJobs.isEmpty() && !context.hasRunningJobs()) {
cleanupJobs(context.ortRun.id)

val ortRunStatus = when {
context.isFailed() -> OrtRunStatus.FAILED

context.isFinishedWithIssues() -> OrtRunStatus.FINISHED_WITH_ISSUES

else -> OrtRunStatus.FINISHED
}

Expand Down Expand Up @@ -466,11 +457,6 @@ class Orchestrator(
)
}

/**
 * Type alias for a list of [JobScheduleFunc] objects representing the jobs that have been created and must still
 * be scheduled.
 */
typealias CreatedJobs = List<JobScheduleFunc>

/**
* Create an [Issue] object representing an error that occurred in any [Endpoint].
*/
Expand All @@ -480,12 +466,3 @@ fun <T : Any> Endpoint<T>.createErrorIssue(): Issue = Issue(
message = "The $configPrefix worker failed due to an unexpected error.",
severity = Severity.ERROR
)

/**
 * Ask every [WorkerScheduleInfo] entry to create and schedule its job for the given [scheduleContext] if this is
 * possible, and collect the non-null results. Return a [Pair] consisting of the unchanged [scheduleContext] and
 * this list of jobs that can be scheduled in the current phase of the affected ORT run.
 */
private fun fetchNextJobs(
    scheduleContext: WorkerScheduleContext
): Pair<WorkerScheduleContext, List<JobScheduleFunc>> {
    val schedulableJobs = WorkerScheduleInfo.entries.mapNotNull { info ->
        info.createAndScheduleJobIfPossible(scheduleContext)
    }

    return Pair(scheduleContext, schedulableJobs)
}
79 changes: 79 additions & 0 deletions orchestrator/src/main/kotlin/OrtRunInfo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

package org.eclipse.apoapsis.ortserver.orchestrator

import org.eclipse.apoapsis.ortserver.model.JobStatus

/** A class to store the required information to determine which jobs can be run. */
internal class OrtRunInfo(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I like the idea to extract the scheduling logic to a dedicated class, I have some problems with the current implementation:

  • IIUC, this class now contains the scheduling logic and is responsible for determining the next jobs that should run. This should also be reflected in the class name. OrtRunInfo is meaningless in this context and is rather reminiscent of a data model class.
  • The relation between this class and WorkerScheduleContext is unclear. Orchestrator now creates a WorkerScheduleContext, and with the help of this context, an OrtRunInfo. This is because the latter has its own state derived from the context (this is not really untangling). It would be better if OrtRunInfo was stateless and only implemented the scheduling strategy. The getNextJobs() function could be passed a WorkerScheduleContext info object and obtain all required information from there.

/** The ORT run ID. */
val id: Long,

/** Whether the config worker has failed. */
val configWorkerFailed: Boolean,

/** The jobs configured to run in this ORT run. */
val configuredJobs: Set<WorkerScheduleInfo>,

/** Status information for already created jobs. */
val jobInfos: Map<WorkerScheduleInfo, WorkerJobInfo>
) {
/**
 * Return the set of all [WorkerScheduleInfo] entries for which [canRun] holds, i.e. the jobs that can be run
 * next. The entries keep the order in which they are declared.
 */
fun getNextJobs(): Set<WorkerScheduleInfo> =
    WorkerScheduleInfo.entries.filter { candidate -> canRun(candidate) }.toSet()

/**
 * Return true if the job described by [info] can be run now. This requires that the job is configured, has not
 * been scheduled yet, is not blocked by a failure, that all jobs it depends on are completed, and that none of
 * the jobs it must (transitively) run after is still pending.
 */
private fun canRun(info: WorkerScheduleInfo): Boolean {
    if (!isConfigured(info) || wasScheduled(info)) return false
    if (!canRunIfPreviousJobFailed(info)) return false

    val dependenciesCompleted = info.dependsOn.all { dependency -> isCompleted(dependency) }

    return dependenciesCompleted && info.runsAfterTransitively.none { predecessor -> isPending(predecessor) }
}

/**
 * Return true if a failure does not prevent the job described by [info] from running: either the job is flagged
 * to run after a failure, or nothing has failed so far (see [isFailed]).
 */
private fun canRunIfPreviousJobFailed(info: WorkerScheduleInfo): Boolean =
    when {
        info.runAfterFailure -> true
        else -> !isFailed()
    }

/**
 * Return true if a job for [info] is known in [jobInfos] and its status is marked as final. A job without an
 * entry in [jobInfos] is not considered completed.
 */
private fun isCompleted(info: WorkerScheduleInfo): Boolean =
    jobInfos[info]?.status?.final ?: false

/** Return true if [info] is contained in the set of [configuredJobs] of this ORT run. */
private fun isConfigured(info: WorkerScheduleInfo): Boolean =
    configuredJobs.contains(info)

/**
 * Return true if the run counts as failed, which is the case if [configWorkerFailed] is set or at least one of
 * the known jobs has the status [JobStatus.FAILED].
 */
private fun isFailed(): Boolean {
    if (configWorkerFailed) return true

    return jobInfos.values.any { jobInfo -> jobInfo.status == JobStatus.FAILED }
}

/**
 * Return true if the job described by [info] is still expected to be executed: it is configured, not yet
 * completed, not blocked by a failure, and each job it depends on has either been scheduled already or is
 * pending itself. The check recurses through the dependencies of [info].
 */
private fun isPending(info: WorkerScheduleInfo): Boolean {
    if (!isConfigured(info) || isCompleted(info)) return false
    if (!canRunIfPreviousJobFailed(info)) return false

    return info.dependsOn.all { dependency -> wasScheduled(dependency) || isPending(dependency) }
}

/**
 * Return true if a job for [info] has been scheduled, i.e. if [jobInfos] contains an entry for it, regardless of
 * the status of that job.
 */
private fun wasScheduled(info: WorkerScheduleInfo): Boolean =
    info in jobInfos
}

/**
 * A class to store information of a worker job required by [OrtRunInfo].
 *
 * @property id The job ID.
 * @property status The job status.
 */
internal class WorkerJobInfo(val id: Long, val status: JobStatus)
23 changes: 2 additions & 21 deletions orchestrator/src/main/kotlin/WorkerScheduleContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ internal class WorkerScheduleContext(
* jobs that have been run. With this flag, this mechanism can be overridden, which is necessary for workers that
* do not spawn jobs like the Config worker.
*/
private val failed: Boolean = false
val failed: Boolean = false
) {
/**
* Return the [JobConfigurations] object for the current run. Prefer the resolved configurations if available;
Expand All @@ -87,20 +87,7 @@ internal class WorkerScheduleContext(
* Return a flag whether the current [OrtRun] has at least one running job.
*/
fun hasRunningJobs(): Boolean =
jobs.values.any { !it.isCompleted() }

/**
* Return a flag whether the worker job for the given [endpoint] was scheduled for the current ORT run. It may
* still be running or have finished already.
*/
fun wasScheduled(endpoint: Endpoint<*>): Boolean =
endpoint.configPrefix in jobs

/**
* Return a flag whether the worker job for the given [endpoint] has already completed.
*/
fun isJobCompleted(endpoint: Endpoint<*>): Boolean =
jobs[endpoint.configPrefix]?.isCompleted() ?: false
jobs.values.any { !it.status.final }

/**
* Return a flag whether this [OrtRun] has failed, i.e. it has at least one job in failed state.
Expand All @@ -114,9 +101,3 @@ internal class WorkerScheduleContext(
fun isFinishedWithIssues(): Boolean =
!isFailed() && jobs.values.any { it.status == JobStatus.FINISHED_WITH_ISSUES }
}

/**
* Return a flag whether this [WorkerJob] is already completed.
*/
private fun WorkerJob.isCompleted(): Boolean =
status == JobStatus.FINISHED || status == JobStatus.FAILED || status == JobStatus.FINISHED_WITH_ISSUES
Loading
Loading