diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponent.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponent.scala index efbd80179a..0050468923 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponent.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponent.scala @@ -1,9 +1,11 @@ package org.broadinstitute.dsde.rawls.dataaccess.slick import akka.http.scaladsl.model.StatusCodes +import io.opencensus.scala.Tracing._ import org.broadinstitute.dsde.rawls.model.Attributable.AttributeMap import org.broadinstitute.dsde.rawls.model.AttributeName.toDelimitedName import org.broadinstitute.dsde.rawls.model._ +import org.broadinstitute.dsde.rawls.util.TracingUtils.traceDBIOWithParent import org.broadinstitute.dsde.rawls.{RawlsException, RawlsExceptionWithErrorReport, RawlsFatalExceptionWithErrorReport} import slick.ast.{BaseTypedType, TypedType} import slick.dbio.Effect.Write @@ -596,17 +598,26 @@ trait AttributeComponent { def patchAttributesAction(inserts: Traversable[RECORD], updates: Traversable[RECORD], deleteIds: Traversable[Long], - insertFunction: Seq[RECORD] => String => WriteAction[Int] + insertFunction: Seq[RECORD] => String => WriteAction[Int], + tracingContext: RawlsTracingContext ) = - for { - _ <- if (deleteIds.nonEmpty) deleteAttributeRecordsById(deleteIds.toSeq) else DBIO.successful(0) - _ <- if (inserts.nonEmpty) batchInsertAttributes(inserts.toSeq) else DBIO.successful(0) - updateResult <- - if (updates.nonEmpty) - AlterAttributesUsingScratchTableQueries.updateAction(insertFunction(updates.toSeq)) - else - DBIO.successful(0) - } yield updateResult + traceDBIOWithParent("patchAttributesAction", tracingContext) { span => + for { + _ <- + if (deleteIds.nonEmpty) + traceDBIOWithParent("deleteAttributeRecordsById", span)(_ => deleteAttributeRecordsById(deleteIds.toSeq)) + else DBIO.successful(0) + _ <- + if (inserts.nonEmpty) + traceDBIOWithParent("batchInsertAttributes", span)(_ => batchInsertAttributes(inserts.toSeq)) + else DBIO.successful(0) + updateResult <- + if (updates.nonEmpty) + AlterAttributesUsingScratchTableQueries.updateAction(insertFunction(updates.toSeq), span) + else + DBIO.successful(0) + } yield updateResult + } def deleteAttributes(workspaceContext: Workspace, entityType: String, attributeNames: Set[AttributeName]) = workspaceQuery.updateLastModified(workspaceContext.workspaceIdAsUUID) andThen @@ -696,80 +707,84 @@ trait AttributeComponent { def rewriteAttrsAction(attributesToSave: Traversable[RECORD], existingAttributes: Traversable[RECORD], insertFunction: Seq[RECORD] => String => WriteAction[Int] - ): ReadWriteAction[Set[OWNER_ID]] = { - val toSaveAttrMap = toPrimaryKeyMap(attributesToSave) - val existingAttrMap = toPrimaryKeyMap(existingAttributes) - - // note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0" - // therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save. - // note this does not include transactionId for AttributeScratchRecords. 
We do not expect AttributeScratchRecords - // here, and transactionId will eventually be going away, so don't bother - object ComparableRecord { - def fromRecord(rec: RECORD): ComparableRecord = - new ComparableRecord( - rec.ownerId, - rec.namespace, - rec.name, - rec.valueString, - rec.valueNumber, - rec.valueBoolean, - rec.valueJson, - rec.valueEntityRef, - rec.listIndex, - rec.listLength, - rec.deleted, - rec.deletedDate + ): ReadWriteAction[Set[OWNER_ID]] = + traceDBIOWithParent("AttributeComponent.rewriteAttrsAction", RawlsTracingContext(Option.empty)) { + tracingContext => + val toSaveAttrMap = toPrimaryKeyMap(attributesToSave) + val existingAttrMap = toPrimaryKeyMap(existingAttributes) + + // note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0" + // therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save. + // note this does not include transactionId for AttributeScratchRecords. We do not expect AttributeScratchRecords + // here, and transactionId will eventually be going away, so don't bother + object ComparableRecord { + def fromRecord(rec: RECORD): ComparableRecord = + new ComparableRecord( + rec.ownerId, + rec.namespace, + rec.name, + rec.valueString, + rec.valueNumber, + rec.valueBoolean, + rec.valueJson, + rec.valueEntityRef, + rec.listIndex, + rec.listLength, + rec.deleted, + rec.deletedDate + ) + } + + case class ComparableRecord( + ownerId: OWNER_ID, + namespace: String, + name: String, + valueString: Option[String], + valueNumber: Option[Double], + valueBoolean: Option[Boolean], + valueJson: Option[String], + valueEntityRef: Option[Long], + listIndex: Option[Int], + listLength: Option[Int], + deleted: Boolean, + deletedDate: Option[Timestamp] ) - } - case class ComparableRecord( - ownerId: OWNER_ID, - namespace: String, - name: String, - valueString: Option[String], - valueNumber: Option[Double], - valueBoolean: Option[Boolean], - valueJson: Option[String], - valueEntityRef: Option[Long], - listIndex: Option[Int], - listLength: Option[Int], - deleted: Boolean, - deletedDate: Option[Timestamp] - ) + // create a set of ComparableRecords representing the existing attributes + val existingAttributesSet: Set[ComparableRecord] = + existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r)) - // create a set of ComparableRecords representing the existing attributes - val existingAttributesSet: Set[ComparableRecord] = - existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r)) + val existingKeys = existingAttrMap.keySet - val existingKeys = existingAttrMap.keySet + // insert attributes which are in save but not exists + val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_)) - // insert attributes which are in save but not exists - val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_)) + // delete attributes which are in exists but not save + val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_)) - // delete attributes which are in exists but not save - val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_)) + val attributesToUpdate = toSaveAttrMap.filter { case (k, v) => + existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it + !existingAttributesSet.contains( + ComparableRecord.fromRecord(v) + ) // if the attribute exists and is unchanged, don't update it + } - val attributesToUpdate = 
toSaveAttrMap.filter { case (k, v) => - existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it - !existingAttributesSet.contains( - ComparableRecord.fromRecord(v) - ) // if the attribute exists and is unchanged, don't update it + // collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate + val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++ + attributesToUpdate.values.map(_.ownerId) ++ + attributesToDelete.values.map(_.ownerId)).toSet + + // perform the inserts/updates/deletes + patchAttributesAction( + attributesToInsert.values, + attributesToUpdate.values, + attributesToDelete.values.map(_.id), + insertFunction, + tracingContext + ) + .map(_ => ownersWithWrites) } - // collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate - val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++ - attributesToUpdate.values.map(_.ownerId) ++ - attributesToDelete.values.map(_.ownerId)).toSet - - // perform the inserts/updates/deletes - patchAttributesAction(attributesToInsert.values, - attributesToUpdate.values, - attributesToDelete.values.map(_.id), - insertFunction - ) - .map(_ => ownersWithWrites) - } - // noinspection SqlDialectInspection object AlterAttributesUsingScratchTableQueries extends RawSqlQuery { val driver: JdbcProfile = AttributeComponent.this.driver @@ -809,12 +824,15 @@ trait AttributeComponent { else DBIO.successful(0) } - def updateAction(insertIntoScratchFunction: String => WriteAction[Int]) = { - val transactionId = UUID.randomUUID().toString - insertIntoScratchFunction(transactionId) andThen - updateInMasterAction(transactionId) andThen - clearAttributeScratchTableAction(transactionId) - } + def updateAction(insertIntoScratchFunction: String => WriteAction[Int], tracingContext: RawlsTracingContext) = + traceDBIOWithParent("updateAction", tracingContext) { span => + val transactionId = UUID.randomUUID().toString + traceDBIOWithParent("insertIntoScratchFunction", span)(_ => insertIntoScratchFunction(transactionId)) andThen + traceDBIOWithParent("updateInMasterAction", span)(_ => updateInMasterAction(transactionId)) andThen + traceDBIOWithParent("clearAttributeScratchTableAction", span)(_ => + clearAttributeScratchTableAction(transactionId) + ) + } } def unmarshalAttributes[ID]( diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponent.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponent.scala index 0c96208428..d203433262 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponent.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponent.scala @@ -1053,27 +1053,31 @@ trait EntityComponent { private def applyEntityPatch(workspaceContext: Workspace, entityRecord: EntityRecord, upserts: AttributeMap, - deletes: Traversable[AttributeName] - ) = - // yank the attribute list for this entity to determine what to do with upserts - entityAttributeShardQuery(workspaceContext) - .findByOwnerQuery(Seq(entityRecord.id)) - .map(attr => (attr.namespace, attr.name, attr.id)) - .result flatMap { attrCols => - val existingAttrsToRecordIds: Map[AttributeName, Set[Long]] = - attrCols - .groupBy { case (namespace, name, _) => - (namespace, name) - } - .map { case ((namespace, name), ids) => - AttributeName(namespace, name) -> ids - .map(_._3) - 
.toSet // maintain the full list of attribute ids since list members are stored as individual attributes - } + deletes: Traversable[AttributeName], + tracingContext: RawlsTracingContext + ) = { + traceDBIOWithParent("applyEntityPatch", tracingContext) { span => + // yank the attribute list for this entity to determine what to do with upserts + traceDBIOWithParent("findByOwnerQuery", span)(_ => + entityAttributeShardQuery(workspaceContext) + .findByOwnerQuery(Seq(entityRecord.id)) + .map(attr => (attr.namespace, attr.name, attr.id)) + .result + ) flatMap { attrCols => + val existingAttrsToRecordIds: Map[AttributeName, Set[Long]] = + attrCols + .groupBy { case (namespace, name, _) => + (namespace, name) + } + .map { case ((namespace, name), ids) => + AttributeName(namespace, name) -> ids + .map(_._3) + .toSet // maintain the full list of attribute ids since list members are stored as individual attributes + } - val entityRefsToLookup = upserts.valuesIterator.collect { case e: AttributeEntityReference => e }.toSet + val entityRefsToLookup = upserts.valuesIterator.collect { case e: AttributeEntityReference => e }.toSet - /* + /* Additional check for resizing entities whose Attribute value type is AttributeList[_] in response to bugs mentioned in WA-32 and WA-153. Currently the update query is such that it matches the listIndex of each attribute value and updates the list index and list size. Previously if the size of the list changes, those @@ -1084,45 +1088,46 @@ trait EntityComponent { or delete extra records from the entity table respectively. NOTE: Attributes that are not lists are treated as a list of size one. - */ + */ + + // Tuple of + // insertRecs: Seq[EntityAttributeRecord] + // updateRecs: Seq[EntityAttributeRecord] + // extraDeleteIds: Seq[Long] + type AttributeModifications = (Seq[EntityAttributeRecord], Seq[EntityAttributeRecord], Seq[Long]) + + def checkAndUpdateRecSize(name: AttributeName, + existingAttrSize: Int, + updateAttrSize: Int, + attrRecords: Seq[EntityAttributeRecord] + ): AttributeModifications = + if (updateAttrSize > existingAttrSize) { + // since the size of the "list" has increased, move these new records to insertRecs + val (newInsertRecs, newUpdateRecs) = attrRecords.partition { + _.listIndex.getOrElse(0) > (existingAttrSize - 1) + } - // Tuple of - // insertRecs: Seq[EntityAttributeRecord] - // updateRecs: Seq[EntityAttributeRecord] - // extraDeleteIds: Seq[Long] - type AttributeModifications = (Seq[EntityAttributeRecord], Seq[EntityAttributeRecord], Seq[Long]) - - def checkAndUpdateRecSize(name: AttributeName, - existingAttrSize: Int, - updateAttrSize: Int, - attrRecords: Seq[EntityAttributeRecord] - ): AttributeModifications = - if (updateAttrSize > existingAttrSize) { - // since the size of the "list" has increased, move these new records to insertRecs - val (newInsertRecs, newUpdateRecs) = attrRecords.partition { - _.listIndex.getOrElse(0) > (existingAttrSize - 1) + (newInsertRecs, newUpdateRecs, Seq.empty[Long]) + } else if (updateAttrSize < existingAttrSize) { + // since the size of the list has decreased, delete the extra rows from table + val deleteIds = existingAttrsToRecordIds(name).toSeq.takeRight(existingAttrSize - updateAttrSize) + (Seq.empty[EntityAttributeRecord], attrRecords, deleteIds) + } else (Seq.empty[EntityAttributeRecord], attrRecords, Seq.empty[Long]) // the list size hasn't changed + + def recordsForUpdateAttribute(name: AttributeName, + attribute: Attribute, + attrRecords: Seq[EntityAttributeRecord] + ): AttributeModifications = { + 
val existingAttrSize = existingAttrsToRecordIds.get(name).map(_.size).getOrElse(0) + attribute match { + case list: AttributeList[_] => checkAndUpdateRecSize(name, existingAttrSize, list.list.size, attrRecords) + case _ => checkAndUpdateRecSize(name, existingAttrSize, 1, attrRecords) } - - (newInsertRecs, newUpdateRecs, Seq.empty[Long]) - } else if (updateAttrSize < existingAttrSize) { - // since the size of the list has decreased, delete the extra rows from table - val deleteIds = existingAttrsToRecordIds(name).toSeq.takeRight(existingAttrSize - updateAttrSize) - (Seq.empty[EntityAttributeRecord], attrRecords, deleteIds) - } else (Seq.empty[EntityAttributeRecord], attrRecords, Seq.empty[Long]) // the list size hasn't changed - - def recordsForUpdateAttribute(name: AttributeName, - attribute: Attribute, - attrRecords: Seq[EntityAttributeRecord] - ): AttributeModifications = { - val existingAttrSize = existingAttrsToRecordIds.get(name).map(_.size).getOrElse(0) - attribute match { - case list: AttributeList[_] => checkAndUpdateRecSize(name, existingAttrSize, list.list.size, attrRecords) - case _ => checkAndUpdateRecSize(name, existingAttrSize, 1, attrRecords) } - } - lookupNotYetLoadedReferences(workspaceContext, entityRefsToLookup, Seq(entityRecord.toReference)) flatMap { - entityRefRecs => + traceDBIOWithParent("lookupNotYetLoadedReferences", span)(_ => + lookupNotYetLoadedReferences(workspaceContext, entityRefsToLookup, Seq(entityRecord.toReference)) + ) flatMap { entityRefRecs => val allTheEntityRefs = entityRefRecs ++ Seq(entityRecord) // re-add the current entity val refsToIds = allTheEntityRefs.map(e => e.toReference -> e.id).toMap @@ -1156,48 +1161,69 @@ trait EntityComponent { insertRecs, updateRecs, totalDeleteIds, - entityAttributeTempQuery.insertScratchAttributes + entityAttributeTempQuery.insertScratchAttributes, + span ) + } } } + } // "patch" this entity by applying the upserts and the deletes to its attributes, then save. a little more database efficient than a "full" save, but requires the entity already exist. 
def saveEntityPatch(workspaceContext: Workspace, entityRef: AttributeEntityReference, upserts: AttributeMap, - deletes: Traversable[AttributeName] - ) = { - val deleteIntersectUpsert = deletes.toSet intersect upserts.keySet - if (upserts.isEmpty && deletes.isEmpty) { - DBIO.successful(()) // no-op - } else if (deleteIntersectUpsert.nonEmpty) { - DBIO.failed( - new RawlsException( - s"Can't saveEntityPatch on $entityRef because upserts and deletes share attributes $deleteIntersectUpsert" + deletes: Traversable[AttributeName], + tracingContext: RawlsTracingContext + ) = + traceDBIOWithParent("saveEntityPatch", tracingContext) { span => + span.tracingSpan.foreach { s => + s.putAttribute("workspaceId", OpenCensusAttributeValue.stringAttributeValue(workspaceContext.workspaceId)) + s.putAttribute("workspace", + OpenCensusAttributeValue.stringAttributeValue(workspaceContext.toWorkspaceName.toString) ) - ) - } else { - getEntityRecords(workspaceContext.workspaceIdAsUUID, Set(entityRef)) flatMap { entityRecs => - if (entityRecs.length != 1) { - throw new RawlsException( - s"saveEntityPatch looked up $entityRef expecting 1 record, got ${entityRecs.length} instead" + s.putAttribute("entityType", OpenCensusAttributeValue.stringAttributeValue(entityRef.entityType)) + s.putAttribute("entityName", OpenCensusAttributeValue.stringAttributeValue(entityRef.entityName)) + s.putAttribute("numUpserts", OpenCensusAttributeValue.longAttributeValue(upserts.size)) + s.putAttribute("numDeletes", OpenCensusAttributeValue.longAttributeValue(deletes.size)) + } + val deleteIntersectUpsert = deletes.toSet intersect upserts.keySet + if (upserts.isEmpty && deletes.isEmpty) { + DBIO.successful(()) // no-op + } else if (deleteIntersectUpsert.nonEmpty) { + DBIO.failed( + new RawlsException( + s"Can't saveEntityPatch on $entityRef because upserts and deletes share attributes $deleteIntersectUpsert" ) - } + ) + } else { + traceDBIOWithParent("getEntityRecords", span)(_ => + getEntityRecords(workspaceContext.workspaceIdAsUUID, Set(entityRef)) + ) flatMap { entityRecs => + if (entityRecs.length != 1) { + throw new RawlsException( + s"saveEntityPatch looked up $entityRef expecting 1 record, got ${entityRecs.length} instead" + ) + } - val entityRecord = entityRecs.head - upserts.keys.foreach { attrName => - validateUserDefinedString(attrName.name) - validateAttributeName(attrName, entityRecord.entityType) - } + val entityRecord = entityRecs.head + upserts.keys.foreach { attrName => + validateUserDefinedString(attrName.name) + validateAttributeName(attrName, entityRecord.entityType) + } - for { - _ <- applyEntityPatch(workspaceContext, entityRecord, upserts, deletes) - updatedEntities <- entityQuery.getEntities(workspaceContext.workspaceIdAsUUID, Seq(entityRecord.id)) - _ <- entityQueryWithInlineAttributes.optimisticLockUpdate(entityRecs, updatedEntities.map(elem => elem._2)) - } yield {} + for { + _ <- applyEntityPatch(workspaceContext, entityRecord, upserts, deletes, span) + updatedEntities <- traceDBIOWithParent("getEntities", span)(_ => + entityQuery.getEntities(workspaceContext.workspaceIdAsUUID, Seq(entityRecord.id)) + ) + _ <- traceDBIOWithParent("optimisticLockUpdate", span)(_ => + entityQueryWithInlineAttributes.optimisticLockUpdate(entityRecs, updatedEntities.map(elem => elem._2)) + ) + } yield {} + } } } - } private def lookupNotYetLoadedReferences(workspaceContext: Workspace, entities: Traversable[Entity], diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala 
b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala index 7966b20ab4..db0604720e 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala @@ -7,6 +7,7 @@ import cats.effect.unsafe.implicits.global import cats.implicits._ import com.google.api.client.auth.oauth2.Credential import com.typesafe.scalalogging.LazyLogging +import io.opencensus.trace.{AttributeValue => OpenCensusAttributeValue} import nl.grons.metrics4.scala.Counter import org.broadinstitute.dsde.rawls.coordination.DataSourceAccess import org.broadinstitute.dsde.rawls.dataaccess._ @@ -28,6 +29,7 @@ import org.broadinstitute.dsde.rawls.model.SubmissionStatuses.SubmissionStatus import org.broadinstitute.dsde.rawls.model.WorkflowStatuses.WorkflowStatus import org.broadinstitute.dsde.rawls.model._ import org.broadinstitute.dsde.rawls.util.{addJitter, AuthUtil, FutureSupport} +import org.broadinstitute.dsde.rawls.util.TracingUtils.{trace, traceDBIOWithParent, traceWithParent} import org.broadinstitute.dsde.rawls.{RawlsException, RawlsFatalExceptionWithErrorReport} import org.broadinstitute.dsde.workbench.dataaccess.NotificationDAO import org.broadinstitute.dsde.workbench.model.{Notifications, WorkbenchUserId} @@ -322,81 +324,96 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum */ def handleStatusResponses( response: ExecutionServiceStatusResponse - )(implicit executionContext: ExecutionContext): Future[StatusCheckComplete] = { - response.statusResponse.collect { case Failure(t) => t }.foreach { t => - logger.error(s"Failure monitoring workflow in submission $submissionId", t) - } - - // all workflow records in this status response list - val allWorkflows = response.statusResponse.collect { case Success(Some((aWorkflow, _))) => - aWorkflow - } - - // just the workflow records in this response list which have outputs - val workflowsWithOutputs = response.statusResponse.collect { case Success(Some((workflowRec, Some(outputs)))) => - (workflowRec, outputs) - } + )(implicit executionContext: ExecutionContext): Future[StatusCheckComplete] = + trace("SubmissionMonitorActor.handleStatusResponses") { rootSpan => + response.statusResponse.collect { case Failure(t) => t }.foreach { t => + logger.error(s"Failure monitoring workflow in submission $submissionId", t) + } - def markWorkflowsFailed(workflows: Seq[WorkflowRecord], fatal: RawlsFatalExceptionWithErrorReport) = { - logger.error( - s"Marking ${workflows.length} workflows as failed handling outputs in $submissionId with user-visible reason ${fatal.toString})" - ) - datasource.inTransaction { dataAccess => - DBIO.sequence(workflows map { workflowRecord => - dataAccess.workflowQuery.updateStatus(workflowRecord, WorkflowStatuses.Failed) andThen - dataAccess.workflowQuery.saveMessages(Seq(AttributeString(fatal.toString)), workflowRecord.id) - }) + // all workflow records in this status response list + val allWorkflows = response.statusResponse.collect { case Success(Some((aWorkflow, _))) => + aWorkflow } - } - // Attach the outputs in a txn of their own. - // If attaching outputs fails for legit reasons (e.g. they're missing), it will mark the workflow as failed. This is correct. - // If attaching outputs throws an exception (because e.g. deadlock or ConcurrentModificationException), the status will remain un-updated - // and will be re-processed next time we call queryForWorkflowStatus(). 
- // This is why it's important to attach the outputs before updating the status -- if you update the status to Successful first, and the attach - // outputs fails, we'll stop querying for the workflow status and never attach the outputs. - // traverse and IO stuff ensures serial execution of the batches - batchWorkflowsWithOutputs(workflowsWithOutputs) - .traverse { batch => - IO.fromFuture(IO(datasource.inTransactionWithAttrTempTable { dataAccess => - handleOutputs(batch, dataAccess) - })) + // just the workflow records in this response list which have outputs + val workflowsWithOutputs = response.statusResponse.collect { case Success(Some((workflowRec, Some(outputs)))) => + (workflowRec, outputs) } - .unsafeToFuture() - .recoverWith { - // If there is something fatally wrong handling outputs for any workflow, mark all the workflows as failed - case fatal: RawlsFatalExceptionWithErrorReport => - markWorkflowsFailed(allWorkflows, fatal) - } flatMap { _ => - // NEW TXN! Update statuses for workflows and submission. - datasource.inTransaction { dataAccess => - // Refetch workflows as some may have been marked as Failed by handleOutputs. - dataAccess.workflowQuery.findWorkflowByIds(allWorkflows.map(_.id)).result flatMap { updatedRecs => - // New statuses according to the execution service. - val workflowIdToNewStatus = allWorkflows.map(workflowRec => workflowRec.id -> workflowRec.status).toMap - - // No need to update statuses for any workflows that are in terminal statuses. - // Doing so would potentially overwrite them with the execution service status if they'd been marked as failed by attachOutputs. - val workflowsToUpdate = updatedRecs.filter(rec => - !WorkflowStatuses.terminalStatuses.contains(WorkflowStatuses.withName(rec.status)) - ) - val workflowsWithNewStatuses = workflowsToUpdate.map(rec => rec.copy(status = workflowIdToNewStatus(rec.id))) - // to minimize database updates batch 1 update per workflow status - DBIO.sequence(workflowsWithNewStatuses.groupBy(_.status).map { case (status, recs) => - dataAccess.workflowQuery.batchUpdateStatus(recs, WorkflowStatuses.withName(status)) + def markWorkflowsFailed(workflows: Seq[WorkflowRecord], fatal: RawlsFatalExceptionWithErrorReport) = { + logger.error( + s"Marking ${workflows.length} workflows as failed handling outputs in $submissionId with user-visible reason ${fatal.toString})" + ) + datasource.inTransaction { dataAccess => + DBIO.sequence(workflows map { workflowRecord => + dataAccess.workflowQuery.updateStatus(workflowRecord, WorkflowStatuses.Failed) andThen + dataAccess.workflowQuery.saveMessages(Seq(AttributeString(fatal.toString)), workflowRecord.id) }) + } + } + + // Attach the outputs in a txn of their own. + // If attaching outputs fails for legit reasons (e.g. they're missing), it will mark the workflow as failed. This is correct. + // If attaching outputs throws an exception (because e.g. deadlock or ConcurrentModificationException), the status will remain un-updated + // and will be re-processed next time we call queryForWorkflowStatus(). + // This is why it's important to attach the outputs before updating the status -- if you update the status to Successful first, and the attach + // outputs fails, we'll stop querying for the workflow status and never attach the outputs. 
+ // traverse and IO stuff ensures serial execution of the batches + val batchedWorkflowsWithOutputs = batchWorkflowsWithOutputs(workflowsWithOutputs).zipWithIndex + + traceWithParent("batchedWorkflowsWithOutputs", rootSpan) { span => + span.tracingSpan.foreach { s => + s.putAttribute("numBatches", OpenCensusAttributeValue.longAttributeValue(batchedWorkflowsWithOutputs.length)) + } - } flatMap { _ => - // update submission after workflows are updated - updateSubmissionStatus(dataAccess) map { shouldStop: Boolean => - // return a message about whether our submission is done entirely - StatusCheckComplete(shouldStop) + batchedWorkflowsWithOutputs + .traverse { case (batch, idx) => + IO.fromFuture(IO(datasource.inTransactionWithAttrTempTable { dataAccess => + traceDBIOWithParent(s"batch", span) { innerSpan => + innerSpan.tracingSpan.foreach { s => + s.putAttribute("batchIndex", OpenCensusAttributeValue.longAttributeValue(idx)) + } + handleOutputs(batch, dataAccess, innerSpan) + } + })) + } + .unsafeToFuture() + .recoverWith { + // If there is something fatally wrong handling outputs for any workflow, mark all the workflows as failed + case fatal: RawlsFatalExceptionWithErrorReport => + markWorkflowsFailed(allWorkflows, fatal) + } + } flatMap { _ => + // NEW TXN! Update statuses for workflows and submission. + datasource.inTransaction { dataAccess => + // Refetch workflows as some may have been marked as Failed by handleOutputs. + dataAccess.workflowQuery.findWorkflowByIds(allWorkflows.map(_.id)).result flatMap { updatedRecs => + // New statuses according to the execution service. + val workflowIdToNewStatus = allWorkflows.map(workflowRec => workflowRec.id -> workflowRec.status).toMap + + // No need to update statuses for any workflows that are in terminal statuses. + // Doing so would potentially overwrite them with the execution service status if they'd been marked as failed by attachOutputs. 
+ val workflowsToUpdate = updatedRecs.filter(rec => + !WorkflowStatuses.terminalStatuses.contains(WorkflowStatuses.withName(rec.status)) + ) + val workflowsWithNewStatuses = + workflowsToUpdate.map(rec => rec.copy(status = workflowIdToNewStatus(rec.id))) + + // to minimize database updates batch 1 update per workflow status + DBIO.sequence(workflowsWithNewStatuses.groupBy(_.status).map { case (status, recs) => + dataAccess.workflowQuery.batchUpdateStatus(recs, WorkflowStatuses.withName(status)) + }) + + } flatMap { _ => + // update submission after workflows are updated + updateSubmissionStatus(dataAccess) map { shouldStop: Boolean => + // return a message about whether our submission is done entirely + StatusCheckComplete(shouldStop) + } } } } } - } /** * Batch workflowsWithOutputs into groups so as not to exceed config.attributeUpdatesPerWorkflow attributes per transaction @@ -537,49 +554,71 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum } } - def handleOutputs(workflowsWithOutputs: Seq[(WorkflowRecord, ExecutionServiceOutputs)], dataAccess: DataAccess)( - implicit executionContext: ExecutionContext + def handleOutputs(workflowsWithOutputs: Seq[(WorkflowRecord, ExecutionServiceOutputs)], + dataAccess: DataAccess, + tracingContext: RawlsTracingContext + )(implicit + executionContext: ExecutionContext ): ReadWriteAction[Unit] = if (workflowsWithOutputs.isEmpty) { DBIO.successful(()) } else { - for { - // load all the starting data - workspace <- getWorkspace(dataAccess).map( - _.getOrElse(throw new RawlsException(s"workspace for submission $submissionId not found")) - ) - entitiesById <- listWorkflowEntitiesById(workspace, workflowsWithOutputs, dataAccess) - outputExpressionMap <- listMethodConfigOutputsForSubmission(dataAccess) - emptyOutputs <- getSubmissionEmptyOutputParam(dataAccess) - - // figure out the updates that need to occur to entities and workspaces - updatedEntitiesAndWorkspace = attachOutputs(workspace, - workflowsWithOutputs, - entitiesById, - outputExpressionMap, - emptyOutputs - ) - - // for debugging purposes - workspacesToUpdate = updatedEntitiesAndWorkspace.collect { case Left((_, Some(workspace))) => workspace } - entityUpdates = updatedEntitiesAndWorkspace.collect { - case Left((Some(entityUpdate), _)) if entityUpdate.upserts.nonEmpty => entityUpdate + traceDBIOWithParent("handleOutputs", tracingContext) { rootSpan => + rootSpan.tracingSpan.foreach { span => + span.putAttribute("submissionId", OpenCensusAttributeValue.stringAttributeValue(submissionId.toString)) + span.putAttribute("numWorkflowsWithOutputs", + OpenCensusAttributeValue.longAttributeValue(workflowsWithOutputs.length) + ) } - _ = - if (workspacesToUpdate.nonEmpty && entityUpdates.nonEmpty) - logger.info("handleOutputs writing to both workspace and entity attributes") - else if (workspacesToUpdate.nonEmpty) - logger.info("handleOutputs writing to workspace attributes only") - else if (entityUpdates.nonEmpty) - logger.info("handleOutputs writing to entity attributes only") - else - logger.info("handleOutputs writing to neither workspace nor entity attributes; could be errors") - - // save everything to the db - _ <- saveWorkspace(dataAccess, updatedEntitiesAndWorkspace) - _ <- saveEntities(dataAccess, workspace, updatedEntitiesAndWorkspace) - _ <- saveErrors(updatedEntitiesAndWorkspace.collect { case Right(errors) => errors }, dataAccess) - } yield () + + for { + // load all the starting data + workspace <- traceDBIOWithParent("getWorkspace", rootSpan)(_ => 
getWorkspace(dataAccess)).map( + _.getOrElse(throw new RawlsException(s"workspace for submission $submissionId not found")) + ) + entitiesById <- traceDBIOWithParent("listWorkflowEntitiesById", rootSpan)(_ => + listWorkflowEntitiesById(workspace, workflowsWithOutputs, dataAccess) + ) + outputExpressionMap <- traceDBIOWithParent("listMethodConfigOutputsForSubmission", rootSpan)(_ => + listMethodConfigOutputsForSubmission(dataAccess) + ) + emptyOutputs <- traceDBIOWithParent("getSubmissionEmptyOutputParam", rootSpan)(_ => + getSubmissionEmptyOutputParam(dataAccess) + ) + + // figure out the updates that need to occur to entities and workspaces + updatedEntitiesAndWorkspace = attachOutputs(workspace, + workflowsWithOutputs, + entitiesById, + outputExpressionMap, + emptyOutputs + ) + + // for debugging purposes + workspacesToUpdate = updatedEntitiesAndWorkspace.collect { case Left((_, Some(workspace))) => workspace } + entityUpdates = updatedEntitiesAndWorkspace.collect { + case Left((Some(entityUpdate), _)) if entityUpdate.upserts.nonEmpty => entityUpdate + } + _ = + if (workspacesToUpdate.nonEmpty && entityUpdates.nonEmpty) + logger.info("handleOutputs writing to both workspace and entity attributes") + else if (workspacesToUpdate.nonEmpty) + logger.info("handleOutputs writing to workspace attributes only") + else if (entityUpdates.nonEmpty) + logger.info("handleOutputs writing to entity attributes only") + else + logger.info("handleOutputs writing to neither workspace nor entity attributes; could be errors") + + // save everything to the db + _ <- traceDBIOWithParent("saveWorkspace", rootSpan)(_ => + saveWorkspace(dataAccess, updatedEntitiesAndWorkspace) + ) + _ <- saveEntities(dataAccess, workspace, updatedEntitiesAndWorkspace, rootSpan) // has its own tracing + _ <- traceDBIOWithParent("saveErrors", rootSpan)(_ => + saveErrors(updatedEntitiesAndWorkspace.collect { case Right(errors) => errors }, dataAccess) + ) + } yield () + } } def getWorkspace(dataAccess: DataAccess): ReadAction[Option[Workspace]] = @@ -625,21 +664,28 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum workspace: Workspace, updatedEntitiesAndWorkspace: Seq[ Either[(Option[WorkflowEntityUpdate], Option[Workspace]), (WorkflowRecord, scala.Seq[AttributeString])] - ] - )(implicit executionContext: ExecutionContext) = { - val entityUpdates = updatedEntitiesAndWorkspace.collect { - case Left((Some(entityUpdate), _)) if entityUpdate.upserts.nonEmpty => entityUpdate - } - if (entityUpdates.isEmpty) { - DBIO.successful(()) - } else { - DBIO.sequence(entityUpdates map { entityUpd => - dataAccess.entityQuery - .saveEntityPatch(workspace, entityUpd.entityRef, entityUpd.upserts, Seq()) - .withStatementParameters(statementInit = _.setQueryTimeout(queryTimeout.toSeconds.toInt)) - }) + ], + tracingContext: RawlsTracingContext + )(implicit executionContext: ExecutionContext) = + traceDBIOWithParent("saveEntities", tracingContext) { span => + val entityUpdates = updatedEntitiesAndWorkspace.collect { + case Left((Some(entityUpdate), _)) if entityUpdate.upserts.nonEmpty => entityUpdate + } + span.tracingSpan.foreach { s => + s.putAttribute("numEntityUpdates", OpenCensusAttributeValue.longAttributeValue(entityUpdates.length)) + } + if (entityUpdates.isEmpty) { + DBIO.successful(()) + } else { + traceDBIOWithParent("saveEntityPatchSequence", span) { innerSpan => + DBIO.sequence(entityUpdates map { entityUpd => + dataAccess.entityQuery + .saveEntityPatch(workspace, entityUpd.entityRef, entityUpd.upserts, 
Seq(), innerSpan) + .withStatementParameters(statementInit = _.setQueryTimeout(queryTimeout.toSeconds.toInt)) + }) + } + } } - } private def attributeIsEmpty(attribute: Attribute): Boolean = attribute match { diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala index e9cf911bc7..a86103f381 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionSupervisor.scala @@ -262,9 +262,12 @@ class SubmissionSupervisor(executionServiceCluster: ExecutionServiceCluster, case MonitoredSubmissionException(workspaceName, submissionId, cause) => // increment smaRestart counter restartCounter(workspaceName, submissionId, cause) += 1 - logger.error(s"error monitoring submission $submissionId in workspace $workspaceName after $count times", cause) + logger.error( + s"error monitoring submission $submissionId in workspace $workspaceName after $count times: ${cause.getMessage}", + cause + ) case _ => - logger.error(s"error monitoring submission after $count times", throwable) + logger.error(s"error monitoring submission after $count times: ${throwable.getMessage}", throwable) } new ThresholdOneForOneStrategy(thresholdLimit = 3)(alwaysRestart)(thresholdFunc) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponentSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponentSpec.scala index b848ef8ba9..674f2fc9d2 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponentSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/AttributeComponentSpec.scala @@ -1665,7 +1665,8 @@ class AttributeComponentSpec workspaceAttributeQuery.patchAttributesAction(Seq(insert), Seq(update), Seq(), - workspaceAttributeTempQuery.insertScratchAttributes + workspaceAttributeTempQuery.insertScratchAttributes, + RawlsTracingContext(None) ) ) assertExpectedRecords(Seq(existing.head, update, insert): _*) @@ -1675,7 +1676,8 @@ class AttributeComponentSpec workspaceAttributeQuery.patchAttributesAction(Seq(), Seq(), existing.map(_.id), - workspaceAttributeTempQuery.insertScratchAttributes + workspaceAttributeTempQuery.insertScratchAttributes, + RawlsTracingContext(None) ) ) assertExpectedRecords(Seq(insert): _*) @@ -1871,7 +1873,8 @@ class AttributeComponentSpec context, testData.sample1.toReference, inserts, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -1885,7 +1888,12 @@ class AttributeComponentSpec val expectedAfterUpdate = testData.sample1.attributes ++ updates runAndWait( - entityQuery.saveEntityPatch(context, testData.sample1.toReference, updates, Seq.empty[AttributeName]) + entityQuery.saveEntityPatch(context, + testData.sample1.toReference, + updates, + Seq.empty[AttributeName], + RawlsTracingContext(None) + ) ) val resultAfterUpdate = runAndWait(entityQuery.get(context, "Sample", "sample1")).head.attributes @@ -1988,6 +1996,7 @@ class AttributeComponentSpec verify(spiedAttrQuery, times(1)).patchAttributesAction(insertsCaptor.capture(), updatesCaptor.capture(), deletesCaptor.capture(), + any(), any() ) @@ -2050,6 +2059,7 @@ class AttributeComponentSpec verify(spiedAttrQuery, times(1)).patchAttributesAction(insertsCaptor.capture(), updatesCaptor.capture(), deletesCaptor.capture(), + any(), any() ) @@ 
-2112,6 +2122,7 @@ class AttributeComponentSpec verify(spiedAttrQuery, times(1)).patchAttributesAction(insertsCaptor.capture(), updatesCaptor.capture(), deletesCaptor.capture(), + any(), any() ) @@ -2170,6 +2181,7 @@ class AttributeComponentSpec verify(spiedAttrQuery, times(1)).patchAttributesAction(insertsCaptor.capture(), updatesCaptor.capture(), deletesCaptor.capture(), + any(), any() ) @@ -2236,6 +2248,7 @@ class AttributeComponentSpec verify(spiedAttrQuery, times(1)).patchAttributesAction(insertsCaptor.capture(), updatesCaptor.capture(), deletesCaptor.capture(), + any(), any() ) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponentSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponentSpec.scala index da73b4531a..c544e31fac 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponentSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/EntityComponentSpec.scala @@ -176,7 +176,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit context, AttributeEntityReference("Sample", "nonexistent"), Map(AttributeName.withDefaultNS("newAttribute") -> AttributeNumber(2)), - Seq(AttributeName.withDefaultNS("type")) + Seq(AttributeName.withDefaultNS("type")), + RawlsTracingContext(None) ) ) } @@ -194,7 +195,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit context, AttributeEntityReference("Sample", "sample1"), Map(AttributeName.withDefaultNS("type") -> AttributeNumber(2)), - Seq(AttributeName.withDefaultNS("type")) + Seq(AttributeName.withDefaultNS("type")), + RawlsTracingContext(None) ) ) } @@ -237,7 +239,12 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit val expected = testData.sample1.attributes ++ inserts ++ updates -- deletes runAndWait { - entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), inserts ++ updates, deletes) + entityQuery.saveEntityPatch(context, + AttributeEntityReference("Sample", "sample1"), + inserts ++ updates, + deletes, + RawlsTracingContext(None) + ) } assertSameElements( @@ -282,7 +289,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), inserts, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -307,7 +315,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), updates, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -328,7 +337,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), inserts, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -354,7 +364,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), updates, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -382,7 +393,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), inserts, - Seq.empty[AttributeName] + 
Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -399,7 +411,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(context, AttributeEntityReference("Sample", "sample1"), updates, - Seq.empty[AttributeName] + Seq.empty[AttributeName], + RawlsTracingContext(None) ) ) @@ -1614,7 +1627,8 @@ class EntityComponentSpec extends TestDriverComponentWithFlatSpecAndMatchers wit entityQuery.saveEntityPatch(wsctx, entityToSave.toReference, Map.empty, - List(AttributeName.withDefaultNS("attributeListToDelete")) + List(AttributeName.withDefaultNS("attributeListToDelete")), + RawlsTracingContext(None) ) ) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActorTimeoutSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActorTimeoutSpec.scala index c08d1beed7..fa6f653708 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActorTimeoutSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActorTimeoutSpec.scala @@ -9,7 +9,13 @@ import org.broadinstitute.dsde.rawls.dataaccess.{ MockShardedExecutionServiceCluster, SlickDataSource } -import org.broadinstitute.dsde.rawls.model.WorkflowStatuses +import org.broadinstitute.dsde.rawls.model.{ + AttributeEntityReference, + AttributeName, + AttributeString, + RawlsTracingContext, + WorkflowStatuses +} import org.broadinstitute.dsde.rawls.dataaccess.slick.TestDriverComponent import org.broadinstitute.dsde.rawls.mock.MockSamDAO import org.broadinstitute.dsde.rawls.util.MockitoTestUtils @@ -18,7 +24,6 @@ import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers import java.sql.BatchUpdateException -import org.broadinstitute.dsde.rawls.model.{AttributeEntityReference, AttributeName, AttributeString} import slick.jdbc.TransactionIsolation import java.util.UUID @@ -98,7 +103,8 @@ class SubmissionMonitorActorTimeoutSpec(_system: ActorSystem) runAndWait( submissionMonitorActorRef.underlyingActor.saveEntities(dataSource.dataAccess, workspaceContext, - updatedEntitiesAndWorkspace + updatedEntitiesAndWorkspace, + RawlsTracingContext(None) ), Duration.create(lockTime, SECONDS) ) diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala index 13692df6d1..7953ecd957 100644 --- a/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala +++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala @@ -828,7 +828,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) workflowRecs.map(r => (r, ExecutionServiceOutputs(r.externalId.get, Map("o1" -> Left(AttributeString("result"))))) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -891,7 +892,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) workflowRecs.map(r => (r, ExecutionServiceOutputs(r.externalId.get, Map("o1_lib" -> Left(AttributeString("result"))))) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -947,7 +949,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) workflowRecs2.map(r => (r, ExecutionServiceOutputs(r.externalId.get, Map("o2_lib" -> Left(AttributeString("result2"))))) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -985,7 +988,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) ) 
@@ -1030,7 +1034,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1055,7 +1060,10 @@ class SubmissionMonitorSpec(_system: ActorSystem) "o1" -> Left(AttributeValueList(Vector(AttributeString("123"), AttributeString("456"), AttributeString("789")))) ) runAndWait( - monitor.handleOutputs(workflowRecs.map(r => (r, ExecutionServiceOutputs(r.externalId.get, newOutputs))), this) + monitor.handleOutputs(workflowRecs.map(r => (r, ExecutionServiceOutputs(r.externalId.get, newOutputs))), + this, + RawlsTracingContext(Option.empty) + ) ) assertResult( @@ -1103,7 +1111,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1133,7 +1142,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1344,7 +1354,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) runAndWait( monitor.handleOutputs(Seq((workflowRec, ExecutionServiceOutputs(workflowRec.externalId.get, execOutputs))), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1428,7 +1439,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) workflowRecs.map(r => (r, ExecutionServiceOutputs(r.externalId.get, Map("bad1" -> Left(AttributeString("result"))))) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1575,7 +1587,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) ) @@ -1648,7 +1661,8 @@ class SubmissionMonitorSpec(_system: ActorSystem) ) ) ), - this + this, + RawlsTracingContext(Option.empty) ) )
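
A note on the tracing pattern used throughout this patch: every touched call path now threads a RawlsTracingContext (an optional OpenCensus span) down to the database layer, wraps each DBIO step in traceDBIOWithParent, and tags spans with searchable attributes via putAttribute. For readers unfamiliar with the helper, here is a minimal sketch of the shape the call sites above rely on. This is an assumed illustration, not the actual TracingUtils implementation, and it uses the underlying opencensus-java tracer rather than the io.opencensus.scala wrapper the diff imports.

    import scala.concurrent.ExecutionContext

    import io.opencensus.trace.{Span, Tracing => OpenCensusTracing}
    import slick.dbio.DBIO

    // Assumed shape, matching the call sites above: an optional OpenCensus span.
    final case class RawlsTracingContext(tracingSpan: Option[Span])

    // Start a child span when a parent exists, hand the child context to the wrapped
    // DBIO, and end the span once the action completes, whether it succeeds or fails.
    def traceDBIOWithParent[T](name: String, parent: RawlsTracingContext)(
      f: RawlsTracingContext => DBIO[T]
    )(implicit ec: ExecutionContext): DBIO[T] =
      parent.tracingSpan match {
        case Some(parentSpan) =>
          val child = OpenCensusTracing.getTracer.spanBuilderWithExplicitParent(name, parentSpan).startSpan()
          f(RawlsTracingContext(Some(child))).cleanUp(_ => DBIO.successful(child.end()))
        case None =>
          f(parent) // no parent span: run untraced rather than start a new root
      }

Under this shape, the RawlsTracingContext(None) argument added throughout the test files makes every wrapper a no-op, which is why the test diffs add only the extra parameter and assert nothing new.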
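
rewriteAttrsAction above reduces an attribute save to a three-way diff between the to-save and existing records, comparing them on an id-less projection because existing rows carry a real database id while to-save rows carry 0. A self-contained sketch of that set algebra, using simplified stand-ins for the component's abstract RECORD and OWNER_ID types (the names below are illustrative, not the real Rawls types):

    // Simplified record: the real attribute records carry many more value columns.
    final case class AttrRecord(id: Long, ownerId: Long, name: String, value: Option[String])

    // The id-less projection used for change detection, like ComparableRecord above.
    final case class ComparableAttr(ownerId: Long, name: String, value: Option[String])
    object ComparableAttr {
      def fromRecord(r: AttrRecord): ComparableAttr = ComparableAttr(r.ownerId, r.name, r.value)
    }

    // Returns (toInsert, toUpdate, idsToDelete), the same grouping patchAttributesAction takes.
    def diffAttrs(toSave: Seq[AttrRecord],
                  existing: Seq[AttrRecord]
    ): (Seq[AttrRecord], Seq[AttrRecord], Seq[Long]) = {
      def byKey(recs: Seq[AttrRecord]) = recs.map(r => (r.ownerId, r.name) -> r).toMap
      val toSaveMap = byKey(toSave)
      val existingMap = byKey(existing)
      val existingSet = existing.map(ComparableAttr.fromRecord).toSet

      val inserts = toSaveMap.filter { case (k, _) => !existingMap.contains(k) } // in save, not existing
      val deletes = existingMap.filter { case (k, _) => !toSaveMap.contains(k) } // in existing, not save
      val updates = toSaveMap.filter { case (k, v) => // in both, and the value actually changed
        existingMap.contains(k) && !existingSet.contains(ComparableAttr.fromRecord(v))
      }
      (inserts.values.toSeq, updates.values.toSeq, deletes.values.map(_.id).toSeq)
    }

For example, existing Seq(AttrRecord(7, 1, "type", Some("tumor"))) against to-save Seq(AttrRecord(0, 1, "type", Some("normal"))) yields no inserts or deletes and one update: the key matches but the projection differs, so the record is routed through the scratch-table update path.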
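
The AlterAttributesUsingScratchTableQueries.updateAction that now carries a span follows a scratch-table pattern: bulk-insert the new values into a scratch table under a fresh per-call transaction id, update the master table by joining against those rows, then delete them. A hedged sketch of that shape using Slick's plain-SQL API; the table and column names are illustrative, not the actual Rawls schema:

    import java.util.UUID

    import slick.jdbc.MySQLProfile.api._

    // ATTRIBUTE(id, value) stands in for the master table; ATTRIBUTE_SCRATCH(id, value,
    // transaction_id) is shared by concurrent callers, hence the per-call transaction id.
    def scratchUpdate(insertIntoScratch: String => DBIO[Int]): DBIO[Int] = {
      val transactionId = UUID.randomUUID().toString
      insertIntoScratch(transactionId) andThen
        sqlu"""update ATTRIBUTE a
               join ATTRIBUTE_SCRATCH s on a.id = s.id
               set a.value = s.value
               where s.transaction_id = $transactionId""" andThen
        sqlu"delete from ATTRIBUTE_SCRATCH where transaction_id = $transactionId"
    }

Giving each of the three steps its own child span, as the diff does, shows whether the scratch insert, the join-update, or the cleanup dominates when a large attribute write is slow.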
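
checkAndUpdateRecSize in applyEntityPatch handles attribute lists whose length changes between saves, the WA-32/WA-153 fix the long comment describes: when a list grows, records at indices beyond the old size become inserts; when it shrinks, the trailing surplus rows are deleted; non-list attributes count as single-element lists. The branch logic in isolation, with simplified types (illustrative, not the real EntityAttributeRecord):

    final case class AttrRec(id: Long, listIndex: Option[Int])

    // Returns (inserts, updates, idsToDelete) for the records of one attribute name.
    def resize(existingIds: Seq[Long], incoming: Seq[AttrRec]): (Seq[AttrRec], Seq[AttrRec], Seq[Long]) = {
      val existingSize = existingIds.size
      if (incoming.size > existingSize) {
        // List grew: indices past the old end are brand-new rows, the rest are in-place updates.
        val (inserts, updates) = incoming.partition(_.listIndex.getOrElse(0) > existingSize - 1)
        (inserts, updates, Seq.empty)
      } else if (incoming.size < existingSize) {
        // List shrank: update what remains and delete the trailing surplus rows.
        (Seq.empty, incoming, existingIds.takeRight(existingSize - incoming.size))
      } else (Seq.empty, incoming, Seq.empty) // unchanged length: plain updates
    }

    // A 3-element list shrinking to 1: index 0 is updated, the rows with ids 11 and 12 are deleted.
    // resize(Seq(10L, 11L, 12L), Seq(AttrRec(0, Some(0)))) == (Seq(), Seq(AttrRec(0, Some(0))), Seq(11L, 12L))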
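
Finally, the per-batch spans in handleStatusResponses trace groups produced by batchWorkflowsWithOutputs, whose scaladoc is quoted above: workflows are grouped so that no single transaction exceeds config.attributeUpdatesPerWorkflow attribute updates. Its implementation is not part of this diff; a plausible weight-capped grouping looks like the following, where the weight function would count the attribute updates a workflow carries:

    // Greedy weight-capped batching: close the current batch when the next item would
    // push it past the cap. A single item heavier than the cap still gets its own batch.
    def batchByWeight[A](items: Seq[A], maxPerBatch: Int)(weight: A => Int): Seq[Seq[A]] = {
      val (done, last, _) = items.foldLeft((Vector.empty[Vector[A]], Vector.empty[A], 0)) {
        case ((batches, current, total), item) =>
          val w = weight(item)
          if (current.nonEmpty && total + w > maxPerBatch) (batches :+ current, Vector(item), w)
          else (batches, current :+ item, total + w)
      }
      if (last.isEmpty) done else done :+ last
    }

    // batchByWeight(Seq(3, 4, 2, 5), maxPerBatch = 7)(identity) == Vector(Vector(3, 4), Vector(2, 5))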