Skip to content

Commit

Permalink
Horizontal Scaling: checkCluster should not use full cluster query (#726)
Browse files Browse the repository at this point in the history

* use min query for clusters
  • Loading branch information
vkumra-broad authored Dec 20, 2018
1 parent d86bcce commit a9e27b9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,23 @@ trait ClusterComponent extends LeoComponent {
// note: list* methods don't query the INSTANCE table

def list(): DBIO[Seq[Cluster]] = {
minimalClusterQuery.result.map(unmarshalMinimalCluster)
clusterLabelQuery.result.map(unmarshalMinimalCluster)
}

def listActive(): DBIO[Seq[Cluster]] = {
minimalClusterQuery.filter { _._1.status inSetBind ClusterStatus.activeStatuses.map(_.toString) }.result map { recs =>
clusterLabelQuery.filter { _._1.status inSetBind ClusterStatus.activeStatuses.map(_.toString) }.result map { recs =>
unmarshalMinimalCluster(recs)
}
}

/**
 * Lists the clusters whose status is one of `ClusterStatus.monitoredStatuses`,
 * reading from `clusterQuery` alone.
 *
 * No join is performed here: every argument of `unmarshalCluster` other than the
 * cluster record itself is passed as an empty collection, so the returned clusters
 * carry only what the cluster record provides.
 *
 * @return a database action yielding one `Cluster` per matching cluster record
 */
def listMonitoredClusterOnly(): DBIO[Seq[Cluster]] = {
  // Status values are compared in their string form, hence the toString mapping.
  val monitoredStatusNames = ClusterStatus.monitoredStatuses.map(_.toString)
  val monitoredRecords = clusterQuery.filter { _.status inSetBind monitoredStatusNames }

  monitoredRecords.result.map { clusterRecords =>
    clusterRecords.map { clusterRecord =>
      unmarshalCluster(clusterRecord, Seq.empty, List.empty, Map.empty, List.empty, List.empty, List.empty)
    }
  }
}

def listMonitored(): DBIO[Seq[Cluster]] = {
minimalClusterQuery.filter { _._1.status inSetBind ClusterStatus.monitoredStatuses.map(_.toString) }.result map { recs =>
clusterLabelQuery.filter { _._1.status inSetBind ClusterStatus.monitoredStatuses.map(_.toString) }.result map { recs =>
unmarshalMinimalCluster(recs)
}
}
Expand Down Expand Up @@ -319,7 +325,7 @@ trait ClusterComponent extends LeoComponent {
}

def listByLabels(labelMap: LabelMap, includeDeleted: Boolean, googleProjectOpt: Option[GoogleProject] = None): DBIO[Seq[Cluster]] = {
val clusterStatusQuery = if (includeDeleted) minimalClusterQuery else minimalClusterQuery.filterNot { _._1.status === "Deleted" }
val clusterStatusQuery = if (includeDeleted) clusterLabelQuery else clusterLabelQuery.filterNot { _._1.status === "Deleted" }
val clusterStatusQueryByProject = googleProjectOpt match {
case Some(googleProject) => clusterStatusQuery.filter { _._1.googleProject === googleProject.value }
case None => clusterStatusQuery
Expand Down Expand Up @@ -480,7 +486,7 @@ trait ClusterComponent extends LeoComponent {
// just clusters and labels: no instances, extensions, etc.
// select * from cluster c
// left join label l on c.id = l.clusterId
val minimalClusterQuery: Query[(ClusterTable, Rep[Option[LabelTable]]), (ClusterRecord, Option[LabelRecord]), Seq] = {
val clusterLabelQuery: Query[(ClusterTable, Rep[Option[LabelTable]]), (ClusterRecord, Option[LabelRecord]), Seq] = {
for {
(cluster, label) <- clusterQuery joinLeft labelQuery on (_.id === _.clusterId)
} yield (cluster, label)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,27 @@ class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: Dat
case RecreateCluster(cluster) =>
if (monitorConfig.recreateCluster) {
logger.info(s"Recreating cluster ${cluster.projectNameString}...")
val clusterRequest = ClusterRequest(
Option(cluster.labels),
cluster.jupyterExtensionUri,
cluster.jupyterUserScriptUri,
Some(cluster.machineConfig),
None,
cluster.userJupyterExtensionConfig,
if (cluster.autopauseThreshold == 0) Some(false) else Some(true),
Some(cluster.autopauseThreshold),
cluster.defaultClientId,
cluster.clusterImages.find(_.tool == Jupyter).map(_.dockerImage))
leonardoService.internalCreateCluster(cluster.auditInfo.creator, cluster.serviceAccountInfo, cluster.googleProject, cluster.clusterName, clusterRequest).failed.foreach { e =>
logger.error(s"Error occurred recreating cluster ${cluster.projectNameString}", e)
dbRef.inTransaction { dataAccess =>
dataAccess.clusterQuery.getClusterById(cluster.id)
}.flatMap {
case Some(cluster) =>
val clusterRequest = ClusterRequest(
Option(cluster.labels),
cluster.jupyterExtensionUri,
cluster.jupyterUserScriptUri,
Some(cluster.machineConfig),
None,
cluster.userJupyterExtensionConfig,
if (cluster.autopauseThreshold == 0) Some(false) else Some(true),
Some(cluster.autopauseThreshold),
cluster.defaultClientId,
cluster.clusterImages.find(_.tool == Jupyter).map(_.dockerImage))
val createFuture = leonardoService.internalCreateCluster(cluster.auditInfo.creator, cluster.serviceAccountInfo, cluster.googleProject, cluster.clusterName, clusterRequest)
createFuture.failed.foreach { e =>
logger.error(s"Error occurred recreating cluster ${cluster.projectNameString}", e)
}
createFuture
case None => Future.failed(new WorkbenchException(s"Cluster ${cluster.projectNameString} not found in the database"))
}
} else {
logger.warn(s"Received RecreateCluster message for cluster ${cluster.projectNameString} but cluster recreation is disabled.")
Expand Down Expand Up @@ -172,7 +180,7 @@ class ClusterMonitorSupervisor(monitorConfig: MonitorConfig, dataprocConfig: Dat
val monitoredClusterIds = monitoredClusters.map(_.id)

dbRef
.inTransaction { _.clusterQuery.listMonitoredFullCluster() }
.inTransaction { _.clusterQuery.listMonitoredClusterOnly() }
.onComplete {
case Success(clusters) =>
val clustersNotAlreadyBeingMonitored = clusters.filterNot(c => monitoredClusterIds.contains(c.id))
Expand Down

0 comments on commit a9e27b9

Please sign in to comment.