Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace JGraphT with scala-graph #3297

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.core.workflow.PhysicalLink
import org.jgrapht.alg.connectivity.BiconnectivityInspector
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging}
import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jgrapht is still used in many other places in the amber module. Please do a thorough check.


import java.util.concurrent.TimeoutException
Expand Down Expand Up @@ -70,12 +68,10 @@ class CostBasedScheduleGenerator(
val matEdgesRemovedDAG = matEdges.foldLeft(physicalPlan) { (currentPlan, linkToRemove) =>
currentPlan.removeLink(linkToRemove)
}
val connectedComponents = new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink](
matEdgesRemovedDAG.dag
).getConnectedComponents.asScala.toSet
val connectedComponents = matEdgesRemovedDAG.dag.componentTraverser().toSet
connectedComponents.zipWithIndex.map {
case (connectedSubDAG, idx) =>
val operatorIds = connectedSubDAG.vertexSet().asScala.toSet
val operatorIds = connectedSubDAG.nodes.map(_.outer)
val links = operatorIds
.flatMap(operatorId => {
physicalPlan.getUpstreamPhysicalLinks(operatorId) ++ physicalPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.WorkflowRuntimeException
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
import edu.uci.ics.amber.core.workflow.PhysicalLink
import org.jgrapht.alg.connectivity.BiconnectivityInspector
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext}
import org.jgrapht.graph.DirectedAcyclicGraph

import scala.annotation.tailrec
Expand Down Expand Up @@ -63,12 +61,10 @@ class ExpansionGreedyScheduleGenerator(
*/
private def createRegions(physicalPlan: PhysicalPlan): Set[Region] = {
val dependeeLinksRemovedDAG = physicalPlan.getDependeeLinksRemovedDAG
val connectedComponents = new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink](
dependeeLinksRemovedDAG.dag
).getConnectedComponents.asScala.toSet
val connectedComponents = dependeeLinksRemovedDAG.dag.componentTraverser().toSet
connectedComponents.zipWithIndex.map {
case (connectedSubDAG, idx) =>
val operatorIds = connectedSubDAG.vertexSet().asScala.toSet
val operatorIds = connectedSubDAG.nodes.map(_.outer)
val links = operatorIds
.flatMap(operatorId => {
physicalPlan.getUpstreamPhysicalLinks(operatorId) ++ physicalPlan
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package edu.uci.ics.texera.web.service

import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
import edu.uci.ics.amber.core.workflow.PhysicalPlan
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{
ModifyLogicRequest,
PropagateChannelMarkerRequest
}
import edu.uci.ics.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator}
import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
import org.jgrapht.alg.connectivity.ConnectivityInspector

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -84,29 +83,29 @@ object FriesReconfigurationAlgorithm {
// for each component, send an epoch marker to each of its source operators
val epochMarkers = new ArrayBuffer[PropagateChannelMarkerRequest]()

val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets()
connectedSets.forEach(component => {
val componentSet = component.asScala.toSet
val componentPlan = mcsPlan.getSubPlan(componentSet)

// generate the reconfiguration command for this component
// val reconfigCommands =
// reconfiguration.updateRequest
// .filter(req => component.contains(req.targetOpId))
// val reconfigTargets = reconfigCommands.map(_.targetOpId)
//
// // find the source operators of the component
// val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
// epochMarkers += PropagateChannelMarkerRequest(
// sources.toSeq,
// ChannelMarkerIdentity(epochMarkerId),
// REQUIRE_ALIGNMENT,
// componentPlan.operators.map(_.id).toSeq,
// reconfigTargets,
// ModifyLogicRequest(reconfigCommands),
// METHOD_MODIFY_LOGIC.getBareMethodName
// )
})
mcsPlan.dag
.componentTraverser()
.foreach(component => {
val componentPlan = mcsPlan.getSubPlan(component.nodes.map(_.outer))

// generate the reconfiguration command for this component
// val reconfigCommands =
// reconfiguration.updateRequest
// .filter(req => component.contains(req.targetOpId))
// val reconfigTargets = reconfigCommands.map(_.targetOpId)
//
// // find the source operators of the component
// val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
// epochMarkers += PropagateChannelMarkerRequest(
// sources.toSeq,
// ChannelMarkerIdentity(epochMarkerId),
// REQUIRE_ALIGNMENT,
// componentPlan.operators.map(_.id).toSeq,
// reconfigTargets,
// ModifyLogicRequest(reconfigCommands),
// METHOD_MODIFY_LOGIC.getBareMethodName
// )
})
epochMarkers.toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@ import edu.uci.ics.amber.compiler.WorkflowCompiler.{
collectInputSchemaFromPhysicalPlan,
convertErrorListToWorkflowFatalErrorMap
}

import edu.uci.ics.amber.compiler.model.{LogicalPlan, LogicalPlanPojo}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.core.workflow.PhysicalLink
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR
import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError

import java.time.Instant
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.util.{Failure, Success, Try}

object WorkflowCompiler {
Expand Down Expand Up @@ -107,7 +104,7 @@ class WorkflowCompiler(
): PhysicalPlan = {
var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)

logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId =>
logicalPlan.getTopologicalOpIds.foreach(logicalOpId =>
Try {
val logicalOp = logicalPlan.getOperator(logicalOpId)
val allUpstreamLinks = logicalPlan
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package edu.uci.ics.amber.compiler.model

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.compiler.model.LogicalPlan.LogicalEdge
import edu.uci.ics.amber.core.storage.FileResolver
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.core.workflow.PortIdentity
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.util.SupplierUtil
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc
import scalax.collection.OneOrMore
import scalax.collection.generic.{AbstractDiEdge, MultiEdge}
import scalax.collection.mutable.Graph

import java.util
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object LogicalPlan {

private def toJgraphtDAG(
/**
  * Directed edge of the logical-plan graph that wraps a single [[LogicalLink]].
  *
  * The edge runs from `logicalLink.fromOpId` to `logicalLink.toOpId`.
  *
  * @param logicalLink the logical link represented by this edge
  */
case class LogicalEdge(logicalLink: LogicalLink)
extends AbstractDiEdge(logicalLink.fromOpId, logicalLink.toOpId)
with MultiEdge {
// The edge key is extended by the (fromPortId, toPortId) pair.
// NOTE(review): presumably this keeps links between the same two operators over
// different ports as distinct edges — confirm against scala-graph's MultiEdge contract.
override def extendKeyBy: OneOrMore[Any] =
OneOrMore.one((logicalLink.fromPortId, logicalLink.toPortId))
}

private def toScalaDAG(
operatorList: List[LogicalOp],
links: List[LogicalLink]
): DirectedAcyclicGraph[OperatorIdentity, LogicalLink] = {
Copy link
Collaborator

@Xiao-zhen-Liu Xiao-zhen-Liu Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the Graph class ensure that the plan is a DAG? With JGraphT, an error is thrown if a plan is not a DAG. If Graph does not enforce acyclicity, do we implement that check ourselves? Acyclicity is crucial for scheduling (RegionDAG), which is currently still implemented as a JGraphT DAG.

val workflowDag =
new DirectedAcyclicGraph[OperatorIdentity, LogicalLink](
null, // vertexSupplier
SupplierUtil.createSupplier(classOf[LogicalLink]), // edgeSupplier
false, // weighted
true // allowMultipleEdges
)
operatorList.foreach(op => workflowDag.addVertex(op.operatorIdentifier))
links.foreach(l =>
workflowDag.addEdge(
l.fromOpId,
l.toOpId,
l
)
)
): Graph[OperatorIdentity, LogicalEdge] = {
val workflowDag = Graph.empty[OperatorIdentity, LogicalEdge]()
operatorList.foreach(op => workflowDag.add(op.operatorIdentifier))
links.foreach(l => workflowDag.add(LogicalEdge(l)))
workflowDag
}

Expand All @@ -53,10 +49,15 @@ case class LogicalPlan(
// Index of the plan's operators keyed by each operator's `operatorIdentifier`,
// built lazily from `operators` on first access.
private lazy val operatorMap: Map[OperatorIdentity, LogicalOp] =
operators.map(op => (op.operatorIdentifier, op)).toMap

private lazy val jgraphtDag: DirectedAcyclicGraph[OperatorIdentity, LogicalLink] =
LogicalPlan.toJgraphtDAG(operators, links)
// Graph representation of this plan, built lazily on first access by
// `LogicalPlan.toScalaDAG` from `operators` and `links`.
private lazy val scalaDAG: Graph[OperatorIdentity, LogicalEdge] =
LogicalPlan.toScalaDAG(operators, links)

def getTopologicalOpIds: util.Iterator[OperatorIdentity] = jgraphtDag.iterator()
/**
  * Returns the operator identities of this plan in topological order.
  *
  * The order is obtained from `topologicalSort` on `scalaDAG`.
  * NOTE(review): the graph is built without a visible acyclicity check, so a cyclic
  * plan presumably surfaces here as a `Left` — confirm against scala-graph.
  *
  * @return an iterator over the operator identities in topological order
  * @throws RuntimeException if the topological sort reports a failure
  */
def getTopologicalOpIds: Iterator[OperatorIdentity] = {
  scalaDAG.topologicalSort match {
    // Include the failure reported by the sort so the offending part of the plan
    // is visible in the error instead of being silently discarded.
    case Left(failure) =>
      throw new RuntimeException(s"topological sort failed: $failure")
    case Right(order) => order.iterator.map(_.outer)
  }
}

/**
  * Looks up the logical operator registered under `OperatorIdentity(opId)`.
  *
  * @param opId the operator id as a plain string
  * @return the operator stored in `operatorMap` for that identity
  * @throws NoSuchElementException if no operator with that identity is in `operatorMap`
  */
def getOperator(opId: String): LogicalOp = operatorMap(OperatorIdentity(opId))

Expand Down
2 changes: 1 addition & 1 deletion core/workflow-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ libraryDependencies ++= Seq(
libraryDependencies ++= Seq(
"com.github.sisyphsu" % "dateparser" % "1.0.11", // DateParser
"com.google.guava" % "guava" % "31.1-jre", // Guava
"org.scala-graph" %% "graph-core" % "2.0.2", // scala-graph
"org.ehcache" % "sizeof" % "0.4.3", // Ehcache SizeOf
"org.jgrapht" % "jgrapht-core" % "1.4.0", // JGraphT Core
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package edu.uci.ics.amber.core.workflow

import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.executor.{OpExecWithCode, OpExecInitInfo}
import edu.uci.ics.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
PhysicalOpIdentity,
WorkflowIdentity
}
import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph}
import org.jgrapht.traverse.TopologicalOrderIterator
import scalax.collection.edges.{DiEdge, DiEdgeImplicits}
import scalax.collection.mutable.Graph

import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -484,30 +484,28 @@ case class PhysicalOp(
*/
@JsonIgnore
def getInputLinksInProcessingOrder: List[PhysicalLink] = {
val dependencyDag = {
new DirectedAcyclicGraph[PhysicalLink, DefaultEdge](classOf[DefaultEdge])
}
val dependencyDag: Graph[PhysicalLink, DiEdge[PhysicalLink]] = Graph()

inputPorts.values
.map(_._1)
.flatMap(port => port.dependencies.map(dependee => port.id -> dependee))
.foreach({
.foreach {
case (depender: PortIdentity, dependee: PortIdentity) =>
val upstreamLink = getInputLinks(Some(dependee)).head
val downstreamLink = getInputLinks(Some(depender)).head
if (!dependencyDag.containsVertex(upstreamLink)) {
dependencyDag.addVertex(upstreamLink)
}
if (!dependencyDag.containsVertex(downstreamLink)) {
dependencyDag.addVertex(downstreamLink)
}
dependencyDag.addEdge(upstreamLink, downstreamLink)
})
val topologicalIterator =
new TopologicalOrderIterator[PhysicalLink, DefaultEdge](dependencyDag)
val processingOrder = new ArrayBuffer[PhysicalLink]()
while (topologicalIterator.hasNext) {
processingOrder.append(topologicalIterator.next())

dependencyDag.add(upstreamLink)
dependencyDag.add(downstreamLink)
dependencyDag.add(upstreamLink ~> downstreamLink)
}

val sortedNodes = new ArrayBuffer[PhysicalLink]()
dependencyDag.topologicalSort match {
case Right(order) => sortedNodes.appendAll(order.toOuter)
case Left(_) => throw new IllegalStateException("Cycle detected in dependency graph")
}
processingOrder.toList

sortedNodes.toList
}

}
Loading
Loading