diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml
index b61fd8c4f2..63388b999a 100644
--- a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml
+++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changelog.xml
@@ -125,6 +125,7 @@
+    <include file="changesets/20240910_entity_json_support.xml" relativeToChangelogFile="true"/>
diff --git a/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20240910_entity_json_support.xml b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20240910_entity_json_support.xml
new file mode 100644
index 0000000000..d344e808a3
--- /dev/null
+++ b/core/src/main/resources/org/broadinstitute/dsde/rawls/liquibase/changesets/20240910_entity_json_support.xml
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd">

    <changeSet logicalFilePath="dummy" id="entity_json_support" author="AJ-2008">

        <!-- JSON storage for entity attributes -->
        <addColumn tableName="ENTITY">
            <column name="attributes" type="json"/>
        </addColumn>

        <!-- per-entity attribute-key cache, maintained by the triggers below -->
        <createTable tableName="ENTITY_KEYS">
            <column name="id" type="BIGINT UNSIGNED">
                <constraints primaryKey="true" nullable="false"/>
            </column>
            <column name="workspace_id" type="binary(16)">
                <constraints nullable="false"/>
            </column>
            <column name="entity_type" type="varchar(254)">
                <constraints nullable="false"/>
            </column>
            <column name="attribute_keys" type="json"/>
            <column name="last_updated" type="timestamp(3)"/>
        </createTable>

        <!-- entity-to-entity reference pointers -->
        <createTable tableName="ENTITY_REFS">
            <column name="from_id" type="BIGINT UNSIGNED">
                <constraints nullable="false"/>
            </column>
            <column name="to_id" type="BIGINT UNSIGNED">
                <constraints nullable="false"/>
            </column>
        </createTable>

        <sql endDelimiter="~">
            DROP TRIGGER IF EXISTS after_entity_insert ~
            DROP TRIGGER IF EXISTS after_entity_update ~

            CREATE TRIGGER after_entity_insert AFTER INSERT ON ENTITY
            FOR EACH ROW
            if new.attributes is not null then
                INSERT INTO ENTITY_KEYS
                    (id, workspace_id, entity_type, attribute_keys, last_updated)
                VALUES
                    (new.id, new.workspace_id, new.entity_type, JSON_KEYS(new.attributes), now(3));
            end if ~

            CREATE TRIGGER after_entity_update AFTER UPDATE ON ENTITY
            FOR EACH ROW
            BEGIN
                -- is this row soft-deleted?
                if old.deleted = 0 and new.deleted = 1 then
                    DELETE FROM ENTITY_KEYS WHERE id = new.id;
                elseif new.attributes is not null then
                    -- compare old keys to new keys; update the ENTITY_KEYS table only if they are different
                    set @new_keys := JSON_KEYS(new.attributes);
                    set @old_keys := JSON_KEYS(old.attributes);
                    if JSON_LENGTH(@new_keys) != JSON_LENGTH(@old_keys) OR JSON_CONTAINS(@new_keys, @old_keys) = 0 then
                        UPDATE ENTITY_KEYS SET attribute_keys=@new_keys, last_updated=now(3)
                        WHERE id = new.id;
                    end if;
                end if;
            END ~
        </sql>

        <rollback>
            DROP TRIGGER IF EXISTS after_entity_insert;
            DROP TRIGGER IF EXISTS after_entity_update;
        </rollback>

    </changeSet>

</databaseChangeLog>
diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/DataAccess.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/DataAccess.scala
index 264f446d5d..5d99e9a9d0 100644
--- a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/DataAccess.scala
+++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/DataAccess.scala
@@ -13,6 +13,7 @@ trait DataAccess
     with RawlsBillingProjectComponent
     with WorkspaceComponent
     with EntityComponent
+    with JsonEntityComponent
     with AttributeComponent
     with MethodConfigurationComponent
     with SubmissionComponent
diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/JsonEntityComponent.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/JsonEntityComponent.scala
new file mode 100644
index 0000000000..433c200e1f
--- /dev/null
+++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/dataaccess/slick/JsonEntityComponent.scala
@@ -0,0 +1,464 @@
package org.broadinstitute.dsde.rawls.dataaccess.slick

import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.rawls.RawlsException
import org.broadinstitute.dsde.rawls.model.Attributable.AttributeMap
import org.broadinstitute.dsde.rawls.model.WorkspaceJsonSupport._
import org.broadinstitute.dsde.rawls.model._
import slick.jdbc._
import spray.json.DefaultJsonProtocol._
import spray.json._

import
java.sql.Timestamp
import java.util.{Date, UUID}
import scala.language.postfixOps

/**
 * model class for rows in the ENTITY table, used for high-level Slick operations
 */
case class JsonEntitySlickRecord(id: Long,
                                 name: String,
                                 entityType: String,
                                 workspaceId: UUID,
                                 recordVersion: Long,
                                 deleted: Boolean,
                                 deletedDate: Option[Timestamp],
                                 attributes: Option[String]
) {
  def toEntity: Entity =
    Entity(name, entityType, attributes.getOrElse("{}").parseJson.convertTo[AttributeMap])
}

/**
 * model class for rows in the ENTITY table, used for low-level raw SQL operations
 */
// TODO AJ-2008: handle the all_attribute_values column?
// TODO AJ-2008: probably don't need deletedDate here
case class JsonEntityRecord(id: Long,
                            name: String,
                            entityType: String,
                            workspaceId: UUID,
                            recordVersion: Long,
                            deleted: Boolean,
                            deletedDate: Option[Timestamp],
                            attributes: JsValue
) {
  def toEntity: Entity =
    Entity(name, entityType, attributes.convertTo[AttributeMap])
  def toSlick: JsonEntitySlickRecord =
    JsonEntitySlickRecord(id,
                          name,
                          entityType,
                          workspaceId,
                          recordVersion,
                          deleted,
                          deletedDate,
                          Some(attributes.compactPrint)
    )
}

/**
 * abbreviated model for rows in the ENTITY table when we don't need all the columns
 */
case class JsonEntityRefRecord(id: Long, name: String, entityType: String)

/**
 * model class for rows in the ENTITY_REFS table
 */
case class RefPointerRecord(fromId: Long, toId: Long)

/**
 * companion object for constants, etc.
 */
object JsonEntityComponent {
  // the length of the all_attribute_values column, which is TEXT, minus a few bytes because I'm nervous
  val allAttributeValuesColumnSize = 65532
}

/**
 * Slick component for reading/writing JSON-based entities
 */
trait JsonEntityComponent extends LazyLogging {
  this: DriverComponent =>

  import slick.jdbc.MySQLProfile.api._

  // json codec for entity attributes
  implicit val attributeFormat: AttributeFormat = new AttributeFormat with PlainArrayAttributeListSerializer

  /** high-level Slick table for ENTITY */
  class JsonEntityTable(tag: Tag) extends Table[JsonEntitySlickRecord](tag, "ENTITY") {
    def id = column[Long]("id", O.PrimaryKey, O.AutoInc)
    def name = column[String]("name", O.Length(254))
    def entityType = column[String]("entity_type", O.Length(254))
    def workspaceId = column[UUID]("workspace_id")
    def version = column[Long]("record_version")
    def deleted = column[Boolean]("deleted")
    def deletedDate = column[Option[Timestamp]]("deleted_date")
    def attributes = column[Option[String]]("attributes")

    // TODO AJ-2008: are these useful?
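    // (for context: the db itself enforces uniqueness of (workspace_id, entity_type, name), so
    //  Slick-side definitions like these would be declarative documentation rather than anything
    //  the queries below depend on)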
    // def workspace = foreignKey("FK_ENTITY_WORKSPACE", workspaceId, workspaceQuery)(_.id)
    // def uniqueTypeName = index("idx_entity_type_name", (workspaceId, entityType, name), unique = true)

    def * =
      (id, name, entityType, workspaceId, version, deleted, deletedDate, attributes) <> (JsonEntitySlickRecord.tupled,
                                                                                         JsonEntitySlickRecord.unapply
      )
  }

  /** high-level Slick table for ENTITY_REFS */
  class JsonEntityRefTable(tag: Tag) extends Table[RefPointerRecord](tag, "ENTITY_REFS") {
    def fromId = column[Long]("from_id")
    def toId = column[Long]("to_id")

    def * =
      (fromId, toId) <> (RefPointerRecord.tupled, RefPointerRecord.unapply)
  }

  /** high-level Slick query object for ENTITY_REFS */
  object jsonEntityRefSlickQuery extends TableQuery(new JsonEntityRefTable(_)) {}

  /** high-level Slick query object for ENTITY */
  object jsonEntitySlickQuery extends TableQuery(new JsonEntityTable(_)) {}

  /** low-level raw SQL queries for ENTITY */
  object jsonEntityQuery extends RawSqlQuery {
    val driver: JdbcProfile = JsonEntityComponent.this.driver

    // read a json column from the db and translate into a JsValue
    implicit val GetJsValueResult: GetResult[JsValue] = GetResult(r => r.nextString().parseJson)

    // write a JsValue to the database by converting it to a string (the db column is still JSON)
    implicit object SetJsValueParameter extends SetParameter[JsValue] {
      def apply(v: JsValue, pp: PositionedParameters): Unit =
        pp.setString(v.compactPrint)
    }

    // select id, name, entity_type, workspace_id, record_version, deleted, deleted_date, attributes
    // into a JsonEntityRecord
    implicit val getJsonEntityRecord: GetResult[JsonEntityRecord] =
      GetResult(r => JsonEntityRecord(r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<))

    implicit val getJsonEntityRefRecord: GetResult[JsonEntityRefRecord] =
      GetResult(r => JsonEntityRefRecord(r.<<, r.<<, r.<<))

    /**
     * Insert a single entity to the db
     */
    // TODO AJ-2008: return Entity instead of JsonEntityRecord?
    def createEntity(workspaceId: UUID, entity: Entity): ReadWriteAction[Int] = {
      val attributesJson: JsValue = entity.attributes.toJson

      sqlu"""insert into ENTITY(name, entity_type, workspace_id, record_version, deleted, attributes)
             values (${entity.name}, ${entity.entityType}, $workspaceId, 0, 0, $attributesJson)"""
    }

    /**
     * Update a single entity in the db
     */
    // TODO AJ-2008: return Entity instead of JsonEntityRecord?
    // TODO AJ-2008: can this use INSERT ... ON DUPLICATE KEY UPDATE instead? That would allow batching multiple updates
    //   into a single statement. But, how would that work with record_version checking?
    def updateEntity(workspaceId: UUID, entity: Entity, recordVersion: Long): ReadWriteAction[Int] = {
      val attributesJson: JsValue = entity.attributes.toJson

      sqlu"""update ENTITY set record_version = record_version+1, attributes = $attributesJson
             where workspace_id = $workspaceId and entity_type = ${entity.entityType} and name = ${entity.name}
             and record_version = $recordVersion;
          """
    }

    /**
     * Read a single entity from the db
     */
    // TODO AJ-2008: return Entity instead of JsonEntityRecord?
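    // uniqueResult is safe in the two reads below: the db enforces uniqueness of
    // (workspace_id, entity_type, name), so at most one row can match the where clause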
    def getEntity(workspaceId: UUID, entityType: String, entityName: String): ReadAction[Option[JsonEntityRecord]] = {
      val selectStatement: SQLActionBuilder =
        sql"""select id, name, entity_type, workspace_id, record_version, deleted, deleted_date, attributes
              from ENTITY where workspace_id = $workspaceId and entity_type = $entityType and name = $entityName"""

      uniqueResult(selectStatement.as[JsonEntityRecord])
    }

    /**
     * Read a single entity reference (id, name, and type only) from the db
     */
    def getEntityRef(workspaceId: UUID,
                     entityType: String,
                     entityName: String
    ): ReadAction[Option[JsonEntityRefRecord]] = {
      val selectStatement: SQLActionBuilder =
        sql"""select id, name, entity_type
              from ENTITY where workspace_id = $workspaceId and entity_type = $entityType and name = $entityName"""

      uniqueResult(selectStatement.as[JsonEntityRefRecord])
    }

    /**
     * All entity types for the given workspace, with their counts of active entities
     */
    def typesAndCounts(workspaceId: UUID): ReadAction[Seq[(String, Int)]] =
      sql"""select entity_type, count(1) from ENTITY where workspace_id = $workspaceId and deleted = 0 group by entity_type"""
        .as[(String, Int)]

    /**
     * All attribute names for the given workspace, paired to their entity type.
     * The ENTITY_KEYS table is automatically populated via triggers on the ENTITY table; see the db
     * to understand those triggers.
     */
    // TODO AJ-2008: assess performance of ENTITY_KEYS.attribute_keys vs JSON_KEYS(ENTITY.attributes)
    def typesAndAttributes(workspaceId: UUID): ReadAction[Seq[(String, String)]] =
      sql"""SELECT DISTINCT entity_type, json_key FROM ENTITY_KEYS,
            JSON_TABLE(attribute_keys, '$$[*]' COLUMNS(json_key VARCHAR(256) PATH '$$')) t
            where workspace_id = $workspaceId;"""
        .as[(String, String)]

    def typesAndAttributesV2(workspaceId: UUID): ReadAction[Seq[(String, String)]] =
      sql"""SELECT DISTINCT entity_type, json_key FROM ENTITY,
            JSON_TABLE(json_keys(attributes), '$$[*]' COLUMNS(json_key VARCHAR(256) PATH '$$')) t
            where workspace_id = $workspaceId;"""
        .as[(String, String)]

    def queryEntities(workspaceId: UUID, entityType: String, queryParams: EntityQuery): ReadAction[Seq[Entity]] = {

      val offset = queryParams.pageSize * (queryParams.page - 1)

      // get the where clause from the shared method
      val whereClause: SQLActionBuilder = queryWhereClause(workspaceId, entityType, queryParams)

      // sorting
      val orderByClause: SQLActionBuilder = queryParams.sortField match {
        case "name" => sql" order by name #${SortDirections.toSql(queryParams.sortDirection)} "
        case attr =>
          sql" order by JSON_EXTRACT(attributes, '$$.#$attr') #${SortDirections.toSql(queryParams.sortDirection)} "
      }

      // TODO AJ-2008: full-table text search
      // TODO AJ-2008: filter by column
      // TODO AJ-2008: result projection

      val query = concatSqlActions(
        sql"select id, name, entity_type, workspace_id, record_version, deleted, deleted_date, attributes from ENTITY ",
        whereClause,
        orderByClause,
        sql" limit #${queryParams.pageSize} offset #$offset"
      )

      query.as[JsonEntityRecord].map(results => results.map(_.toEntity))
    }

    /**
     * Count the active entities of the given type, before applying any query filters
     */
    def countType(workspaceId: UUID, entityType: String): ReadAction[Int] =
      singleResult(
        sql"select count(1) from ENTITY where workspace_id = $workspaceId and entity_type = $entityType and deleted = 0"
          .as[Int]
      )

    /**
     * Count the number of entities that
match the query, after applying all filters + */ + def countQuery(workspaceId: UUID, entityType: String, queryParams: EntityQuery): ReadAction[Int] = { + // get the where clause from the shared method + val whereClause = queryWhereClause(workspaceId, entityType, queryParams) + + val query = concatSqlActions( + sql"select count(1) from ENTITY ", + whereClause + ) + singleResult(query.as[Int]) + } + + /** + * Shared method to build the where-clause criteria for entityQuery. Used to generate the results and to generate the counts. + */ + private def queryWhereClause(workspaceId: UUID, entityType: String, queryParams: EntityQuery): SQLActionBuilder = + sql"where workspace_id = $workspaceId and entity_type = $entityType and deleted = 0" + + /** Given a set of entity references, retrieve those entities */ + def getEntities(workspaceId: UUID, refs: Set[AttributeEntityReference]): ReadAction[Seq[JsonEntityRecord]] = + // short-circuit + if (refs.isEmpty) { + DBIO.successful(Seq.empty[JsonEntityRecord]) + } else { + // group the entity type/name pairs by type + val groupedReferences: Map[String, Set[String]] = refs.groupMap(_.entityType)(_.entityName) + + // build select statements for each type + val queryParts: Iterable[SQLActionBuilder] = groupedReferences.map { + case (entityType: String, entityNames: Set[String]) => + // build the "IN" clause values + val entityNamesSql = reduceSqlActionsWithDelim(entityNames.map(name => sql"$name").toSeq, sql",") + + // TODO AJ-2008: check query plan for this and make sure it is properly using indexes + // UNION query does use indexes for each select; but it also requires a temporary table to + // combine the results, and we can probably do better. `where (entity_type, name) in ((?, ?), (?, ?)) + // looks like it works well + // TODO AJ-2008: include `where deleted=0`? Make that an argument? + concatSqlActions( + sql"""select id, name, entity_type, workspace_id, record_version, deleted, deleted_date, attributes + from ENTITY where workspace_id = $workspaceId and entity_type = $entityType + and name in (""", + entityNamesSql, + sql")" + ) + } + + // union the select statements together + val unionQuery = reduceSqlActionsWithDelim(queryParts.toSeq, sql" union ") + + // execute + unionQuery.as[JsonEntityRecord](getJsonEntityRecord) + } + + /** Given a set of entity references, retrieve those entities */ + // TODO AJ-2008: address lots of copy/paste between getEntities and getEntityRefs + def getEntityRefs(workspaceId: UUID, refs: Set[AttributeEntityReference]): ReadAction[Seq[JsonEntityRefRecord]] = + // short-circuit + if (refs.isEmpty) { + DBIO.successful(Seq.empty[JsonEntityRefRecord]) + } else { + // group the entity type/name pairs by type + val groupedReferences: Map[String, Set[String]] = refs.groupMap(_.entityType)(_.entityName) + + // build select statements for each type + val queryParts: Iterable[SQLActionBuilder] = groupedReferences.map { + case (entityType: String, entityNames: Set[String]) => + // build the "IN" clause values + val entityNamesSql = reduceSqlActionsWithDelim(entityNames.map(name => sql"$name").toSeq, sql",") + + // TODO AJ-2008: check query plan for this and make sure it is properly using indexes + // UNION query does use indexes for each select; but it also requires a temporary table to + // combine the results, and we can probably do better. `where (entity_type, name) in ((?, ?), (?, ?)) + // looks like it works well + // TODO AJ-2008: include `where deleted=0`? Make that an argument? 
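          // a sketch of the tuple-IN alternative named in the TODO above (not what runs here):
          //   select ... from ENTITY where workspace_id = ? and (entity_type, name) in ((?, ?), (?, ?), ...)
          // one statement, no UNION, and no temporary table needed to combine per-type results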
+ concatSqlActions( + sql"""select id, name, entity_type + from ENTITY where workspace_id = $workspaceId and entity_type = $entityType + and name in (""", + entityNamesSql, + sql")" + ) + } + + // union the select statements together + val unionQuery = reduceSqlActionsWithDelim(queryParts.toSeq, sql" union ") + + // execute + unionQuery.as[JsonEntityRefRecord](getJsonEntityRefRecord) + } + + /** + * Returns the set of entities which directly reference the supplied targets + */ + def getReferrers(workspaceId: UUID, targets: Set[AttributeEntityReference]) = { + val inFragment = refsInFragment(targets) + + val baseSql = sql"""select referrer.id, referrer.name, referrer.entity_type + from ENTITY referrer, ENTITY_REFS refs, ENTITY target + where target.id = refs.to_id + and referrer.id = refs.from_id + and target.workspace_id = $workspaceId + and (target.entity_type, target.name) in """ + + concatSqlActions(baseSql, inFragment, sql";").as[JsonEntityRefRecord] + } + + /** + * Returns the set of entities which directly AND RECURSIVELY reference the supplied targets + */ + def getRecursiveReferrers(workspaceId: UUID, targets: Set[AttributeEntityReference]) = { + + // max number of rows to consider in the recursive query. + // this function will never return more than this many results. When called to validate delete requests, + // it means that we will never return more than 10000 referrers for any given set of entities to be deleted. + val recursionLimit = 10000 + + // a recursive SQL query to retrieve all entities that refer to the ${targets} entities, plus all entities that + // refer to the referring entities, plus all entities that refer to those, plus ... + // + // recursive SQL: https://dev.mysql.com/doc/refman/8.4/en/with.html#common-table-expressions-recursive + val startSql = + sql"""WITH RECURSIVE ancestor AS ( + select r.from_id, r.to_id + from ENTITY_REFS r, ENTITY e + where e.id = r.to_id + and e.workspace_id = $workspaceId + and (e.entity_type, e.name) in """ + + val inFragment = refsInFragment(targets) + + val endSql = + sql""" UNION ALL + select r.from_id, r.to_id + from ancestor, ENTITY_REFS r + where ancestor.from_id = r.to_id + limit #$recursionLimit) + select a.from_id, e.entity_type, e.name from ancestor a, ENTITY e + where a.from_id = e.id;""" + + concatSqlActions(startSql, inFragment, endSql).as[JsonEntityRefRecord] + + } + + def softDelete(workspaceId: UUID, targets: Set[AttributeEntityReference]): ReadWriteAction[Int] = { + // short-circuit + if (targets.isEmpty) { + return DBIO.successful(0) + } + val renameSuffix = "_" + getSufficientlyRandomSuffix(1000000000) + val deletedDate = new Timestamp(new Date().getTime) + + val inFragment = refsInFragment(targets) + + val baseSql = sql"""update ENTITY set deleted=1, deleted_date=$deletedDate, name=CONCAT(name, $renameSuffix) + where deleted=0 AND workspace_id=$workspaceId and (entity_type, name) in """ + + concatSqlActions(baseSql, inFragment, sql";").asUpdate + } + + def renameSingleEntity(workspaceId: UUID, entity: AttributeEntityReference, newName: String): ReadWriteAction[Int] = + sql"""update ENTITY set name = $newName + where workspace_id = $workspaceId + and entity_type = ${entity.entityType} + and name = ${entity.entityName};""".asUpdate + + private def refsInFragment(refs: Set[AttributeEntityReference]) = { + // build select statements for each type + val pairs = refs.map { ref => + sql"""(${ref.entityType}, ${ref.entityName})""" + } + concatSqlActions(sql"(", reduceSqlActionsWithDelim(pairs.toSeq, sql","), sql")") + 
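      // the fragment produced here looks like ((?, ?), (?, ?), ...) — one bound
      // (entityType, entityName) pair per reference — ready to follow an `in` keyword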
} + + def renameEmbeddedReferences(workspaceId: UUID, + toId: Long, + oldReference: AttributeEntityReference, + newReference: AttributeEntityReference + ): ReadWriteAction[Int] = { + // build string to be replaced + val oldStr = s"""{"entityName": "${oldReference.entityName}", "entityType": "${oldReference.entityType}"}""" + // build string to be the replacement + val newStr = s"""{"entityName": "${newReference.entityName}", "entityType": "${newReference.entityType}"}""" + + // perform replacements + sql"""update ENTITY set attributes = REPLACE(attributes, $oldStr, $newStr) + where workspace_id = $workspaceId and id in ( + select from_id from ENTITY_REFS er + where er.to_id = $toId + )""".asUpdate + } + + } + + private def singleResult[V](results: ReadAction[Seq[V]]): ReadAction[V] = + results map { + case Seq() => throw new RawlsException(s"Expected 1 result but found 0") + case Seq(one) => one + case tooMany => throw new RawlsException(s"Expected 1 result but found ${tooMany.size}") + } + +} diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityManager.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityManager.scala index 14cc72b0eb..f1bc4588d6 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityManager.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityManager.scala @@ -14,6 +14,7 @@ import org.broadinstitute.dsde.rawls.dataaccess.{ import org.broadinstitute.dsde.rawls.entities.base.{EntityProvider, EntityProviderBuilder} import org.broadinstitute.dsde.rawls.entities.datarepo.{DataRepoEntityProvider, DataRepoEntityProviderBuilder} import org.broadinstitute.dsde.rawls.entities.exceptions.DataEntityException +import org.broadinstitute.dsde.rawls.entities.json.{JsonEntityProvider, JsonEntityProviderBuilder} import org.broadinstitute.dsde.rawls.entities.local.{LocalEntityProvider, LocalEntityProviderBuilder} import org.broadinstitute.dsde.rawls.model.{ErrorReport, WorkspaceType} @@ -57,10 +58,12 @@ class EntityManager(providerBuilders: Set[EntityProviderBuilder[_ <: EntityProvi // soon: look up the reference name to ensure it exists. // for now, this simplistic logic illustrates the approach: choose the right builder for the job. 
- val targetTag = if (requestArguments.dataReference.isDefined) { - typeTag[DataRepoEntityProvider] - } else { - typeTag[LocalEntityProvider] + + // TODO AJ-2008: this is a temporary hack to get JsonEntityProvider working + val targetTag = (requestArguments.dataReference, requestArguments.workspace) match { + case (Some(_), _) => typeTag[DataRepoEntityProvider] + case (_, x) if x.name.contains("AJ-2008") => typeTag[JsonEntityProvider] + case _ => typeTag[LocalEntityProvider] } providerBuilders.find(_.builds == targetTag) match { @@ -110,6 +113,8 @@ object EntityManager { config ) // implicit executionContext - new EntityManager(Set(defaultEntityProviderBuilder, dataRepoEntityProviderBuilder)) + val jsonEntityProviderBuilder = new JsonEntityProviderBuilder(dataSource, cacheEnabled, queryTimeout, metricsPrefix) + + new EntityManager(Set(defaultEntityProviderBuilder, dataRepoEntityProviderBuilder, jsonEntityProviderBuilder)) } } diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityService.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityService.scala index 6e12a39b0e..af40ff802e 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityService.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/EntityService.scala @@ -102,6 +102,7 @@ class EntityService(protected val ctx: RawlsRequestContext, .recover(bigQueryRecover) } + // TODO AJ-2008: move to EntityProvider def updateEntity(workspaceName: WorkspaceName, entityType: String, entityName: String, @@ -196,6 +197,7 @@ class EntityService(protected val ctx: RawlsRequestContext, .recover(bigQueryRecover) } + // TODO AJ-2008: move to EntityProvider def deleteEntityAttributes(workspaceName: WorkspaceName, entityType: String, attributeNames: Set[AttributeName] @@ -219,27 +221,31 @@ class EntityService(protected val ctx: RawlsRequestContext, sqlLoggingRecover(s"deleteEntityAttributes: $workspaceName $entityType ${attributeNames.size} attribute names") ) - def renameEntity(workspaceName: WorkspaceName, entityType: String, entityName: String, newName: String): Future[Int] = + // TODO AJ-2008: move to EntityProvider + def renameEntity(workspaceName: WorkspaceName, + entityType: String, + entityName: String, + newName: String, + dataReference: Option[DataReferenceName], + billingProject: Option[GoogleProjectId] + ): Future[Int] = (getV2WorkspaceContextAndPermissions(workspaceName, SamWorkspaceActions.write, Some(WorkspaceAttributeSpecs(all = false)) ) flatMap { workspaceContext => - dataSource.inTransaction { dataAccess => - withEntity(workspaceContext, entityType, entityName, dataAccess) { entity => - dataAccess.entityQuery.get(workspaceContext, entity.entityType, newName) flatMap { - case None => dataAccess.entityQuery.rename(workspaceContext, entity.entityType, entity.name, newName) - case Some(_) => - throw new RawlsExceptionWithErrorReport( - errorReport = - ErrorReport(StatusCodes.Conflict, s"Destination ${entity.entityType} ${newName} already exists") - ) - } - } - } + val entityRequestArguments = EntityRequestArguments(workspaceContext, ctx, dataReference, billingProject) + for { + entityProvider <- entityManager.resolveProviderFuture(entityRequestArguments) + numberOfEntitiesRenamed <- entityProvider.renameEntity(AttributeEntityReference(entityType, entityName), + newName + ) + } yield numberOfEntitiesRenamed + }).recover( sqlLoggingRecover(s"renameEntity: $workspaceName $entityType $entityName") ) + // TODO AJ-2008: move to EntityProvider def 
renameEntityType(workspaceName: WorkspaceName, oldName: String, renameInfo: EntityTypeRename): Future[Int] = { import org.broadinstitute.dsde.rawls.dataaccess.slick.{DataAccess, ReadAction} @@ -292,6 +298,7 @@ class EntityService(protected val ctx: RawlsRequestContext, ) } + // TODO AJ-2008: move to EntityProvider def evaluateExpression(workspaceName: WorkspaceName, entityType: String, entityName: String, @@ -381,6 +388,7 @@ class EntityService(protected val ctx: RawlsRequestContext, Source.fromPublisher(dataSource.database.stream(allAttrsStream)) } + // TODO AJ-2008: move to EntityProvider def listEntities(workspaceName: WorkspaceName, entityType: String) = (getWorkspaceContextAndPermissions(workspaceName, SamWorkspaceActions.read, @@ -419,6 +427,7 @@ class EntityService(protected val ctx: RawlsRequestContext, } } + // TODO AJ-2008: move to EntityProvider def copyEntities(entityCopyDef: EntityCopyDefinition, linkExistingEntities: Boolean): Future[EntityCopyResponse] = (for { destWsCtx <- getV2WorkspaceContextAndPermissions(entityCopyDef.destinationWorkspace, @@ -490,6 +499,7 @@ class EntityService(protected val ctx: RawlsRequestContext, sqlLoggingRecover(s"batchUpsertEntities: $workspaceName ${entityUpdates.size} upserts") ) + // TODO AJ-2008: move to EntityProvider def renameAttribute(workspaceName: WorkspaceName, entityType: String, oldAttributeName: AttributeName, diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/base/EntityProvider.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/base/EntityProvider.scala index 2d7ceba197..4a532e669c 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/base/EntityProvider.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/base/EntityProvider.scala @@ -16,7 +16,8 @@ import org.broadinstitute.dsde.rawls.model.{ EntityTypeMetadata, RawlsRequestContext, SubmissionValidationEntityInputs, - Workspace + Workspace, + WorkspaceName } import scala.concurrent.Future @@ -88,4 +89,6 @@ trait EntityProvider { linkExistingEntities: Boolean, parentContext: RawlsRequestContext ): Future[EntityCopyResponse] + + def renameEntity(entity: AttributeEntityReference, newName: String): Future[Int] } diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/datarepo/DataRepoEntityProvider.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/datarepo/DataRepoEntityProvider.scala index 0c938bd470..2ab01720a7 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/datarepo/DataRepoEntityProvider.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/datarepo/DataRepoEntityProvider.scala @@ -547,4 +547,7 @@ class DataRepoEntityProvider(snapshotModel: SnapshotModel, parentContext: RawlsRequestContext ): Future[EntityCopyResponse] = throw new UnsupportedEntityOperationException("copy entities not supported by this provider.") + + override def renameEntity(entity: AttributeEntityReference, newName: EntityName): Future[Int] = + throw new UnsupportedEntityOperationException("renameEntity not supported by this provider.") } diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProvider.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProvider.scala new file mode 100644 index 0000000000..6c23c6c866 --- /dev/null +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProvider.scala @@ -0,0 +1,648 @@ +package org.broadinstitute.dsde.rawls.entities.json + +import 
akka.stream.scaladsl.Source +import com.typesafe.scalalogging.LazyLogging +import org.broadinstitute.dsde.rawls.dataaccess.SlickDataSource +import org.broadinstitute.dsde.rawls.entities.EntityRequestArguments +import org.broadinstitute.dsde.rawls.entities.base.ExpressionEvaluationSupport.LookupExpression +import org.broadinstitute.dsde.rawls.entities.base.{EntityProvider, ExpressionEvaluationContext, ExpressionValidator} +import org.broadinstitute.dsde.rawls.jobexec.MethodConfigResolver +import org.broadinstitute.dsde.rawls.model.Attributable.{entityIdAttributeSuffix, workspaceIdAttribute, AttributeMap} +import org.broadinstitute.dsde.rawls.model.{ + AttributeEntityReference, + AttributeEntityReferenceList, + AttributeName, + AttributeUpdateOperations, + AttributeValue, + Entity, + EntityCopyResponse, + EntityQuery, + EntityQueryResponse, + EntityQueryResultMetadata, + EntityTypeMetadata, + RawlsRequestContext, + SubmissionValidationEntityInputs, + Workspace +} +import spray.json._ +import DefaultJsonProtocol._ +import akka.http.scaladsl.model.StatusCodes +import bio.terra.common.exception.NotImplementedException +import io.opentelemetry.api.common.AttributeKey +import org.apache.commons.lang3.time.StopWatch +import org.broadinstitute.dsde.rawls.RawlsException +import org.broadinstitute.dsde.rawls.dataaccess.slick.{ + DataAccess, + JsonEntityRecord, + JsonEntityRefRecord, + JsonEntitySlickRecord, + ReadAction, + RefPointerRecord +} +import org.broadinstitute.dsde.rawls.entities.exceptions.{DataEntityException, DeleteEntitiesConflictException} +import org.broadinstitute.dsde.rawls.model.AttributeUpdateOperations.EntityUpdateDefinition +import org.broadinstitute.dsde.rawls.model.WorkspaceJsonSupport._ +import org.broadinstitute.dsde.rawls.util.AttributeSupport +import org.broadinstitute.dsde.rawls.util.TracingUtils.{ + setTraceSpanAttribute, + traceDBIOWithParent, + traceFutureWithParent +} +import slick.dbio.DBIO +import slick.jdbc.TransactionIsolation + +import java.time.Duration +import java.util.UUID +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Try} + +// TODO AJ-2008: +// - tracing +// - mark transactions as read-only where possible (does this actually help?) +// - error-handling +// - logging +class JsonEntityProvider(requestArguments: EntityRequestArguments, + implicit protected val dataSource: SlickDataSource, + cacheEnabled: Boolean, + queryTimeout: Duration, + val workbenchMetricBaseName: String +)(implicit protected val executionContext: ExecutionContext) + extends EntityProvider + with AttributeSupport + with LazyLogging { + + override def entityStoreId: Option[String] = None + + val workspaceId: UUID = requestArguments.workspace.workspaceIdAsUUID // shorthand for methods below + + /** + * Insert a single entity to the db + */ + override def createEntity(entity: Entity): Future[Entity] = + dataSource.inTransaction { dataAccess => + for { + // find and validate all references in the entity-to-be-saved + referenceTargets <- DBIO.from(validateReferences(entity)) + + // save the entity + _ <- dataAccess.jsonEntityQuery.createEntity(workspaceId, entity) + // did it save correctly? re-retrieve it. By re-retrieving it, we can 1) get its id, and 2) get the actual, + // normalized JSON that was persisted to the db. When we return the entity to the user, we return the + // normalized version. 
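      // (MySQL normalizes JSON on write: whitespace is dropped and duplicate keys collapse, so the
      // persisted attributes can legitimately differ byte-for-byte from the request payload)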
+ savedEntityRecordOption <- dataAccess.jsonEntityQuery.getEntity(workspaceId, entity.entityType, entity.name) + savedEntityRecord = savedEntityRecordOption.getOrElse(throw new RuntimeException("Could not save entity")) + // save all references from this entity to other entities + _ <- DBIO.from(replaceReferences(savedEntityRecord.id, referenceTargets, isInsert = true)) + } yield savedEntityRecord.toEntity + } + + /** + * Read a single entity from the db + */ + override def getEntity(entityType: String, entityName: String): Future[Entity] = dataSource.inTransaction { + dataAccess => + dataAccess.jsonEntityQuery.getEntity(workspaceId, entityType, entityName) + } map { result => result.map(_.toEntity).get } + + /** + * Soft-delete specified entities + */ + override def deleteEntities(entityRefs: Seq[AttributeEntityReference]): Future[Int] = { + val stopwatch = StopWatch.createStarted() + val deleteTargets = entityRefs.toSet + dataSource.inTransaction { dataAccess => + dataAccess.jsonEntityQuery.getRecursiveReferrers(workspaceId, deleteTargets) + } flatMap { referrers => + val referringSet = referrers.map(x => AttributeEntityReference(x.entityType, x.name)).toSet + if (referringSet != deleteTargets) { + stopwatch.stop() + logger.info(s"***** deleteEntities complete in ${stopwatch.getTime}ms") + throw new DeleteEntitiesConflictException(referringSet) + } else + dataSource.inTransaction { dataAccess => + dataAccess.jsonEntityQuery.softDelete(workspaceId, deleteTargets) map { result => + stopwatch.stop() + logger.info(s"***** deleteEntities complete in ${stopwatch.getTime}ms") + result + } + } + } + } + + /** + * Return type/count/attribute metadata + * TODO AJ-2008: assess performance and add caching if necessary + */ + override def entityTypeMetadata(useCache: Boolean): Future[Map[String, EntityTypeMetadata]] = { + + def attrsV1(dataAccess: DataAccess) = { + val stopwatch = StopWatch.createStarted() + dataAccess.jsonEntityQuery.typesAndAttributes(workspaceId) map { result => + stopwatch.stop() + logger.info(s"***** attrsV1 complete in ${stopwatch.getTime}ms") + result + } + } + + def attrsV2(dataAccess: DataAccess) = { + val stopwatch = StopWatch.createStarted() + dataAccess.jsonEntityQuery.typesAndAttributesV2(workspaceId) map { result => + stopwatch.stop() + logger.info(s"***** attrsV2 complete in ${stopwatch.getTime}ms") + result + } + } + + dataSource.inTransaction { dataAccess => + // get the types and counts + for { + typesAndCounts <- dataAccess.jsonEntityQuery.typesAndCounts(workspaceId) + // typesAndAttributes <- attrsV1(dataAccess) + typesAndAttributes <- attrsV2(dataAccess) + } yield { + // group attribute names by entity type + val groupedAttributeNames: Map[String, Seq[String]] = + typesAndAttributes + .groupMap(_._1)(_._2) + + // loop through the types and counts and build the EntityTypeMetadata + typesAndCounts.map { case (entityType: String, count: Int) => + // grab attribute names + val attrNames = groupedAttributeNames.getOrElse(entityType, Seq()) + val metadata = EntityTypeMetadata(count, s"$entityType$entityIdAttributeSuffix", attrNames) + (entityType, metadata) + }.toMap + } + } + } + + /** + * stream a page of entities + */ + override def queryEntitiesSource(entityType: String, + entityQuery: EntityQuery, + parentContext: RawlsRequestContext + ): Future[(EntityQueryResultMetadata, Source[Entity, _])] = { + val stopwatch = StopWatch.createStarted() + queryEntities(entityType, entityQuery, parentContext).map { queryResponse => + stopwatch.stop() + logger.info(s"***** 
queryEntitiesSource complete in ${stopwatch.getTime}ms") + // TODO AJ-2008: actually stream! + (queryResponse.resultMetadata, Source.apply(queryResponse.results)) + } + } + + /** + * return a page of entities + */ + override def queryEntities(entityType: String, + entityQuery: EntityQuery, + parentContext: RawlsRequestContext + ): Future[EntityQueryResponse] = dataSource.inTransaction { dataAccess => + for { + results <- dataAccess.jsonEntityQuery.queryEntities(workspaceId, entityType, entityQuery) + // TODO AJ-2008: optimize; if no filters are present, don't need separate queries for counts + unfilteredCount <- dataAccess.jsonEntityQuery.countType(workspaceId, entityType) + filteredCount <- dataAccess.jsonEntityQuery.countQuery(workspaceId, entityType, entityQuery) + } yield { + val pageCount: Int = Math.ceil(filteredCount.toFloat / entityQuery.pageSize).toInt + if (filteredCount > 0 && entityQuery.page > pageCount) { + throw new DataEntityException( + code = StatusCodes.BadRequest, + message = s"requested page ${entityQuery.page} is greater than the number of pages $pageCount" + ) + } + val queryMetadata = EntityQueryResultMetadata(unfilteredCount, filteredCount, pageCount) + EntityQueryResponse(entityQuery, queryMetadata, results) + } + } + + /** + * update multiple entities; they must pre-exist + */ + override def batchUpdateEntities( + entityUpdates: Seq[AttributeUpdateOperations.EntityUpdateDefinition] + ): Future[Iterable[Entity]] = batchUpdateEntitiesImpl(entityUpdates, upsert = false) + + /** + * upsert multiple entities; will create if they do not pre-exist + */ + override def batchUpsertEntities( + entityUpdates: Seq[AttributeUpdateOperations.EntityUpdateDefinition] + ): Future[Iterable[Entity]] = batchUpdateEntitiesImpl(entityUpdates, upsert = true) + + /** + * internal implementation for both batchUpsert and batchUpdate + */ + def batchUpdateEntitiesImpl(entityUpdates: Seq[EntityUpdateDefinition], upsert: Boolean): Future[Iterable[Entity]] = { + // start tracing + traceFutureWithParent("JsonEntityProvider.batchUpdateEntitiesImpl", requestArguments.ctx) { localContext => + setTraceSpanAttribute(localContext, AttributeKey.stringKey("workspaceId"), workspaceId.toString) + setTraceSpanAttribute(localContext, AttributeKey.booleanKey("upsert"), java.lang.Boolean.valueOf(upsert)) + setTraceSpanAttribute(localContext, + AttributeKey.longKey("entityUpdatesCount"), + java.lang.Long.valueOf(entityUpdates.length) + ) + setTraceSpanAttribute(localContext, + AttributeKey.longKey("entityOperationsCount"), + java.lang.Long.valueOf(entityUpdates.map(_.operations.length).sum) + ) + + val stopwatch = StopWatch.createStarted() + + val numUpdates = entityUpdates.size + val numOperations = entityUpdates.flatMap(_.operations).size + + logger.info(s"***** batchUpdateEntitiesImpl processing $numUpdates updates with $numOperations operations") + + // find all attribute names mentioned + val namesToCheck = for { + update <- entityUpdates + operation <- update.operations + } yield operation.name + + // validate all attribute names + withAttributeNamespaceCheck(namesToCheck)(() => ()) + + dataSource + .inTransaction { dataAccess => + import dataAccess.driver.api._ + + // identify all the entities mentioned in entityUpdates + val allMentionedEntities: Set[AttributeEntityReference] = + entityUpdates.map(eu => AttributeEntityReference(eu.entityType, eu.name)).toSet + + logger.info(s"***** the $numUpdates updates target ${allMentionedEntities.size} distinct entities.") + + // retrieve all of 
${allMentionedEntities} in one query and validate existence if these are not upserts + dataAccess.jsonEntityQuery.getEntities(workspaceId, allMentionedEntities) flatMap { existingEntities => + if (!upsert && existingEntities.size != allMentionedEntities.size) { + throw new RuntimeException( + s"Expected all entities being updated to exist; missing ${allMentionedEntities.size - existingEntities.size}" + ) + } + + logger.info( + s"***** of the ${allMentionedEntities.size} distinct entities being updated, ${existingEntities.size} already exist." + ) + + // build map of (entityType, name) -> JsonEntityRecord for efficient lookup + val existingEntityMap: Map[(String, String), JsonEntityRecord] = + existingEntities.map(rec => (rec.entityType, rec.name) -> rec).toMap + + // iterate through the desired updates and apply them + val tableRecords: Seq[Option[JsonEntitySlickRecord]] = entityUpdates.map { entityUpdate => + // attempt to retrieve an existing entity + val existingRecordOption = existingEntityMap.get((entityUpdate.entityType, entityUpdate.name)) + + // this shouldn't happen because we validated above, but we're being defensive + if (!upsert && existingRecordOption.isEmpty) { + throw new RuntimeException("Expected all entities being updated to exist") + } + + // TODO AJ-2008/AJ-2009: Re-retrieve the existing entity if we are updating the same entity multiple times + // see AJ-2009; the existing code does the wrong thing and this code should do better + val baseEntity: Entity = + existingRecordOption + .map(_.toEntity) + .getOrElse(Entity(entityUpdate.name, entityUpdate.entityType, Map())) + + // TODO AJ-2008: collect all the apply errors instead of handling them one-by-one? + val updatedEntity: Entity = applyOperationsToEntity(baseEntity, entityUpdate.operations) + + // if the entity hasn't changed, skip it + if (existingRecordOption.nonEmpty && baseEntity.attributes == updatedEntity.attributes) { + Option.empty[JsonEntitySlickRecord] + } else { + // translate back to a JsonEntitySlickRecord for later insert/update + // TODO AJ-2008: so far we retrieved a JsonEntityRecord, translated it to an Entity, and are now + // translating it to JsonEntitySlickRecord; we could do better + Some( + JsonEntitySlickRecord( + id = existingRecordOption.map(_.id).getOrElse(0), + name = updatedEntity.name, + entityType = updatedEntity.entityType, + workspaceId = workspaceId, + recordVersion = existingRecordOption.map(_.recordVersion).getOrElse(0), + deleted = false, + deletedDate = None, + attributes = Some(updatedEntity.attributes.toJson.compactPrint) + ) + ) + } + } + + // for logging purposes, count the noops + val noopCount = tableRecords.count(_.isEmpty) + + // separate the records-to-be-saved into inserts and updates + // we identify inserts as those having id 0 + val (inserts, updates) = tableRecords.flatten.partition(_.id == 0) + + logger.info( + s"***** all updates have been prepared: ${inserts.size} inserts, ${updates.size} updates, ${noopCount} noop updates." + ) + + // do NOT use the "returning" syntax above, as it forces individual insert statements for each entity. 
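            // (note: re-querying by (workspace_id, entity_type, name) is unambiguous here because the
            // db enforces uniqueness on that triple)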
+ // instead, we insert using non-returning syntax, then perform a second query to get the ids + val insertResult = dataAccess.jsonEntitySlickQuery ++= inserts + + // TODO AJ-2008: don't eagerly kick these off; can cause parallelism problems +// val updateRefFutures: Seq[Future[_]] = updates.map { upd => +// synchronizeReferences(upd.id, upd.toEntity) +// } + + logger.info(s"***** performing inserts ...") + insertResult.flatMap { _ => + // skip any inserts that have zero references + val insertsWithReferences = + inserts.flatMap(ins => + if (findAllReferences(ins.toEntity).isEmpty) { None } + else { Some(ins) } + ) + logger.info(s"***** adding references for ${insertsWithReferences.size} inserts ...") + + // retrieve the ids for the inserts that do have references + dataAccess.jsonEntityQuery.getEntityRefs( + workspaceId, + insertsWithReferences.map(x => AttributeEntityReference(x.entityType, x.name)).toSet + ) flatMap { inserted => + // map the inserted ids back to the full entities that were inserted + val insertedIds = inserted.map(x => (x.entityType, x.name) -> x.id).toMap + + val insertsWithReferencesAndIds: Seq[JsonEntitySlickRecord] = insertsWithReferences.map { ins => + val id = insertedIds.getOrElse((ins.entityType, ins.name), + throw new RuntimeException("couldn't find inserted id") + ) + ins.copy(id = id) + } + + slick.dbio.DBIO.from(synchronizeInsertedReferences(insertsWithReferencesAndIds)) flatMap { _ => + logger.info(s"***** performing updates ...") + val idsBeingUpdated: Seq[Long] = updates.map(_.id) + + slick.dbio.DBIO.sequence(updates.map { upd => + dataAccess.jsonEntityQuery.updateEntity(workspaceId, upd.toEntity, upd.recordVersion) map { + updatedCount => + if (updatedCount == 0) { + throw new RuntimeException("Update failed. Concurrent modifications?") + } + } + }) flatMap { _ => + logger.info(s"***** adding references for updates ...") + // delete all from ENTITY_REFS where from_id in (entities being updated) + dataAccess.jsonEntityRefSlickQuery.filter(_.fromId inSetBind idsBeingUpdated).delete flatMap { _ => + // insert all references for the records being updated + // TODO AJ-2008: instead of delete all/insert all, can we optimize? + slick.dbio.DBIO.from(synchronizeInsertedReferences(updates)) flatMap { _ => + stopwatch.stop() + logger.info(s"***** all writes complete in ${stopwatch.getTime}ms") + slick.dbio.DBIO.successful(()) + } + } + } + } + } + } + } + } + .map { _ => + logger.info(s"***** all inserts and updates completed.") + // returns nothing. EntityApiService explicitly returns a 204 with no response body; so we don't bother + // returning anything at all from here. + // TODO AJ-2008: does this have any compatibility issues elsewhere? LocalEntityProvider does return entities. + Seq() + } + } // end trace + } + + override def copyEntities(sourceWorkspaceContext: Workspace, + destWorkspaceContext: Workspace, + entityType: String, + entityNames: Seq[String], + linkExistingEntities: Boolean, + parentContext: RawlsRequestContext + ): Future[EntityCopyResponse] = ??? + + override def deleteEntitiesOfType(entityType: String): Future[Int] = ??? + + override def evaluateExpressions(expressionEvaluationContext: ExpressionEvaluationContext, + gatherInputsResult: MethodConfigResolver.GatherInputsResult, + workspaceExpressionResults: Map[LookupExpression, Try[Iterable[AttributeValue]]] + ): Future[LazyList[SubmissionValidationEntityInputs]] = ??? + + override def expressionValidator: ExpressionValidator = ??? 
+ + override def renameEntity(entity: AttributeEntityReference, newName: String): Future[Int] = + dataSource.inTransaction { dataAccess => + import dataAccess.driver.api._ + + // get the entity. This validates it exists, as well as retrieves its id which we will need later + dataAccess.jsonEntityQuery.getEntityRef(workspaceId, entity.entityType, entity.entityName) flatMap { + existingOption => + val existing = existingOption.getOrElse(throw new DataEntityException("Entity not found")) + // rename the specific entity + dataAccess.jsonEntityQuery.renameSingleEntity(workspaceId, entity, newName) flatMap { numRenamed => + if (numRenamed == 0) { + // this shouldn't happen, since we just verified its existence + throw new DataEntityException("Entity not renamed") + } else if (numRenamed > 1) { + // this shouldn't happen, since the db enforces uniqueness of workspaceId+entityType+name + throw new DataEntityException( + "Unexpected error; found more than one entity to rename" + ) + } else { + // replace the reference in all referrers + // TODO! + dataAccess.jsonEntityQuery.renameEmbeddedReferences(workspaceId, + existing.id, + entity, + entity.copy(entityName = newName) + ) map { embeddedUpdates => + logger.info(s"***** renameEntity updated $embeddedUpdates embedded references") + // return the number of entities renamed, which should be one + numRenamed + } + } + } + } + + } + + // ==================================================================================================== + // helper methods + // ==================================================================================================== + + // given potential references from an entity, verify that the reference targets all exist, + // and return their ids. + private def validateReferences(entity: Entity): Future[Map[AttributeName, Seq[JsonEntityRefRecord]]] = { + // find all refs in the entity + val refs: Map[AttributeName, Seq[AttributeEntityReference]] = findAllReferences(entity) + + // short-circuit + if (refs.isEmpty) { + Future.successful(Map()) + } else { + // validate all refs + val allRefs: Set[AttributeEntityReference] = refs.values.flatten.toSet + + dataSource.inTransaction { dataAccess => + dataAccess.jsonEntityQuery.getEntityRefs(workspaceId, allRefs) map { foundRefs => + if (foundRefs.size != allRefs.size) { + throw new RuntimeException("Did not find all references") + } + // convert the foundRefs to a map for easier lookup + val foundMap: Map[(String, String), JsonEntityRefRecord] = foundRefs.map { foundRef => + ((foundRef.entityType, foundRef.name), foundRef) + }.toMap + + // return all the references found in this entity, mapped to the ids they are referencing + refs.map { case (name: AttributeName, refs: Seq[AttributeEntityReference]) => + val refRecords: Seq[JsonEntityRefRecord] = refs.map(ref => + foundMap.getOrElse((ref.entityType, ref.entityName), + throw new RuntimeException("unexpected; couldn't find ref") + ) + ) + (name, refRecords) + } + } + } + } + } + + private def findReferences(entity: Entity): Seq[AttributeEntityReference] = + entity.attributes + .collect { + case (_: AttributeName, aer: AttributeEntityReference) => Seq(aer) + case (_: AttributeName, aerl: AttributeEntityReferenceList) => aerl.list + } + .flatten + .toSeq + + // given an entity, finds all references in that entity, grouped by their attribute names + private def findAllReferences(entity: Entity): Map[AttributeName, Seq[AttributeEntityReference]] = + entity.attributes + .collect { + case (name: AttributeName, aer: 
AttributeEntityReference) => Seq((name, aer)) + case (name: AttributeName, aerl: AttributeEntityReferenceList) => aerl.list.map(ref => (name, ref)) + } + .flatten + .toSeq + .groupMap(_._1)(_._2) + + // given already-validated references, including target ids, update the ENTITY_REFS table for a given source + // entity + private def replaceReferences(fromId: Long, + foundRefs: Map[AttributeName, Seq[JsonEntityRefRecord]], + isInsert: Boolean = false + ): Future[Map[AttributeName, Seq[JsonEntityRefRecord]]] = { + // short-circuit + if (isInsert && foundRefs.isEmpty) { + return Future.successful(Map()) + } + dataSource.inTransaction { dataAccess => + import dataAccess.driver.api._ + // we don't actually care about the referencing attribute name or referenced type&name; reduce to just the referenced ids. + val currentEntityRefTargets: Set[Long] = foundRefs.values.flatten.map(_.id).toSet + logger.trace(s"~~~~~ found ${currentEntityRefTargets.size} ref targets in entity $fromId") + for { + // TODO AJ-2008: instead of (retrieve all, then calculate diffs, then execute diffs), try doing it all in the db: + // - delete from ENTITY_REFS where from_id = $fromId and to_id not in ($currentEntityRefTargets) + // - insert into ENTITY_REFS (from_id, to_id) values ($fromId, $currentEntityRefTargets:_*) on duplicate key update from_id=from_id (noop) + // retrieve all existing refs in ENTITY_REFS for this entity; create a set of the target ids + existingRowsSeq <- + if (isInsert) { + slick.dbio.DBIO.successful(Seq.empty[Long]) + } else { + dataAccess.jsonEntityRefSlickQuery.filter(_.fromId === fromId).map(_.toId).result + } + existingRefTargets = existingRowsSeq.toSet + + _ = logger.trace(s"~~~~~ found ${existingRefTargets.size} ref targets in db for entity $fromId") + // find all target ids in the db that are not in the current entity + deletes = existingRefTargets diff currentEntityRefTargets + // find all target ids in the current entity that are not in the db + inserts = currentEntityRefTargets diff existingRefTargets + insertPairs = inserts.map(toId => (fromId, toId)) + _ = logger.trace( + s"~~~~~ prepared ${inserts.size} inserts and ${deletes.size} deletes to perform for entity $fromId" + ) + _ = logger.trace(s"~~~~~ inserts: $insertPairs for entity $fromId") + // insert what needs to be inserted + insertResult <- + if (inserts.nonEmpty) { dataAccess.jsonEntityRefSlickQuery.map(r => (r.fromId, r.toId)) ++= insertPairs } + else { slick.dbio.DBIO.successful(0) } +// insertResult <- dataAccess.jsonEntityQuery.bulkInsertReferences(fromId, inserts) + _ = logger.trace(s"~~~~~ actually inserted ${insertResult} rows for entity $fromId") + // delete what needs to be deleted + deleteResult <- + if (deletes.nonEmpty) { + dataAccess.jsonEntityRefSlickQuery + .filter(x => x.fromId === fromId && x.toId.inSetBind(deletes)) + .delete + } else { slick.dbio.DBIO.successful(0) } + _ = logger.trace(s"~~~~~ actually deleted ${deleteResult} rows for entity $fromId") + } yield foundRefs + } + } + + private def synchronizeInsertedReferences(inserted: Seq[JsonEntitySlickRecord]): Future[Int] = { + // find all references for all records + val referenceRequestsByEntityId: Map[Long, Seq[AttributeEntityReference]] = + inserted.map(ins => ins.id -> findReferences(ins.toEntity)).toMap + + // validate all references for all records + val uniqueReferences: Set[AttributeEntityReference] = referenceRequestsByEntityId.values.flatten.toSet + + // short-circuit + if (uniqueReferences.isEmpty) { + return Future.successful(0) + } + + 
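    // one transaction: resolve every referenced (entityType, entityName) to its row id, then
    // bulk-insert one (from_id, to_id) pair per reference into ENTITY_REFS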
dataSource.inTransaction { dataAccess => + import dataAccess.driver.api._ + + dataAccess.jsonEntityQuery.getEntityRefs(workspaceId, uniqueReferences) flatMap { foundRefs => + if (foundRefs.size != uniqueReferences.size) { + throw new RuntimeException("Did not find all references") + } + + // convert the foundRefs to a map for easier lookup + val foundMap: Map[(String, String), Long] = foundRefs.map { foundRef => + ((foundRef.entityType, foundRef.name), foundRef.id) + }.toMap + + // generate the (from_id, to_id) pairs to insert into ENTITY_REFS + val targetIdsByFromId: Map[Long, Seq[Long]] = referenceRequestsByEntityId.map { + case (fromId, desiredReferences) => + val targetIds = desiredReferences.map { desiredRef => + foundMap.getOrElse((desiredRef.entityType, desiredRef.entityName), + throw new RuntimeException("this shouldn't happen") + ) + } + (fromId, targetIds) + } + val pairsToInsert: Seq[RefPointerRecord] = targetIdsByFromId.flatMap { case (fromId, toIds) => + toIds.map(toId => RefPointerRecord(fromId, toId)) + }.toSeq + + // perform the insert + (dataAccess.jsonEntityRefSlickQuery ++= pairsToInsert).map(x => x.sum) + + } + } + } + + // helper to call validateReferences followed by replaceReferences + private def synchronizeReferences(fromId: Long, + entity: Entity, + isInsert: Boolean = false + ): Future[Map[AttributeName, Seq[JsonEntityRefRecord]]] = dataSource.inTransaction { _ => + for { + // find and validate all references in this entity. This returns the target internal ids for each reference. + foundRefs <- DBIO.from(validateReferences(entity)) + // + _ <- DBIO.from(replaceReferences(fromId, foundRefs, isInsert)) + } yield foundRefs + } +} diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProviderBuilder.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProviderBuilder.scala new file mode 100644 index 0000000000..5594d6a8bb --- /dev/null +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/json/JsonEntityProviderBuilder.scala @@ -0,0 +1,31 @@ +package org.broadinstitute.dsde.rawls.entities.json + +import org.broadinstitute.dsde.rawls.dataaccess.SlickDataSource +import org.broadinstitute.dsde.rawls.entities.EntityRequestArguments +import org.broadinstitute.dsde.rawls.entities.base.EntityProviderBuilder +import org.broadinstitute.dsde.rawls.entities.local.LocalEntityProvider + +import java.time.Duration +import scala.concurrent.ExecutionContext +import scala.reflect.runtime.universe +import scala.reflect.runtime.universe.typeTag +import scala.util.{Success, Try} + +class JsonEntityProviderBuilder(dataSource: SlickDataSource, + cacheEnabled: Boolean, + queryTimeout: Duration, + metricsPrefix: String +)(implicit + protected val executionContext: ExecutionContext +) extends EntityProviderBuilder[JsonEntityProvider] { + + /** declares the type of EntityProvider this builder will build. + */ + override def builds: universe.TypeTag[JsonEntityProvider] = typeTag[JsonEntityProvider] + + /** create the EntityProvider this builder knows how to create. 
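    * (construction has no failure modes here, so this always returns Success)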
+ */ + override def build(requestArguments: EntityRequestArguments): Try[JsonEntityProvider] = Success( + new JsonEntityProvider(requestArguments, dataSource, cacheEnabled, queryTimeout, metricsPrefix) + ) +} diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/EntityStatisticsCacheSupport.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/EntityStatisticsCacheSupport.scala index 81804029f4..d101ab01e1 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/EntityStatisticsCacheSupport.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/EntityStatisticsCacheSupport.scala @@ -1,6 +1,7 @@ package org.broadinstitute.dsde.rawls.entities.local import com.typesafe.scalalogging.LazyLogging +import org.apache.commons.lang3.time.StopWatch import org.broadinstitute.dsde.rawls.dataaccess.SlickDataSource import org.broadinstitute.dsde.rawls.dataaccess.slick.{DataAccess, ReadAction, ReadWriteAction} import org.broadinstitute.dsde.rawls.metrics.RawlsInstrumented @@ -161,9 +162,15 @@ trait EntityStatisticsCacheSupport extends LazyLogging with RawlsInstrumented { /** wrapper for uncached type-attributes lookup, includes performance tracing */ def uncachedTypeAttributes(dataAccess: DataAccess, parentContext: RawlsRequestContext - ): ReadAction[Map[String, Seq[AttributeName]]] = + ): ReadAction[Map[String, Seq[AttributeName]]] = { + val stopwatch = StopWatch.createStarted() traceDBIOWithParent("getAttrNamesAndEntityTypes", parentContext) { _ => - dataAccess.entityQuery.getAttrNamesAndEntityTypes(workspaceContext.workspaceIdAsUUID) + dataAccess.entityQuery.getAttrNamesAndEntityTypes(workspaceContext.workspaceIdAsUUID) map { result => + stopwatch.stop() + logger.info(s"***** getAttrNamesAndEntityTypes complete in ${stopwatch.getTime}ms") + result + } } + } } diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala index 6417e93c45..0efff36f8d 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala @@ -7,6 +7,7 @@ import akka.stream.scaladsl.{Sink, Source} import com.typesafe.scalalogging.LazyLogging import io.opencensus.trace.{AttributeValue => OpenCensusAttributeValue} import io.opentelemetry.api.common.AttributeKey +import org.apache.commons.lang3.time.StopWatch import org.broadinstitute.dsde.rawls.RawlsExceptionWithErrorReport import org.broadinstitute.dsde.rawls.dataaccess.slick.{ DataAccess, @@ -181,7 +182,8 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments, } // EntityApiServiceSpec has good test coverage for this api - override def deleteEntities(entRefs: Seq[AttributeEntityReference]): Future[Int] = + override def deleteEntities(entRefs: Seq[AttributeEntityReference]): Future[Int] = { + val stopwatch = StopWatch.createStarted() dataSource.inTransaction { dataAccess => // withAllEntityRefs throws exception if some entities not found; passes through if all ok traceDBIOWithParent("LocalEntityProvider.deleteEntities", requestArguments.ctx) { localContext => @@ -191,13 +193,19 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments, traceDBIOWithParent("entityQuery.getAllReferringEntities", localContext)(innerSpan => dataAccess.entityQuery.getAllReferringEntities(workspaceContext, entRefs.toSet) 
diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala
index 6417e93c45..0efff36f8d 100644
--- a/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala
+++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/entities/local/LocalEntityProvider.scala
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.{Sink, Source}
 import com.typesafe.scalalogging.LazyLogging
 import io.opencensus.trace.{AttributeValue => OpenCensusAttributeValue}
 import io.opentelemetry.api.common.AttributeKey
+import org.apache.commons.lang3.time.StopWatch
 import org.broadinstitute.dsde.rawls.RawlsExceptionWithErrorReport
 import org.broadinstitute.dsde.rawls.dataaccess.slick.{
   DataAccess,
@@ -181,7 +182,8 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
   }

   // EntityApiServiceSpec has good test coverage for this api
-  override def deleteEntities(entRefs: Seq[AttributeEntityReference]): Future[Int] =
+  override def deleteEntities(entRefs: Seq[AttributeEntityReference]): Future[Int] = {
+    val stopwatch = StopWatch.createStarted()
     dataSource.inTransaction { dataAccess =>
       // withAllEntityRefs throws exception if some entities not found; passes through if all ok
       traceDBIOWithParent("LocalEntityProvider.deleteEntities", requestArguments.ctx) { localContext =>
@@ -191,13 +193,19 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
         traceDBIOWithParent("entityQuery.getAllReferringEntities", localContext)(innerSpan =>
           dataAccess.entityQuery.getAllReferringEntities(workspaceContext, entRefs.toSet) flatMap { referringEntities =>
-            if (referringEntities != entRefs.toSet)
+            if (referringEntities != entRefs.toSet) {
+              stopwatch.stop()
+              logger.info(s"***** deleteEntities found conflicts in ${stopwatch.getTime}ms")
               throw new DeleteEntitiesConflictException(referringEntities)
-            else {
+            } else {
               traceDBIOWithParent("entityQuery.hide", innerSpan)(_ =>
                 dataAccess.entityQuery
                   .hide(workspaceContext, entRefs)
-                  .withStatementParameters(statementInit = _.setQueryTimeout(queryTimeoutSeconds))
+                  .withStatementParameters(statementInit = _.setQueryTimeout(queryTimeoutSeconds)) map { result =>
+                    stopwatch.stop()
+                    logger.info(s"***** deleteEntities complete in ${stopwatch.getTime}ms")
+                    result
+                  }
               )
             }
           }
@@ -205,6 +213,7 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
       }
     }
   }
+  }

   override def deleteEntitiesOfType(entityType: String): Future[Int] =
     dataSource.inTransaction { dataAccess =>
@@ -310,6 +319,7 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
     query: EntityQuery,
     parentContext: RawlsRequestContext = requestArguments.ctx
   ): Future[(EntityQueryResultMetadata, Source[Entity, _])] = {
+    val stopwatch = StopWatch.createStarted()
     // look for a columnFilter that specifies the primary key for this entityType;
     // such a columnFilter means we are filtering by name and can greatly simplify the underlying query.
     val nameFilter: Option[String] = query.columnFilter match {
@@ -353,7 +363,10 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
       for {
         metadata <- queryForMetadata(entityType, query, childContext)
         entitySource = queryForResultSource(entityType, query, childContext)
-      } yield (metadata, entitySource)
+      } yield {
+        logger.info(s"***** queryEntitiesSource complete in ${stopwatch.getTime}ms")
+        (metadata, entitySource)
+      }
     }
   }
 }
@@ -457,6 +470,8 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
       java.lang.Long.valueOf(entityUpdates.map(_.operations.length).sum)
     )

+    val stopwatch = StopWatch.createStarted()
+
     withAttributeNamespaceCheck(namesToCheck) {
       dataSource.inTransactionWithAttrTempTable(Set(AttributeTempTableType.Entity)) { dataAccess =>
         val updateTrialsAction = traceDBIOWithParent("getActiveEntities", localContext)(_ =>
@@ -505,7 +520,13 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
         }
       }
-      saveAction
+      traceDBIOWithParent("saveAction", localContext)(_ =>
+        saveAction map { result =>
+          stopwatch.stop()
+          logger.info(s"***** all writes complete in ${stopwatch.getTime}ms")
+          result
+        }
+      )
     } recover { case icve: java.sql.SQLIntegrityConstraintViolationException =>
       val userMessage =
@@ -550,4 +571,18 @@ class LocalEntityProvider(requestArguments: EntityRequestArguments,
     )
   }
 )
+
+  override def renameEntity(entity: AttributeEntityReference, newName: String): Future[Int] =
+    dataSource.inTransaction { dataAccess =>
+      withEntity(workspaceContext, entity.entityType, entity.entityName, dataAccess) { foundEntity =>
+        dataAccess.entityQuery.get(workspaceContext, foundEntity.entityType, newName) flatMap {
+          case None => dataAccess.entityQuery.rename(workspaceContext, foundEntity.entityType, foundEntity.name, newName)
+          case Some(_) =>
+            throw new RawlsExceptionWithErrorReport(
+              errorReport =
+                ErrorReport(StatusCodes.Conflict, s"Destination ${foundEntity.entityType} $newName already exists")
+            )
+        }
+      }
+    }
 }
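`withStatementParameters(statementInit = _.setQueryTimeout(queryTimeoutSeconds))`, used in `deleteEntities` above, is Slick's hook for customizing the underlying JDBC statement: Slick hands the raw `java.sql.Statement` to `statementInit` before execution, so the timeout is enforced at the driver level. A minimal sketch of the technique (the table mapping below is a stand-in; the real ENTITY mapping lives in JsonEntityComponent):

    import slick.jdbc.MySQLProfile.api._

    // stand-in mapping, just enough to produce a query
    class Names(tag: Tag) extends Table[String](tag, "ENTITY") {
      def name = column[String]("name")
      def * = name
    }
    val names = TableQuery[Names]

    // bound the query at the JDBC level: give up after 30 seconds
    val bounded: DBIO[Seq[String]] =
      names.result.withStatementParameters(statementInit = _.setQueryTimeout(30))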
diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/webservice/EntityApiService.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/webservice/EntityApiService.scala
index 2eecb67d81..2588ae0a39 100644
--- a/core/src/main/scala/org/broadinstitute/dsde/rawls/webservice/EntityApiService.scala
+++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/webservice/EntityApiService.scala
@@ -231,7 +231,9 @@ trait EntityApiService extends UserInfoDirectives {
             .renameEntity(WorkspaceName(workspaceNamespace, workspaceName),
                           entityType,
                           entityName,
-                          newEntityName.name
+                          newEntityName.name,
+                          dataReference,
+                          billingProject
             )
             .map(_ => StatusCodes.NoContent)
         }
diff --git a/core/src/test/scala/org/broadinstitute/dsde/rawls/entities/local/CaseSensitivitySpec.scala b/core/src/test/scala/org/broadinstitute/dsde/rawls/entities/local/CaseSensitivitySpec.scala
index e9184e5d4c..59df9724ee 100644
--- a/core/src/test/scala/org/broadinstitute/dsde/rawls/entities/local/CaseSensitivitySpec.scala
+++ b/core/src/test/scala/org/broadinstitute/dsde/rawls/entities/local/CaseSensitivitySpec.scala
@@ -453,7 +453,7 @@ class CaseSensitivitySpec extends AnyFreeSpec with Matchers with TestDriverCompo
       // rename entity of target type
       services.entityService
-        .renameEntity(testWorkspace.workspace.toWorkspaceName, typeUnderTest, "003", "my-new-name")
+        .renameEntity(testWorkspace.workspace.toWorkspaceName, typeUnderTest, "003", "my-new-name", None, None)
         .futureValue

       // get actual entities
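With the two new trailing arguments, existing callers pass `None, None` to keep today's behavior, as the CaseSensitivitySpec change shows. A hedged sketch of the updated call shape at the route layer (the parameter types here are simplified assumptions; the authoritative signature is whatever `renameEntity` now declares in the service):

    import akka.http.scaladsl.model.{StatusCode, StatusCodes}
    import scala.concurrent.{ExecutionContext, Future}

    // `rename` stands in for entityService.renameEntity with the workspace, type, and
    // old-name arguments already fixed; only the new Option parameters vary here
    def renameAndRespond(
      rename: (String, Option[String], Option[String]) => Future[Int]
    )(implicit ec: ExecutionContext): Future[StatusCode] =
      rename("my-new-name", None, None).map(_ => StatusCodes.NoContent)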