Skip to content

Commit

Permalink
CORE-188: batch upsert tracing (#3147)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Dec 6, 2024
1 parent 4b9626b commit 4def381
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,81 +695,81 @@ trait AttributeComponent {
*/
def rewriteAttrsAction(attributesToSave: Traversable[RECORD],
existingAttributes: Traversable[RECORD],
insertFunction: Seq[RECORD] => () => WriteAction[Int]
insertFunction: Seq[RECORD] => () => WriteAction[Int],
parentContext: RawlsTracingContext = RawlsTracingContext()
): ReadWriteAction[Set[OWNER_ID]] =
traceDBIOWithParent("AttributeComponent.rewriteAttrsAction", RawlsTracingContext(Option.empty)) {
tracingContext =>
val toSaveAttrMap = toPrimaryKeyMap(attributesToSave)
val existingAttrMap = toPrimaryKeyMap(existingAttributes)

// note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0"
// therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save.
object ComparableRecord {
def fromRecord(rec: RECORD): ComparableRecord =
new ComparableRecord(
rec.ownerId,
rec.namespace,
rec.name,
rec.valueString,
rec.valueNumber,
rec.valueBoolean,
rec.valueJson,
rec.valueEntityRef,
rec.listIndex,
rec.listLength,
rec.deleted,
rec.deletedDate
)
}
traceDBIOWithParent("AttributeComponent.rewriteAttrsAction", parentContext) { tracingContext =>
val toSaveAttrMap = toPrimaryKeyMap(attributesToSave)
val existingAttrMap = toPrimaryKeyMap(existingAttributes)

// note that currently-existing attributes will have a populated id e.g. "1234", but to-save will have an id of "0"
// therefore, we use this ComparableRecord class which omits the id when checking equality between existing and to-save.
object ComparableRecord {
def fromRecord(rec: RECORD): ComparableRecord =
new ComparableRecord(
rec.ownerId,
rec.namespace,
rec.name,
rec.valueString,
rec.valueNumber,
rec.valueBoolean,
rec.valueJson,
rec.valueEntityRef,
rec.listIndex,
rec.listLength,
rec.deleted,
rec.deletedDate
)
}

case class ComparableRecord(
ownerId: OWNER_ID,
namespace: String,
name: String,
valueString: Option[String],
valueNumber: Option[Double],
valueBoolean: Option[Boolean],
valueJson: Option[String],
valueEntityRef: Option[Long],
listIndex: Option[Int],
listLength: Option[Int],
deleted: Boolean,
deletedDate: Option[Timestamp]
)
case class ComparableRecord(
ownerId: OWNER_ID,
namespace: String,
name: String,
valueString: Option[String],
valueNumber: Option[Double],
valueBoolean: Option[Boolean],
valueJson: Option[String],
valueEntityRef: Option[Long],
listIndex: Option[Int],
listLength: Option[Int],
deleted: Boolean,
deletedDate: Option[Timestamp]
)

// create a set of ComparableRecords representing the existing attributes
val existingAttributesSet: Set[ComparableRecord] =
existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r))
// create a set of ComparableRecords representing the existing attributes
val existingAttributesSet: Set[ComparableRecord] =
existingAttributes.toSet.map(r => ComparableRecord.fromRecord(r))

val existingKeys = existingAttrMap.keySet
val existingKeys = existingAttrMap.keySet

// insert attributes which are in save but not exists
val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_))
// insert attributes which are in save but not exists
val attributesToInsert = toSaveAttrMap.filterKeys(!existingKeys.contains(_))

// delete attributes which are in exists but not save
val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_))
// delete attributes which are in exists but not save
val attributesToDelete = existingAttrMap.filterKeys(!toSaveAttrMap.keySet.contains(_))

val attributesToUpdate = toSaveAttrMap.filter { case (k, v) =>
existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it
!existingAttributesSet.contains(
ComparableRecord.fromRecord(v)
) // if the attribute exists and is unchanged, don't update it
}
val attributesToUpdate = toSaveAttrMap.filter { case (k, v) =>
existingKeys.contains(k) && // if the attribute doesn't already exist, don't attempt to update it
!existingAttributesSet.contains(
ComparableRecord.fromRecord(v)
) // if the attribute exists and is unchanged, don't update it
}

// collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate
val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++
attributesToUpdate.values.map(_.ownerId) ++
attributesToDelete.values.map(_.ownerId)).toSet

// perform the inserts/updates/deletes
patchAttributesAction(
attributesToInsert.values,
attributesToUpdate.values,
attributesToDelete.values.map(_.id),
insertFunction,
tracingContext
)
.map(_ => ownersWithWrites)
// collect the parent objects (e.g. entity, workspace) that have writes, so we know which object rows to re-calculate
val ownersWithWrites: Set[OWNER_ID] = (attributesToInsert.values.map(_.ownerId) ++
attributesToUpdate.values.map(_.ownerId) ++
attributesToDelete.values.map(_.ownerId)).toSet

// perform the inserts/updates/deletes
patchAttributesAction(
attributesToInsert.values,
attributesToUpdate.values,
attributesToDelete.values.map(_.id),
insertFunction,
tracingContext
)
.map(_ => ownersWithWrites)
}

// noinspection SqlDialectInspection
Expand Down Expand Up @@ -804,8 +804,10 @@ trait AttributeComponent {

def updateAction(insertIntoScratchFunction: () => WriteAction[Int], tracingContext: RawlsTracingContext) =
traceDBIOWithParent("updateAction", tracingContext) { span =>
traceDBIOWithParent("insertIntoScratchFunction", span)(_ => insertIntoScratchFunction()) andThen
traceDBIOWithParent("updateInMasterAction", span)(_ => updateInMasterAction())
for {
_ <- traceDBIOWithParent("insertIntoScratchFunction", span)(_ => insertIntoScratchFunction())
numUpdates <- traceDBIOWithParent("updateInMasterAction", span)(_ => updateInMasterAction())
} yield numUpdates
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,25 +1026,35 @@ trait EntityComponent {
def save(workspaceContext: Workspace, entity: Entity): ReadWriteAction[Entity] =
save(workspaceContext, Seq(entity)).map(_.head)

def save(workspaceContext: Workspace, entities: Traversable[Entity]): ReadWriteAction[Traversable[Entity]] = {
def save(workspaceContext: Workspace,
entities: Traversable[Entity],
parentContext: RawlsTracingContext = RawlsTracingContext()
): ReadWriteAction[Traversable[Entity]] = {
entities.foreach(validateEntity)

for {
_ <- workspaceQuery.updateLastModified(workspaceContext.workspaceIdAsUUID)
preExistingEntityRecs <- getEntityRecords(workspaceContext.workspaceIdAsUUID, entities.map(_.toReference).toSet)
savingEntityRecs <- entityQueryWithInlineAttributes
.insertNewEntities(workspaceContext, entities, preExistingEntityRecs.map(_.toReference))
.map(_ ++ preExistingEntityRecs)
referencedAndSavingEntityRecs <- lookupNotYetLoadedReferences(workspaceContext,
entities,
savingEntityRecs.map(_.toReference)
).map(_ ++ savingEntityRecs)
_ <- traceDBIOWithParent("updateLastModified", parentContext)(_ =>
workspaceQuery.updateLastModified(workspaceContext.workspaceIdAsUUID)
)
preExistingEntityRecs <- traceDBIOWithParent("getEntityRecords", parentContext)(_ =>
getEntityRecords(workspaceContext.workspaceIdAsUUID, entities.map(_.toReference).toSet)
)
savingEntityRecs <- traceDBIOWithParent("insertNewEntities", parentContext)(_ =>
entityQueryWithInlineAttributes
.insertNewEntities(workspaceContext, entities, preExistingEntityRecs.map(_.toReference))
.map(_ ++ preExistingEntityRecs)
)
referencedAndSavingEntityRecs <- traceDBIOWithParent("lookupNotYetLoadedReferences", parentContext)(_ =>
lookupNotYetLoadedReferences(workspaceContext, entities, savingEntityRecs.map(_.toReference))
.map(_ ++ savingEntityRecs)
)

actuallyUpdatedEntityIds <- rewriteAttributes(
workspaceContext.workspaceIdAsUUID,
entities,
savingEntityRecs.map(_.id),
referencedAndSavingEntityRecs.map(e => e.toReference -> e.id).toMap
referencedAndSavingEntityRecs.map(e => e.toReference -> e.id).toMap,
parentContext
)
// find the pre-existing records that we updated
actuallyUpdatedPreExistingEntityRecs = preExistingEntityRecs.filter(e =>
Expand All @@ -1064,7 +1074,9 @@ trait EntityComponent {
// 2) were repeated in the input payload, causing one insert and subsequent update(s)
recsToUpdate = (actuallyUpdatedPreExistingEntityRecs ++ insertedRepeats).distinct

_ <- entityQueryWithInlineAttributes.optimisticLockUpdate(recsToUpdate, entities)
_ <- traceDBIOWithParent("optimisticLockUpdate", parentContext)(_ =>
entityQueryWithInlineAttributes.optimisticLockUpdate(recsToUpdate, entities)
)
} yield entities
}

Expand Down Expand Up @@ -1112,7 +1124,8 @@ trait EntityComponent {
private def rewriteAttributes(workspaceId: UUID,
entitiesToSave: Traversable[Entity],
entityIds: Seq[Long],
entityIdsByRef: Map[AttributeEntityReference, Long]
entityIdsByRef: Map[AttributeEntityReference, Long],
parentContext: RawlsTracingContext = RawlsTracingContext()
) = {
val attributesToSave = for {
entity <- entitiesToSave
Expand All @@ -1127,7 +1140,8 @@ trait EntityComponent {
entityAttributeShardQuery(workspaceId).findByOwnerQuery(entityIds).result flatMap { existingAttributes =>
entityAttributeShardQuery(workspaceId).rewriteAttrsAction(attributesToSave,
existingAttributes,
entityAttributeTempQuery.insertScratchAttributes
entityAttributeTempQuery.insertScratchAttributes,
parentContext
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,15 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
} else {
val t = updateTrials.collect { case (entityUpdate, Success(entity)) => entity }

dataAccess.entityQuery
.save(workspaceContext, t)
.withStatementParameters(statementInit = _.setQueryTimeout(queryTimeoutSeconds))
traceDBIOWithParent("saveAction", localContext) { subContext =>
dataAccess.entityQuery
.save(workspaceContext, t, subContext.toTracingContext)
.withStatementParameters(statementInit = _.setQueryTimeout(queryTimeoutSeconds))
}
}
}

traceDBIOWithParent("saveAction", localContext)(_ => saveAction)
saveAction
} recover {
case icve: java.sql.SQLIntegrityConstraintViolationException =>
val userMessage =
Expand Down

0 comments on commit 4def381

Please sign in to comment.