Skip to content

Commit

Permalink
List clusters by project (#623)
Browse files Browse the repository at this point in the history
* List clusters by project

* Retry IOExceptions from Sam

* PR feedback

* Fix IOException retry condition
  • Loading branch information
rtitle authored Oct 11, 2018
1 parent fcde597 commit cef0c66
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 10 deletions.
51 changes: 51 additions & 0 deletions src/main/resources/swagger/api-docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,57 @@ paths:
- email
- profile

'/api/clusters/{googleProject}':
get:
summary: List all active clusters within the given Google project
description: List all active clusters within the given Google project, optionally filtering on a set of labels
operationId: listClustersByProject
tags:
- cluster
parameters:
- in: path
name: googleProject
description: googleProject
required: true
type: string
- in: query
name: _labels
description: |
Optional label key-value pairs to filter results by. Example: Querying by key1=val1,key2=val2
returns all clusters that contain the key1/val1 and key2/val2 labels (possibly among other labels).
Note: this string format is a workaround because Swagger doesn't support free-form
query string parameters. The recommended way to use this endpoint is to specify the
labels as top-level query string parameters. For instance: GET /api/clusters/{googleProject}?key1=val1&key2=val2.
required: false
type: string
- in: query
name: includeDeleted
description: Optional filter that includes any clusters with a Deleted status.
required: false
type: boolean
default: false
responses:
'200':
description: List of clusters
schema:
type: array
items:
$ref: '#/definitions/Cluster'
'400':
description: Bad Request
schema:
$ref: '#/definitions/ErrorReport'
'500':
description: Internal Error
schema:
$ref: '#/definitions/ErrorReport'
security:
- googleoauth:
- openid
- email
- profile

'/api/cluster/v2/{googleProject}/{clusterName}':
put:
summary: Creates a new Dataproc cluster in the given project with the given name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,24 @@ abstract class LeoRoutes(val leonardoService: LeonardoService, val proxyService:
}
}
} ~
path("clusters") {
pathPrefix("clusters") {
parameterMap { params =>
complete {
leonardoService.listClusters(userInfo, params).map { clusters =>
StatusCodes.OK -> clusters
path(Segment) { googleProject =>
get {
complete {
leonardoService.listClusters(userInfo, params, Some(GoogleProject(googleProject))).map { clusters =>
StatusCodes.OK -> clusters
}
}
}
} ~
pathEndOrSingleSlash {
get {
complete {
leonardoService.listClusters(userInfo, params).map { clusters =>
StatusCodes.OK -> clusters
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.broadinstitute.dsde.workbench.leonardo.auth.sam

import java.io.IOException
import java.util.concurrent.TimeUnit

import com.typesafe.config.Config
Expand Down Expand Up @@ -207,6 +208,8 @@ class SamAuthProvider(val config: Config, serviceAccountProvider: ServiceAccount
case e: ApiException if e.getCode == 401 => true
// always invalidate and retry 500 errors
case e: ApiException if e.getCode / 100 == 5 => true
// retry IOExceptions
case e: ApiException if e.getCause.isInstanceOf[IOException] => true
// otherwise don't retry
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,14 @@ trait ClusterComponent extends LeoComponent {
updateClusterStatusAndHostIp(id, ClusterStatus.Stopping, None)
}

def listByLabels(labelMap: LabelMap, includeDeleted: Boolean): DBIO[Seq[Cluster]] = {
def listByLabels(labelMap: LabelMap, includeDeleted: Boolean, googleProjectOpt: Option[GoogleProject] = None): DBIO[Seq[Cluster]] = {
val clusterStatusQuery = if (includeDeleted) clusterQueryWithLabels else clusterQueryWithLabels.filterNot { _._1.status === "Deleted" }
val clusterStatusQueryByProject = googleProjectOpt match {
case Some(googleProject) => clusterStatusQuery.filter { _._1.googleProject === googleProject.value }
case None => clusterStatusQuery
}
val query = if (labelMap.isEmpty) {
clusterStatusQuery
clusterStatusQueryByProject
} else {
// The trick is to find all clusters that have _at least_ all the labels in labelMap.
// In other words, for a given cluster, the labels provided in the query string must be
Expand All @@ -310,8 +314,10 @@ trait ClusterComponent extends LeoComponent {
// where clusterId = c.id and (key, value) in ${labelMap}
// ) = ${labelMap.size}
//
clusterStatusQuery.filter { case (cluster, _, _) =>
labelQuery.filter { _.clusterId === cluster.id }
clusterStatusQueryByProject.filter { case (cluster, _, _) =>
labelQuery.filter {
_.clusterId === cluster.id
}
// The following confusing line is equivalent to the much simpler:
// .filter { lbl => (lbl.key, lbl.value) inSetBind labelMap.toSet }
// Unfortunately slick doesn't support inSet/inSetBind for tuples.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,10 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig,
} else Future.failed(ClusterCannotBeStartedException(cluster.googleProject, cluster.clusterName, cluster.status))
}

def listClusters(userInfo: UserInfo, params: LabelMap): Future[Seq[Cluster]] = {
def listClusters(userInfo: UserInfo, params: LabelMap, googleProjectOpt: Option[GoogleProject] = None): Future[Seq[Cluster]] = {
for {
paramMap <- processListClustersParameters(params)
clusterList <- dbRef.inTransaction { da => da.clusterQuery.listByLabels(paramMap._1, paramMap._2) }
clusterList <- dbRef.inTransaction { da => da.clusterQuery.listByLabels(paramMap._1, paramMap._2, googleProjectOpt) }
visibleClusters <- authProvider.filterUserVisibleClusters(userInfo, clusterList.map(c => (c.googleProject, c.clusterName)).toList)
} yield {
val visibleClustersSet = visibleClusters.toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait CommonTestData{ this: ScalaFutures =>
val name2 = ClusterName("clustername2")
val name3 = ClusterName("clustername3")
val project = GoogleProject("dsp-leo-test")
val project2 = GoogleProject("dsp-leo-test-2")
val userEmail = WorkbenchEmail("[email protected]")
val userInfo = UserInfo(OAuth2BearerToken("accessToken"), WorkbenchUserId("user1"), userEmail, 0)
val serviceAccountEmail = WorkbenchEmail("[email protected]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class LeoRoutesSpec extends FlatSpec with ScalatestRouteTest with CommonTestData
implicit val timeout = RouteTestTimeout(5.seconds dilated)

private val googleProject = GoogleProject("test-project")
private val googleProject2 = GoogleProject("test-project2")
private val clusterName = ClusterName("test-cluster")

val invalidUserLeoRoutes = new LeoRoutes(leonardoService, proxyService, statusService, swaggerConfig) with MockUserInfoDirectives {
Expand Down Expand Up @@ -222,6 +223,72 @@ class LeoRoutesSpec extends FlatSpec with ScalatestRouteTest with CommonTestData
}
}

// Route-level check that GET /api/clusters/{googleProject} only returns the clusters
// created under the requested project.
it should "list clusters by project" in isolatedDbTest {
  val clusterRequest = ClusterRequest(Map.empty, None)

  // Lists the given project and expects a 200 with an empty cluster list.
  def expectNoClusters(proj: GoogleProject) =
    Get(s"/api/clusters/${proj.value}") ~> timedLeoRoutes.route ~> check {
      status shouldEqual StatusCodes.OK
      val listed = responseAs[List[Cluster]]
      listed shouldBe List.empty[Cluster]
    }

  // Lists the given project and expects exactly five clusters, each one carrying
  // that project's identity, service accounts and default labels.
  def expectFiveClusters(proj: GoogleProject) =
    Get(s"/api/clusters/${proj.value}") ~> timedLeoRoutes.route ~> check {
      status shouldEqual StatusCodes.OK

      val listed = responseAs[List[Cluster]]
      listed should have size 5
      listed.foreach { c =>
        c.googleProject shouldEqual proj
        c.serviceAccountInfo.clusterServiceAccount shouldEqual clusterServiceAccount(proj)
        c.serviceAccountInfo.notebookServiceAccount shouldEqual notebookServiceAccount(proj)
        val expectedLabels = Map(
          "clusterName" -> c.clusterName.value,
          "creator" -> "[email protected]",
          "googleProject" -> proj.value) ++ serviceAccountLabels
        c.labels shouldEqual expectedLabels
      }

      validateCookie { header[`Set-Cookie`] }
    }

  // Neither project has any clusters before anything is created.
  expectNoClusters(googleProject)
  expectNoClusters(googleProject2)

  // Clusters 1-5 go into the first project through the unversioned PUT path (answers 200 OK).
  (1 to 5).foreach { i =>
    Put(s"/api/cluster/${googleProject.value}/${clusterName.value}-$i", clusterRequest.toJson) ~> leoRoutes.route ~> check {
      status shouldEqual StatusCodes.OK
    }
  }

  // Clusters 6-10 go into the second project through the v2 PUT path (answers 202 Accepted).
  (6 to 10).foreach { i =>
    Put(s"/api/cluster/v2/${googleProject2.value}/${clusterName.value}-$i", clusterRequest.toJson) ~> leoRoutes.route ~> check {
      status shouldEqual StatusCodes.Accepted
    }
  }

  // Each project must now list only its own five clusters.
  expectFiveClusters(googleProject)
  expectFiveClusters(googleProject2)
}

it should "202 when stopping and starting a cluster" in isolatedDbTest {
val newCluster = ClusterRequest(Map.empty, None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.broadinstitute.dsde.workbench.leonardo.ClusterEnrichments.{clusterEq,
import org.broadinstitute.dsde.workbench.leonardo.{CommonTestData, GcsPathUtils}
import org.broadinstitute.dsde.workbench.leonardo.model._
import org.broadinstitute.dsde.workbench.leonardo.model.google._
import org.broadinstitute.dsde.workbench.model.google.GoogleProject
import org.scalatest.FlatSpecLike

class ClusterComponentSpec extends TestComponent with FlatSpecLike with CommonTestData with GcsPathUtils {
Expand Down Expand Up @@ -181,4 +182,31 @@ class ClusterComponentSpec extends TestComponent with FlatSpecLike with CommonTe
autoFreezeList should not contain stoppedCluster
autoFreezeList should not contain runningCluster2
}

// Verifies that listByLabels applies the project filter together with the label
// and includeDeleted filters.
it should "list by labels and project" in isolatedDbTest {
  // Fixture 1: three labels, everything else as produced by makeCluster(1).
  val savedCluster1 = makeCluster(1)
    .copy(labels = Map("bam" -> "yes", "vcf" -> "no", "foo" -> "bar"))
    .save(Some(serviceAccountKey.id))

  // Fixture 2: a Running cluster that lives in project2.
  val savedCluster2 = makeCluster(2)
    .copy(
      status = ClusterStatus.Running,
      clusterName = name2,
      googleProject = project2,
      clusterUrl = Cluster.getClusterUrl(project2, name2, clusterUrlBase),
      labels = Map("bam" -> "yes"))
    .save(Some(serviceAccountKey.id))

  // Fixture 3: a Deleted cluster, so it only shows up when includeDeleted is true.
  val savedCluster3 = makeCluster(3)
    .copy(
      status = ClusterStatus.Deleted,
      labels = Map("a" -> "b", "bam" -> "yes"))
    .save()

  // Runs listByLabels restricted to one project and returns the result as a Set.
  def clustersIn(proj: GoogleProject, labels: Map[String, String], includeDeleted: Boolean) =
    dbFutureValue { _.clusterQuery.listByLabels(labels, includeDeleted, Some(proj)) }.toSet

  // No label filter: only the project and deleted-status filters apply.
  clustersIn(project, Map.empty, includeDeleted = false) shouldEqual Set(savedCluster1)
  clustersIn(project, Map.empty, includeDeleted = true) shouldEqual Set(savedCluster1, savedCluster3)
  clustersIn(project2, Map.empty, includeDeleted = false) shouldEqual Set(savedCluster2)

  // Label filter combined with the project filter.
  clustersIn(project, Map("bam" -> "yes"), includeDeleted = true) shouldEqual Set(savedCluster1, savedCluster3)
  clustersIn(project2, Map("bam" -> "yes"), includeDeleted = false) shouldEqual Set(savedCluster2)
  clustersIn(project, Map("a" -> "b"), includeDeleted = true) shouldEqual Set(savedCluster3)
  clustersIn(project2, Map("a" -> "b"), includeDeleted = true) shouldEqual Set.empty[Cluster]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,20 @@ class LeonardoServiceSpec extends TestKit(ActorSystem("leonardotest")) with Flat
}
}

// Service-level check that listClusters honours the optional project filter,
// on its own and combined with label filters.
it should "list clusters belonging to a project" in isolatedDbTest {
  // One cluster per project; the second one carries extra labels.
  val cluster1 = leo.createCluster(userInfo, project, name1, testClusterRequest).futureValue
  val cluster2 = {
    val labelledRequest = testClusterRequest.copy(labels = Map("a" -> "b", "foo" -> "bar"))
    leo.createCluster(userInfo, project2, name2, labelledRequest).futureValue
  }

  // Lists the clusters visible to userInfo in the given project, as a Set.
  def listed(labels: Map[String, String], proj: GoogleProject) =
    leo.listClusters(userInfo, labels, Some(proj)).futureValue.toSet

  // Without labels, each project yields only its own cluster.
  listed(Map.empty, project) shouldBe Set(cluster1)
  listed(Map.empty, project2) shouldBe Set(cluster2)

  // A label carried by both clusters still resolves per project.
  listed(Map("foo" -> "bar"), project) shouldBe Set(cluster1)
  listed(Map("foo" -> "bar"), project2) shouldBe Set(cluster2)

  // A label that no cluster has matches nothing in either project.
  listed(Map("k" -> "v"), project) shouldBe Set.empty
  listed(Map("k" -> "v"), project2) shouldBe Set.empty

  // A project with no clusters yields an empty result.
  listed(Map("foo" -> "bar"), GoogleProject("non-existing-project")) shouldBe Set.empty
}

it should "delete the init bucket if cluster creation fails" in isolatedDbTest {
// create the cluster
val clusterCreateResponse = leo.createCluster(userInfo, project, gdDAO.badClusterName, testClusterRequest).failed.futureValue
Expand Down

0 comments on commit cef0c66

Please sign in to comment.