From a9e27b9a96350ee790c7144b45166f3a288da624 Mon Sep 17 00:00:00 2001 From: vkumra-broad Date: Thu, 20 Dec 2018 09:58:32 -0500 Subject: [PATCH] Horizontal Scaling : checkCluster should not use full cluster query (#726) * use min query for clusters --- .../leonardo/db/ClusterComponent.scala | 16 ++++++--- .../monitor/ClusterMonitorSupervisor.scala | 36 +++++++++++-------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala index deb3b67facf..3fe342afc4b 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala @@ -136,17 +136,23 @@ trait ClusterComponent extends LeoComponent { // note: list* methods don't query the INSTANCE table def list(): DBIO[Seq[Cluster]] = { - minimalClusterQuery.result.map(unmarshalMinimalCluster) + clusterLabelQuery.result.map(unmarshalMinimalCluster) } def listActive(): DBIO[Seq[Cluster]] = { - minimalClusterQuery.filter { _._1.status inSetBind ClusterStatus.activeStatuses.map(_.toString) }.result map { recs => + clusterLabelQuery.filter { _._1.status inSetBind ClusterStatus.activeStatuses.map(_.toString) }.result map { recs => unmarshalMinimalCluster(recs) } } + def listMonitoredClusterOnly(): DBIO[Seq[Cluster]] = { + clusterQuery.filter{_.status inSetBind ClusterStatus.monitoredStatuses.map(_.toString) }.result map { recs => + recs.map(rec => unmarshalCluster(rec,Seq.empty, List.empty, Map.empty, List.empty, List.empty, List.empty)) + } + } + def listMonitored(): DBIO[Seq[Cluster]] = { - minimalClusterQuery.filter { _._1.status inSetBind ClusterStatus.monitoredStatuses.map(_.toString) }.result map { recs => + clusterLabelQuery.filter { _._1.status inSetBind ClusterStatus.monitoredStatuses.map(_.toString) }.result map { recs => unmarshalMinimalCluster(recs) } } @@ -319,7 +325,7 @@ trait ClusterComponent extends LeoComponent { } def listByLabels(labelMap: LabelMap, includeDeleted: Boolean, googleProjectOpt: Option[GoogleProject] = None): DBIO[Seq[Cluster]] = { - val clusterStatusQuery = if (includeDeleted) minimalClusterQuery else minimalClusterQuery.filterNot { _._1.status === "Deleted" } + val clusterStatusQuery = if (includeDeleted) clusterLabelQuery else clusterLabelQuery.filterNot { _._1.status === "Deleted" } val clusterStatusQueryByProject = googleProjectOpt match { case Some(googleProject) => clusterStatusQuery.filter { _._1.googleProject === googleProject.value } case None => clusterStatusQuery @@ -480,7 +486,7 @@ trait ClusterComponent extends LeoComponent { // just clusters and labels: no instances, extensions, etc. // select * from cluster c // left join label l on c.id = l.clusterId - val minimalClusterQuery: Query[(ClusterTable, Rep[Option[LabelTable]]), (ClusterRecord, Option[LabelRecord]), Seq] = { + val clusterLabelQuery: Query[(ClusterTable, Rep[Option[LabelTable]]), (ClusterRecord, Option[LabelRecord]), Seq] = { for { (cluster, label) <- clusterQuery joinLeft labelQuery on (_.id === _.clusterId) } yield (cluster, label) diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ClusterMonitorSupervisor.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ClusterMonitorSupervisor.scala index 28cbfe78468..4d7a9b79f59 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ClusterMonitorSupervisor.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ClusterMonitorSupervisor.scala @@ -86,19 +86,27 @@ class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: Dat case RecreateCluster(cluster) => if (monitorConfig.recreateCluster) { logger.info(s"Recreating cluster ${cluster.projectNameString}...") - val clusterRequest = ClusterRequest( - Option(cluster.labels), - cluster.jupyterExtensionUri, - cluster.jupyterUserScriptUri, - Some(cluster.machineConfig), - None, - cluster.userJupyterExtensionConfig, - if (cluster.autopauseThreshold == 0) Some(false) else Some(true), - Some(cluster.autopauseThreshold), - cluster.defaultClientId, - cluster.clusterImages.find(_.tool == Jupyter).map(_.dockerImage)) - leonardoService.internalCreateCluster(cluster.auditInfo.creator, cluster.serviceAccountInfo, cluster.googleProject, cluster.clusterName, clusterRequest).failed.foreach { e => - logger.error(s"Error occurred recreating cluster ${cluster.projectNameString}", e) + dbRef.inTransaction { dataAccess => + dataAccess.clusterQuery.getClusterById(cluster.id) + }.flatMap { + case Some(cluster) => + val clusterRequest = ClusterRequest( + Option(cluster.labels), + cluster.jupyterExtensionUri, + cluster.jupyterUserScriptUri, + Some(cluster.machineConfig), + None, + cluster.userJupyterExtensionConfig, + if (cluster.autopauseThreshold == 0) Some(false) else Some(true), + Some(cluster.autopauseThreshold), + cluster.defaultClientId, + cluster.clusterImages.find(_.tool == Jupyter).map(_.dockerImage)) + val createFuture = leonardoService.internalCreateCluster(cluster.auditInfo.creator, cluster.serviceAccountInfo, cluster.googleProject, cluster.clusterName, clusterRequest) + createFuture.failed.foreach { e => + logger.error(s"Error occurred recreating cluster ${cluster.projectNameString}", e) + } + createFuture + case None => Future.failed(new WorkbenchException(s"Cluster ${cluster.projectNameString} not found in the database")) } } else { logger.warn(s"Received RecreateCluster message for cluster ${cluster.projectNameString} but cluster recreation is disabled.") @@ -172,7 +180,7 @@ class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: Dat val monitoredClusterIds = monitoredClusters.map(_.id) dbRef - .inTransaction { _.clusterQuery.listMonitoredFullCluster() } + .inTransaction { _.clusterQuery.listMonitoredClusterOnly() } .onComplete { case Success(clusters) => val clustersNotAlreadyBeingMonitored = clusters.filterNot(c => monitoredClusterIds.contains(c.id))