Skip to content

Commit

Permalink
WM-2419: batch handling workflow outputs (#2671)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvoet authored Dec 15, 2023
1 parent c223b74 commit 795de9d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package org.broadinstitute.dsde.rawls.jobexec

import akka.actor._
import akka.pattern._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import com.google.api.client.auth.oauth2.Credential
import com.typesafe.scalalogging.LazyLogging
import nl.grons.metrics4.scala.Counter
Expand Down Expand Up @@ -352,13 +355,19 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
// and will be re-processed next time we call queryForWorkflowStatus().
// This is why it's important to attach the outputs before updating the status -- if you update the status to Successful first, and the attach
// outputs fails, we'll stop querying for the workflow status and never attach the outputs.
datasource.inTransactionWithAttrTempTable { dataAccess =>
handleOutputs(workflowsWithOutputs, dataAccess)
} recoverWith {
// If there is something fatally wrong handling outputs for any workflow, mark all the workflows as failed
case fatal: RawlsFatalExceptionWithErrorReport =>
markWorkflowsFailed(allWorkflows, fatal)
} flatMap { _ =>
// traverse and IO stuff ensures serial execution of the batches
batchWorkflowsWithOutputs(workflowsWithOutputs)
.traverse { batch =>
IO.fromFuture(IO(datasource.inTransactionWithAttrTempTable { dataAccess =>
handleOutputs(batch, dataAccess)
}))
}
.unsafeToFuture()
.recoverWith {
// If there is something fatally wrong handling outputs for any workflow, mark all the workflows as failed
case fatal: RawlsFatalExceptionWithErrorReport =>
markWorkflowsFailed(allWorkflows, fatal)
} flatMap { _ =>
// NEW TXN! Update statuses for workflows and submission.
datasource.inTransaction { dataAccess =>
// Refetch workflows as some may have been marked as Failed by handleOutputs.
Expand Down Expand Up @@ -389,6 +398,24 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
}
}

/**
* Batch workflowsWithOutputs into groups so as not to exceed config.attributeUpdatesPerWorkflow attributes per transaction
*
* @param workflowsWithOutputs
* @return
*/
def batchWorkflowsWithOutputs(
workflowsWithOutputs: Seq[(WorkflowRecord, ExecutionServiceOutputs)]
): List[Seq[(WorkflowRecord, ExecutionServiceOutputs)]] = {
// batch workfowsWithOutpus into groups so as not to exceed config.attributeUpdatesPerWorkflow attributes per transaction
val countOfAllAttributes = workflowsWithOutputs.foldLeft(0) { case (subTotal, (_, outputs)) =>
attributeCount(outputs.outputs.values.collect { case Left(output) => output }) + subTotal
}
// plus 1 because integer division rounds down
val batchCount = countOfAllAttributes / config.attributeUpdatesPerWorkflow + 1
workflowsWithOutputs.grouped(workflowsWithOutputs.size / batchCount + 1).toList
}

private def toThurloeNotification(submission: Submission,
workspaceName: WorkspaceName,
finalStatus: SubmissionStatus,
Expand Down Expand Up @@ -666,7 +693,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
val (optEntityUpdates, optWs) = updates

val entityAttributeCount = optEntityUpdates map { update: WorkflowEntityUpdate =>
val cnt = attributeCount(update.upserts)
val cnt = attributeCount(update.upserts.values)
logger.debug(
s"Updating $cnt attribute values for entity ${update.entityRef.entityName} of type ${update.entityRef.entityType} in ${submissionId.toString}/${workflowRecord.externalId
.getOrElse("MISSING_WORKFLOW")}. ${safePrint(workspace.attributes)}"
Expand All @@ -675,7 +702,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
} getOrElse 0

val workspaceAttributeCount = optWs map { workspace: Workspace =>
val cnt = attributeCount(workspace.attributes)
val cnt = attributeCount(workspace.attributes.values)
logger.debug(
s"Updating $cnt attribute values for workspace in ${submissionId.toString}/${workflowRecord.externalId
.getOrElse("MISSING_WORKFLOW")}. ${safePrint(workspace.attributes)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,36 @@ class SubmissionMonitorSpec(_system: ActorSystem)
}
}

Set((5, 1000), (5, 1002), (5000, 1002), (5000, 0)).foreach { case (workflowsPerBatch, workflowCount) =>
it should s"batchWorkflowsWithOutputs workflowsPerBatch=$workflowsPerBatch, workflowCount=$workflowCount" in withDefaultTestDatabase {
dataSource: SlickDataSource =>
val status = WorkflowStatuses.Succeeded
val attributesPerWorkflow = outputs.outputs.size * workflowsPerBatch
val monitor = createSubmissionMonitor(
dataSource,
mockSamDAO,
mockGoogleServicesDAO,
testData.submissionUpdateEntity,
testData.wsName,
new SubmissionTestExecutionServiceDAO(status.toString),
attributesPerWorkflow
)

val workflowsRecs = runAndWait(
workflowQuery.listWorkflowRecsForSubmission(UUID.fromString(testData.submissionUpdateEntity.submissionId))
)
val result = monitor.batchWorkflowsWithOutputs(Seq.fill(workflowCount)((workflowsRecs.head, outputs))).size
val expected = if (workflowCount % workflowsPerBatch == 0) {
workflowCount / workflowsPerBatch
} else {
workflowCount / workflowsPerBatch + 1
}
assertResult(expected) {
result
}
}
}

it should "queryExecutionServiceForStatus submitted" in withDefaultTestDatabase { dataSource: SlickDataSource =>
val monitor = createSubmissionMonitor(
dataSource,
Expand Down Expand Up @@ -1901,9 +1931,10 @@ class SubmissionMonitorSpec(_system: ActorSystem)
googleServicesDAO: GoogleServicesDAO,
submission: Submission,
wsName: WorkspaceName,
execSvcDAO: ExecutionServiceDAO
execSvcDAO: ExecutionServiceDAO,
attributesPerWorkflow: Int = 10
): SubmissionMonitor = {
val config = SubmissionMonitorConfig(1 minutes, 30 days, true, 10, true)
val config = SubmissionMonitorConfig(1 minutes, 30 days, true, attributesPerWorkflow, true)
new TestSubmissionMonitor(
wsName,
UUID.fromString(submission.submissionId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ object Attributable {
.map(AttributeName.withDefaultNS)
type AttributeMap = Map[AttributeName, Attribute]

def attributeCount(map: AttributeMap): Int = {
def attributeCount(values: Iterable[Attribute]): Int = {
def countAttributes(attribute: Attribute): Int =
attribute match {
case _: AttributeListElementable => 1
case attributeList: AttributeList[_] => attributeList.list.map(countAttributes).sum
}

map.values.map(countAttributes).sum
values.map(countAttributes).sum
}

def safePrint(map: AttributeMap, depth: Int = 10): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,12 @@ class WorkspaceModelSpec extends AnyFreeSpec with Matchers {
)
)

Attributable.attributeCount(map1) shouldBe 1
Attributable.attributeCount(map2) shouldBe 0
Attributable.attributeCount(map3) shouldBe 0
Attributable.attributeCount(map4) shouldBe 3
Attributable.attributeCount(map5) shouldBe 2
Attributable.attributeCount(map6) shouldBe 6
Attributable.attributeCount(map1.values) shouldBe 1
Attributable.attributeCount(map2.values) shouldBe 0
Attributable.attributeCount(map3.values) shouldBe 0
Attributable.attributeCount(map4.values) shouldBe 3
Attributable.attributeCount(map5.values) shouldBe 2
Attributable.attributeCount(map6.values) shouldBe 6
}
}

Expand Down

0 comments on commit 795de9d

Please sign in to comment.