Skip to content

Commit

Permalink
AJ-1524: tracing for submission monitor (#2667)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Dec 15, 2023
1 parent 795de9d commit a3f5633
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 310 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.broadinstitute.dsde.rawls.dataaccess.slick

import akka.http.scaladsl.model.StatusCodes
import io.opencensus.scala.Tracing._
import org.broadinstitute.dsde.rawls.model.Attributable.AttributeMap
import org.broadinstitute.dsde.rawls.model.AttributeName.toDelimitedName
import org.broadinstitute.dsde.rawls.model._
import org.broadinstitute.dsde.rawls.util.TracingUtils.traceDBIOWithParent
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsExceptionWithErrorReport, RawlsFatalExceptionWithErrorReport}
import slick.ast.{BaseTypedType, TypedType}
import slick.dbio.Effect.Write
Expand Down Expand Up @@ -596,17 +598,26 @@ trait AttributeComponent {
def patchAttributesAction(inserts: Traversable[RECORD],
updates: Traversable[RECORD],
deleteIds: Traversable[Long],
insertFunction: Seq[RECORD] => String => WriteAction[Int]
insertFunction: Seq[RECORD] => String => WriteAction[Int],
tracingContext: RawlsTracingContext
) =
for {
_ <- if (deleteIds.nonEmpty) deleteAttributeRecordsById(deleteIds.toSeq) else DBIO.successful(0)
_ <- if (inserts.nonEmpty) batchInsertAttributes(inserts.toSeq) else DBIO.successful(0)
updateResult <-
if (updates.nonEmpty)
AlterAttributesUsingScratchTableQueries.updateAction(insertFunction(updates.toSeq))
else
DBIO.successful(0)
} yield updateResult
traceDBIOWithParent("patchAttributesAction", tracingContext) { span =>
for {
_ <-
if (deleteIds.nonEmpty)
traceDBIOWithParent("deleteAttributeRecordsById", span)(_ => deleteAttributeRecordsById(deleteIds.toSeq))
else DBIO.successful(0)
_ <-
if (inserts.nonEmpty)
traceDBIOWithParent("batchInsertAttributes", span)(_ => batchInsertAttributes(inserts.toSeq))
else DBIO.successful(0)
updateResult <-
if (updates.nonEmpty)
AlterAttributesUsingScratchTableQueries.updateAction(insertFunction(updates.toSeq), span)
else
DBIO.successful(0)
} yield updateResult
}

def deleteAttributes(workspaceContext: Workspace, entityType: String, attributeNames: Set[AttributeName]) =
workspaceQuery.updateLastModified(workspaceContext.workspaceIdAsUUID) andThen
Expand Down Expand Up @@ -696,80 +707,84 @@ trait AttributeComponent {
def rewriteAttrsAction(attributesToSave: Traversable[RECORD],
existingAttributes: Traversable[RECORD],
insertFunction: Seq[RECORD] => String => WriteAction[Int]
): ReadWriteAction[Set[OWNER_ID]] = {
val toSaveAttrMap = toPrimaryKeyMap(attributesToSave)
val existingAttrMap = toPrimaryKeyMap(existingAttributes)

// note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0"
// therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save.
// note this does not include transactionId for AttributeScratchRecords. We do not expect AttributeScratchRecords
// here, and transactionId will eventually be going away, so don't bother
object ComparableRecord {
def fromRecord(rec: RECORD): ComparableRecord =
new ComparableRecord(
rec.ownerId,
rec.namespace,
rec.name,
rec.valueString,
rec.valueNumber,
rec.valueBoolean,
rec.valueJson,
rec.valueEntityRef,
rec.listIndex,
rec.listLength,
rec.deleted,
rec.deletedDate
): ReadWriteAction[Set[OWNER_ID]] =
traceDBIOWithParent("AttributeComponent.rewriteAttrsAction", RawlsTracingContext(Option.empty)) {
tracingContext =>
val toSaveAttrMap = toPrimaryKeyMap(attributesToSave)
val existingAttrMap = toPrimaryKeyMap(existingAttributes)

// note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0"
// therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save.
// note this does not include transactionId for AttributeScratchRecords. We do not expect AttributeScratchRecords
// here, and transactionId will eventually be going away, so don't bother
object ComparableRecord {
def fromRecord(rec: RECORD): ComparableRecord =
new ComparableRecord(
rec.ownerId,
rec.namespace,
rec.name,
rec.valueString,
rec.valueNumber,
rec.valueBoolean,
rec.valueJson,
rec.valueEntityRef,
rec.listIndex,
rec.listLength,
rec.deleted,
rec.deletedDate
)
}

case class ComparableRecord(
ownerId: OWNER_ID,
namespace: String,
name: String,
valueString: Option[String],
valueNumber: Option[Double],
valueBoolean: Option[Boolean],
valueJson: Option[String],
valueEntityRef: Option[Long],
listIndex: Option[Int],
listLength: Option[Int],
deleted: Boolean,
deletedDate: Option[Timestamp]
)
}

case class ComparableRecord(
ownerId: OWNER_ID,
namespace: String,
name: String,
valueString: Option[String],
valueNumber: Option[Double],
valueBoolean: Option[Boolean],
valueJson: Option[String],
valueEntityRef: Option[Long],
listIndex: Option[Int],
listLength: Option[Int],
deleted: Boolean,
deletedDate: Option[Timestamp]
)
// create a set of ComparableRecords representing the existing attributes
val existingAttributesSet: Set[ComparableRecord] =
existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r))

// create a set of ComparableRecords representing the existing attributes
val existingAttributesSet: Set[ComparableRecord] =
existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r))
val existingKeys = existingAttrMap.keySet

val existingKeys = existingAttrMap.keySet
// insert attributes which are in save but not exists
val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_))

// insert attributes which are in save but not exists
val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_))
// delete attributes which are in exists but not save
val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_))

// delete attributes which are in exists but not save
val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_))
val attributesToUpdate = toSaveAttrMap.filter { case (k, v) =>
existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it
!existingAttributesSet.contains(
ComparableRecord.fromRecord(v)
) // if the attribute exists and is unchanged, don't update it
}

val attributesToUpdate = toSaveAttrMap.filter { case (k, v) =>
existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it
!existingAttributesSet.contains(
ComparableRecord.fromRecord(v)
) // if the attribute exists and is unchanged, don't update it
// collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate
val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++
attributesToUpdate.values.map(_.ownerId) ++
attributesToDelete.values.map(_.ownerId)).toSet

// perform the inserts/updates/deletes
patchAttributesAction(
attributesToInsert.values,
attributesToUpdate.values,
attributesToDelete.values.map(_.id),
insertFunction,
tracingContext
)
.map(_ => ownersWithWrites)
}

// collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate
val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++
attributesToUpdate.values.map(_.ownerId) ++
attributesToDelete.values.map(_.ownerId)).toSet

// perform the inserts/updates/deletes
patchAttributesAction(attributesToInsert.values,
attributesToUpdate.values,
attributesToDelete.values.map(_.id),
insertFunction
)
.map(_ => ownersWithWrites)
}

// noinspection SqlDialectInspection
object AlterAttributesUsingScratchTableQueries extends RawSqlQuery {
val driver: JdbcProfile = AttributeComponent.this.driver
Expand Down Expand Up @@ -809,12 +824,15 @@ trait AttributeComponent {
else DBIO.successful(0)
}

def updateAction(insertIntoScratchFunction: String => WriteAction[Int]) = {
val transactionId = UUID.randomUUID().toString
insertIntoScratchFunction(transactionId) andThen
updateInMasterAction(transactionId) andThen
clearAttributeScratchTableAction(transactionId)
}
def updateAction(insertIntoScratchFunction: String => WriteAction[Int], tracingContext: RawlsTracingContext) =
traceDBIOWithParent("updateAction", tracingContext) { span =>
val transactionId = UUID.randomUUID().toString
traceDBIOWithParent("insertIntoScratchFunction", span)(_ => insertIntoScratchFunction(transactionId)) andThen
traceDBIOWithParent("updateInMasterAction", span)(_ => updateInMasterAction(transactionId)) andThen
traceDBIOWithParent("clearAttributeScratchTableAction", span)(_ =>
clearAttributeScratchTableAction(transactionId)
)
}
}

def unmarshalAttributes[ID](
Expand Down
Loading

0 comments on commit a3f5633

Please sign in to comment.