From 792e147a08a3bdfcefe70a5c2ef203c9218c33f9 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Sun, 2 Mar 2025 19:20:57 -0800 Subject: [PATCH 1/8] update --- .../CostBasedScheduleGenerator.scala | 12 +- .../ExpansionGreedyScheduleGenerator.scala | 10 +- .../FriesReconfigurationAlgorithm.scala | 49 +++--- .../ics/amber/compiler/WorkflowCompiler.scala | 7 +- .../amber/compiler/model/LogicalLink.scala | 7 +- .../amber/compiler/model/LogicalPlan.scala | 41 ++--- core/workflow-core/build.sbt | 2 +- .../ics/amber/core/workflow/PhysicalOp.scala | 40 +++-- .../amber/core/workflow/PhysicalPlan.scala | 149 +++++++++++------- 9 files changed, 169 insertions(+), 148 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index eb848c1d8f..f7c56c6168 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -1,10 +1,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} -import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging} import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, PhysicalOpIdentity} -import edu.uci.ics.amber.core.workflow.PhysicalLink -import org.jgrapht.alg.connectivity.BiconnectivityInspector +import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging} import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph} import java.util.concurrent.TimeoutException @@ -70,12 +68,10 @@ class CostBasedScheduleGenerator( val matEdgesRemovedDAG = matEdges.foldLeft(physicalPlan) { (currentPlan, linkToRemove) => currentPlan.removeLink(linkToRemove) } - val connectedComponents = new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink]( - matEdgesRemovedDAG.dag - ).getConnectedComponents.asScala.toSet + val connectedComponents = matEdgesRemovedDAG.dag.componentTraverser().toSet connectedComponents.zipWithIndex.map { case (connectedSubDAG, idx) => - val operatorIds = connectedSubDAG.vertexSet().asScala.toSet + val operatorIds = connectedSubDAG.nodes.map(_.outer) val links = operatorIds .flatMap(operatorId => { physicalPlan.getUpstreamPhysicalLinks(operatorId) ++ physicalPlan diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala index 2457d09b4c..7cb8794a14 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala @@ -2,10 +2,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.WorkflowRuntimeException -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.PhysicalLink -import org.jgrapht.alg.connectivity.BiconnectivityInspector +import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext} import org.jgrapht.graph.DirectedAcyclicGraph import scala.annotation.tailrec @@ -63,12 +61,10 @@ class ExpansionGreedyScheduleGenerator( */ private def createRegions(physicalPlan: PhysicalPlan): Set[Region] = { val dependeeLinksRemovedDAG = physicalPlan.getDependeeLinksRemovedDAG - val connectedComponents = new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink]( - dependeeLinksRemovedDAG.dag - ).getConnectedComponents.asScala.toSet + val connectedComponents = dependeeLinksRemovedDAG.dag.componentTraverser().toSet connectedComponents.zipWithIndex.map { case (connectedSubDAG, idx) => - val operatorIds = connectedSubDAG.vertexSet().asScala.toSet + val operatorIds = connectedSubDAG.nodes.map(_.outer) val links = operatorIds .flatMap(operatorId => { physicalPlan.getUpstreamPhysicalLinks(operatorId) ++ physicalPlan diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/FriesReconfigurationAlgorithm.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/FriesReconfigurationAlgorithm.scala index 85113b3a5a..05f2dd99a9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/FriesReconfigurationAlgorithm.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/FriesReconfigurationAlgorithm.scala @@ -1,13 +1,12 @@ package edu.uci.ics.texera.web.service +import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity import edu.uci.ics.amber.core.workflow.PhysicalPlan import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ ModifyLogicRequest, PropagateChannelMarkerRequest } import edu.uci.ics.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator} -import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import org.jgrapht.alg.connectivity.ConnectivityInspector import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -84,29 +83,29 @@ object FriesReconfigurationAlgorithm { // for each component, send an epoch marker to each of its source operators val epochMarkers = new ArrayBuffer[PropagateChannelMarkerRequest]() - val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets() - connectedSets.forEach(component => { - val componentSet = component.asScala.toSet - val componentPlan = mcsPlan.getSubPlan(componentSet) - - // generate the reconfiguration command for this component - // val reconfigCommands = - // reconfiguration.updateRequest - // .filter(req => component.contains(req.targetOpId)) - // val reconfigTargets = reconfigCommands.map(_.targetOpId) - // - // // find the source operators of the component - // val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds) - // epochMarkers += PropagateChannelMarkerRequest( - // sources.toSeq, - // ChannelMarkerIdentity(epochMarkerId), - // REQUIRE_ALIGNMENT, - // componentPlan.operators.map(_.id).toSeq, - // reconfigTargets, - // ModifyLogicRequest(reconfigCommands), - // METHOD_MODIFY_LOGIC.getBareMethodName - // ) - }) + mcsPlan.dag + .componentTraverser() + .foreach(component => { + val componentPlan = mcsPlan.getSubPlan(component.nodes.map(_.outer)) + + // generate the reconfiguration command for this component + // val reconfigCommands = + // reconfiguration.updateRequest + // .filter(req => component.contains(req.targetOpId)) + // val reconfigTargets = reconfigCommands.map(_.targetOpId) + // + // // find the source operators of the component + // val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds) + // epochMarkers += PropagateChannelMarkerRequest( + // sources.toSeq, + // ChannelMarkerIdentity(epochMarkerId), + // REQUIRE_ALIGNMENT, + // componentPlan.operators.map(_.id).toSeq, + // reconfigTargets, + // ModifyLogicRequest(reconfigCommands), + // METHOD_MODIFY_LOGIC.getBareMethodName + // ) + }) epochMarkers.toList } diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala index c199dfaf86..ce2e681eb5 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala @@ -6,19 +6,16 @@ import edu.uci.ics.amber.compiler.WorkflowCompiler.{ collectInputSchemaFromPhysicalPlan, convertErrorListToWorkflowFatalErrorMap } - import edu.uci.ics.amber.compiler.model.{LogicalPlan, LogicalPlanPojo} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.amber.core.workflow.{PhysicalLink, PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError import java.time.Instant import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.util.{Failure, Success, Try} object WorkflowCompiler { @@ -107,7 +104,7 @@ class WorkflowCompiler( ): PhysicalPlan = { var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty) - logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => + logicalPlan.getTopologicalOpIds.foreach(logicalOpId => Try { val logicalOp = logicalPlan.getOperator(logicalOpId) val allUpstreamLinks = logicalPlan diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala index bf9217128b..ab9e79d89f 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala @@ -3,13 +3,16 @@ package edu.uci.ics.amber.compiler.model import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity import edu.uci.ics.amber.core.workflow.PortIdentity +import scalax.collection.OneOrMore +import scalax.collection.generic.{AbstractDiEdge, MultiEdge} case class LogicalLink( @JsonProperty("fromOpId") fromOpId: OperatorIdentity, fromPortId: PortIdentity, @JsonProperty("toOpId") toOpId: OperatorIdentity, toPortId: PortIdentity -) { +) extends AbstractDiEdge[OperatorIdentity](fromOpId, toOpId) + with MultiEdge { @JsonCreator def this( @JsonProperty("fromOpId") fromOpId: String, @@ -19,4 +22,6 @@ case class LogicalLink( ) = { this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId) } + + override def extendKeyBy: OneOrMore[Any] = OneOrMore.one((fromPortId, toPortId)) } diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala index db700e8a3d..c1d0c1b7e0 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala @@ -2,38 +2,24 @@ package edu.uci.ics.amber.compiler.model import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.FileResolver -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity import edu.uci.ics.amber.core.workflow.PortIdentity -import org.jgrapht.graph.DirectedAcyclicGraph -import org.jgrapht.util.SupplierUtil +import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc +import scalax.collection.mutable.Graph -import java.util import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} object LogicalPlan { - private def toJgraphtDAG( + private def toScalaDAG( operatorList: List[LogicalOp], links: List[LogicalLink] - ): DirectedAcyclicGraph[OperatorIdentity, LogicalLink] = { - val workflowDag = - new DirectedAcyclicGraph[OperatorIdentity, LogicalLink]( - null, // vertexSupplier - SupplierUtil.createSupplier(classOf[LogicalLink]), // edgeSupplier - false, // weighted - true // allowMultipleEdges - ) - operatorList.foreach(op => workflowDag.addVertex(op.operatorIdentifier)) - links.foreach(l => - workflowDag.addEdge( - l.fromOpId, - l.toOpId, - l - ) - ) + ): Graph[OperatorIdentity, LogicalLink] = { + val workflowDag = Graph.empty[OperatorIdentity, LogicalLink]() + operatorList.foreach(op => workflowDag.add(op.operatorIdentifier)) + links.foreach(l => workflowDag.add(l)) workflowDag } @@ -53,10 +39,15 @@ case class LogicalPlan( private lazy val operatorMap: Map[OperatorIdentity, LogicalOp] = operators.map(op => (op.operatorIdentifier, op)).toMap - private lazy val jgraphtDag: DirectedAcyclicGraph[OperatorIdentity, LogicalLink] = - LogicalPlan.toJgraphtDAG(operators, links) + private lazy val scalaDAG: Graph[OperatorIdentity, LogicalLink] = + LogicalPlan.toScalaDAG(operators, links) - def getTopologicalOpIds: util.Iterator[OperatorIdentity] = jgraphtDag.iterator() + def getTopologicalOpIds: Iterator[OperatorIdentity] = { + scalaDAG.topologicalSort match { + case Left(value) => throw new RuntimeException("topological sort failed.") + case Right(value) => value.iterator.map(_.outer) + } + } def getOperator(opId: String): LogicalOp = operatorMap(OperatorIdentity(opId)) diff --git a/core/workflow-core/build.sbt b/core/workflow-core/build.sbt index e3a2ebb7ee..3bb3d7a3be 100644 --- a/core/workflow-core/build.sbt +++ b/core/workflow-core/build.sbt @@ -167,8 +167,8 @@ libraryDependencies ++= Seq( libraryDependencies ++= Seq( "com.github.sisyphsu" % "dateparser" % "1.0.11", // DateParser "com.google.guava" % "guava" % "31.1-jre", // Guava + "org.scala-graph" %% "graph-core" % "2.0.2", // scala-graph "org.ehcache" % "sizeof" % "0.4.3", // Ehcache SizeOf - "org.jgrapht" % "jgrapht-core" % "1.4.0", // JGraphT Core "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging "org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit "org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala index 00f244d6e1..ff83389546 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala @@ -2,7 +2,7 @@ package edu.uci.ics.amber.core.workflow import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties} import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.executor.{OpExecWithCode, OpExecInitInfo} +import edu.uci.ics.amber.core.executor.{OpExecInitInfo, OpExecWithCode} import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, @@ -10,8 +10,8 @@ import edu.uci.ics.amber.core.virtualidentity.{ PhysicalOpIdentity, WorkflowIdentity } -import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} -import org.jgrapht.traverse.TopologicalOrderIterator +import scalax.collection.edges.{DiEdge, DiEdgeImplicits} +import scalax.collection.mutable.Graph import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -484,30 +484,28 @@ case class PhysicalOp( */ @JsonIgnore def getInputLinksInProcessingOrder: List[PhysicalLink] = { - val dependencyDag = { - new DirectedAcyclicGraph[PhysicalLink, DefaultEdge](classOf[DefaultEdge]) - } + val dependencyDag: Graph[PhysicalLink, DiEdge[PhysicalLink]] = Graph() + inputPorts.values .map(_._1) .flatMap(port => port.dependencies.map(dependee => port.id -> dependee)) - .foreach({ + .foreach { case (depender: PortIdentity, dependee: PortIdentity) => val upstreamLink = getInputLinks(Some(dependee)).head val downstreamLink = getInputLinks(Some(depender)).head - if (!dependencyDag.containsVertex(upstreamLink)) { - dependencyDag.addVertex(upstreamLink) - } - if (!dependencyDag.containsVertex(downstreamLink)) { - dependencyDag.addVertex(downstreamLink) - } - dependencyDag.addEdge(upstreamLink, downstreamLink) - }) - val topologicalIterator = - new TopologicalOrderIterator[PhysicalLink, DefaultEdge](dependencyDag) - val processingOrder = new ArrayBuffer[PhysicalLink]() - while (topologicalIterator.hasNext) { - processingOrder.append(topologicalIterator.next()) + + dependencyDag.add(upstreamLink) + dependencyDag.add(downstreamLink) + dependencyDag.add(upstreamLink ~> downstreamLink) + } + + val sortedNodes = new ArrayBuffer[PhysicalLink]() + dependencyDag.topologicalSort match { + case Right(order) => sortedNodes.appendAll(order.toOuter) + case Left(_) => throw new IllegalStateException("Cycle detected in dependency graph") } - processingOrder.toList + + sortedNodes.toList } + } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 1c3d06519c..1f88944a88 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -9,32 +9,38 @@ import edu.uci.ics.amber.core.virtualidentity.{ PhysicalOpIdentity } import edu.uci.ics.amber.util.VirtualIdentityUtils -import org.jgrapht.alg.connectivity.BiconnectivityInspector -import org.jgrapht.alg.shortestpath.AllDirectedPaths -import org.jgrapht.graph.DirectedAcyclicGraph -import org.jgrapht.traverse.TopologicalOrderIterator -import org.jgrapht.util.SupplierUtil +import scalax.collection.OneOrMore +import scalax.collection.generic.{AbstractDiEdge, MultiEdge} +import scalax.collection.mutable.Graph -import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} +import scala.collection.mutable +import scala.language.implicitConversions + +case class PhysicalEdge(physicalLink: PhysicalLink) + extends AbstractDiEdge(physicalLink.fromOpId, physicalLink.toOpId) + with MultiEdge { + + override def extendKeyBy: OneOrMore[Any] = + OneOrMore.one((physicalLink.fromPortId, physicalLink.toPortId)) +} case class PhysicalPlan( operators: Set[PhysicalOp], links: Set[PhysicalLink] ) extends LazyLogging { + implicit def physicalLinkToEdge(physicalLink: PhysicalLink): PhysicalEdge = { + PhysicalEdge(physicalLink) + } + @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = operators.map(o => (o.id, o)).toMap // the dag will be re-computed again once it reaches the coordinator. - @transient lazy val dag: DirectedAcyclicGraph[PhysicalOpIdentity, PhysicalLink] = { - val jgraphtDag = new DirectedAcyclicGraph[PhysicalOpIdentity, PhysicalLink]( - null, // vertexSupplier - SupplierUtil.createSupplier(classOf[PhysicalLink]), // edgeSupplier - false, // weighted - true // allowMultipleEdges - ) - operatorMap.foreach(op => jgraphtDag.addVertex(op._1)) - links.foreach(l => jgraphtDag.addEdge(l.fromOpId, l.toOpId, l)) + @transient lazy val dag: Graph[PhysicalOpIdentity, PhysicalEdge] = { + val jgraphtDag = Graph.empty[PhysicalOpIdentity, PhysicalEdge]() + operatorMap.foreach(op => jgraphtDag.add(op._1)) + links.foreach(l => jgraphtDag.add(l)) jgraphtDag } @@ -42,7 +48,7 @@ case class PhysicalPlan( @JsonIgnore def getSourceOperatorIds: Set[PhysicalOpIdentity] = - operatorMap.keys.filter(op => dag.inDegreeOf(op) == 0).toSet + operatorMap.keys.filter(op => dag.get(op).inDegree == 0).toSet def getPhysicalOpsOfLogicalOp(logicalOpId: OperatorIdentity): List[PhysicalOp] = { topologicalIterator() @@ -66,7 +72,7 @@ case class PhysicalPlan( } def getUpstreamPhysicalOpIds(physicalOpId: PhysicalOpIdentity): Set[PhysicalOpIdentity] = { - dag.incomingEdgesOf(physicalOpId).asScala.map(e => dag.getEdgeSource(e)).toSet + dag.get(physicalOpId).incoming.map(e => e.physicalLink.fromOpId) } def getUpstreamPhysicalLinks(physicalOpId: PhysicalOpIdentity): Set[PhysicalLink] = { @@ -78,7 +84,10 @@ case class PhysicalPlan( } def topologicalIterator(): Iterator[PhysicalOpIdentity] = { - new TopologicalOrderIterator(dag).asScala + dag.topologicalSort match { + case Left(value) => throw new RuntimeException("topological sort failed") + case Right(value) => value.iterator.map(_.outer) + } } def addOperator(physicalOp: PhysicalOp): PhysicalPlan = { this.copy(operators = Set(physicalOp) ++ operators) @@ -225,11 +234,13 @@ case class PhysicalPlan( @JsonIgnore def getNonBridgeNonBlockingLinks: Set[PhysicalLink] = { val bridges = - new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink](this.dag).getBridges.asScala + this.dag + .componentTraverser() + .flatMap(_.frontierEdges) .map { edge => { - val fromOpId = this.dag.getEdgeSource(edge) - val toOpId = this.dag.getEdgeTarget(edge) + val fromOpId = edge.physicalLink.fromOpId + val toOpId = edge.physicalLink.toOpId links.find(l => l.fromOpId == fromOpId && l.toOpId == toOpId) } } @@ -247,42 +258,70 @@ case class PhysicalPlan( * * @return All the maximal chains of this physical plan, where each chain is represented as a set of links. */ - private def getMaxChains: Set[Set[PhysicalLink]] = { - val dijkstra = new AllDirectedPaths[PhysicalOpIdentity, PhysicalLink](this.dag) - val chains = this.dag - .vertexSet() - .asScala - .flatMap { ancestor => - { - this.dag.getDescendants(ancestor).asScala.flatMap { descendant => - { - dijkstra - .getAllPaths(ancestor, descendant, true, Integer.MAX_VALUE) - .asScala - .filter(path => - path.getLength > 1 && - path.getVertexList.asScala - .filter(v => v != path.getStartVertex && v != path.getEndVertex) - .forall(v => this.dag.inDegreeOf(v) == 1 && this.dag.outDegreeOf(v) == 1) - ) - .map(path => - path.getEdgeList.asScala - .map { edge => - { - val fromOpId = this.dag.getEdgeSource(edge) - val toOpId = this.dag.getEdgeTarget(edge) - links.find(l => l.fromOpId == fromOpId && l.toOpId == toOpId) - } - } - .flatMap(_.toList) - .toSet - ) - .toSet - } - } + def getMaxChains: Set[Set[PhysicalLink]] = { + val allChains = mutable.Set[Set[PhysicalLink]]() + + /** + * Recursively expands chains from the current node, enumerating all possible paths. + * @param current The current operator identity from which to expand. + * @param path The accumulated list of operator IDs we have visited so far. + */ + def expandChain(current: PhysicalOpIdentity, path: List[PhysicalOpIdentity]): Unit = { + // If current has no successors, see if we've formed a valid chain + val successors = this.dag.get(current).outNeighbors.map(_.outer) + if (successors.isEmpty) { + // If path length > 1 and intermediate nodes have inDegree/outDegree == 1, record it + if (path.length > 1 && validIntermediateNodes(path)) { + allChains += pathToLinks(path) } + return } - chains.filter(s1 => chains.forall(s2 => s1 == s2 || !s1.subsetOf(s2))).toSet + + // Otherwise, expand to each successor + successors.foreach { next => + // Avoid cycles by checking if we've already visited 'next' + if (!path.contains(next)) { + expandChain(next, path :+ next) + } + } + } + + /** + * Checks that all intermediate nodes in the path + * (except the first and last) have inDegree == 1 and outDegree == 1. + */ + def validIntermediateNodes(path: List[PhysicalOpIdentity]): Boolean = { + // drop first and last, then check degree + path.drop(1).dropRight(1).forall { mid => + val midNode = this.dag.get(mid) + midNode.inDegree == 1 && midNode.outDegree == 1 + } + } + + /** + * Converts a path of operator IDs into a set of PhysicalLink + * by looking up each consecutive pair in `links`. + */ + def pathToLinks(path: List[PhysicalOpIdentity]): Set[PhysicalLink] = { + path + .sliding(2) + .flatMap { + case Seq(from, to) => + links.find(l => l.fromOpId == from && l.toOpId == to) + case _ => None + } + .toSet + } + + // Enumerate all possible starting nodes (potential ancestors) + for (node <- this.dag.nodes.map(_.outer)) { + expandChain(node, List(node)) + } + + // Now we have a bunch of chain candidates in `allChains`. + // We remove those that are strictly sub-chains of others. + val chainCandidates = allChains.toSet + chainCandidates.filter(s1 => chainCandidates.forall(s2 => s1 == s2 || !s1.subsetOf(s2))) } def propagateSchema(inputSchemas: Map[PortIdentity, Schema]): PhysicalPlan = { From de68106571278856ffb98844d2fc19367e9cf9ea Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Sun, 2 Mar 2025 22:12:50 -0800 Subject: [PATCH 2/8] Update PhysicalPlan.scala --- .../amber/core/workflow/PhysicalPlan.scala | 69 +++++++++++++------ 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 1f88944a88..59bfa19370 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -224,6 +224,49 @@ case class PhysicalPlan( this.copy(operators, links.diff(getDependeeLinks)) } + + /** + * Computes the bridges (cut-edges) in the given directed graph using Tarjan's Algorithm. + * A bridge is an edge whose removal increases the number of connected components. + * This method runs in O(V + E) time complexity. + * + * @return A set of PhysicalLinks representing the bridges in the graph. + */ + private def findBridges: Set[PhysicalLink] = { + var time = 0 + val discoveryTime = mutable.Map[PhysicalOpIdentity, Int]() + val lowLink = mutable.Map[PhysicalOpIdentity, Int]() + val parent = mutable.Map[PhysicalOpIdentity, PhysicalOpIdentity]() + val bridges = mutable.Set[PhysicalLink]() + + def dfs(node: PhysicalOpIdentity): Unit = { + discoveryTime(node) = time + lowLink(node) = time + time += 1 + + for (neighbor <- this.dag.get(node).diSuccessors.map(_.outer)) { + if (!discoveryTime.contains(neighbor)) { // If neighbor is not visited + parent(neighbor) = node + dfs(neighbor) + lowLink(node) = Math.min(lowLink(node), lowLink(neighbor)) + + if (lowLink(neighbor) > discoveryTime(node)) { + links.find(l => l.fromOpId == node && l.toOpId == neighbor).foreach(bridges.add) + } + } else if (!parent.get(node).contains(neighbor)) { + lowLink(node) = Math.min(lowLink(node), discoveryTime(neighbor)) + } + } + } + + for (node <- this.dag.nodes.map(_.outer) if !discoveryTime.contains(node)) { + dfs(node) + } + + bridges.toSet + } + + /** * A link is a bridge if removal of that link would increase the number of (weakly) connected components in the DAG. * Assuming pipelining a link is more desirable than materializing it, and optimal physical plan always pipelines @@ -233,19 +276,7 @@ case class PhysicalPlan( */ @JsonIgnore def getNonBridgeNonBlockingLinks: Set[PhysicalLink] = { - val bridges = - this.dag - .componentTraverser() - .flatMap(_.frontierEdges) - .map { edge => - { - val fromOpId = edge.physicalLink.fromOpId - val toOpId = edge.physicalLink.toOpId - links.find(l => l.fromOpId == fromOpId && l.toOpId == toOpId) - } - } - .flatMap(_.toList) - this.links.diff(getNonMaterializedBlockingAndDependeeLinks).diff(bridges.toSet) + this.links.diff(getNonMaterializedBlockingAndDependeeLinks).diff(findBridges) } /** @@ -267,15 +298,13 @@ case class PhysicalPlan( * @param path The accumulated list of operator IDs we have visited so far. */ def expandChain(current: PhysicalOpIdentity, path: List[PhysicalOpIdentity]): Unit = { + + if (path.length > 2 && validIntermediateNodes(path)) { + allChains += pathToLinks(path) + } + // If current has no successors, see if we've formed a valid chain val successors = this.dag.get(current).outNeighbors.map(_.outer) - if (successors.isEmpty) { - // If path length > 1 and intermediate nodes have inDegree/outDegree == 1, record it - if (path.length > 1 && validIntermediateNodes(path)) { - allChains += pathToLinks(path) - } - return - } // Otherwise, expand to each successor successors.foreach { next => From cec5b06fc4956b3567ea791aa91cf84e73562878 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Mon, 3 Mar 2025 00:24:46 -0800 Subject: [PATCH 3/8] Update PhysicalPlan.scala --- .../amber/core/workflow/PhysicalPlan.scala | 54 +++++++------------ 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 59bfa19370..6559d6ec05 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -224,49 +224,31 @@ case class PhysicalPlan( this.copy(operators, links.diff(getDependeeLinks)) } - /** - * Computes the bridges (cut-edges) in the given directed graph using Tarjan's Algorithm. - * A bridge is an edge whose removal increases the number of connected components. - * This method runs in O(V + E) time complexity. - * - * @return A set of PhysicalLinks representing the bridges in the graph. - */ + * Computes the bridges (cut-edges) in the given directed graph using Tarjan's Algorithm. + * A bridge is an edge whose removal increases the number of connected components. + * This method runs in O(V + E) time complexity. + * + * @return A set of PhysicalLinks representing the bridges in the graph. + */ private def findBridges: Set[PhysicalLink] = { - var time = 0 - val discoveryTime = mutable.Map[PhysicalOpIdentity, Int]() - val lowLink = mutable.Map[PhysicalOpIdentity, Int]() - val parent = mutable.Map[PhysicalOpIdentity, PhysicalOpIdentity]() - val bridges = mutable.Set[PhysicalLink]() - - def dfs(node: PhysicalOpIdentity): Unit = { - discoveryTime(node) = time - lowLink(node) = time - time += 1 - - for (neighbor <- this.dag.get(node).diSuccessors.map(_.outer)) { - if (!discoveryTime.contains(neighbor)) { // If neighbor is not visited - parent(neighbor) = node - dfs(neighbor) - lowLink(node) = Math.min(lowLink(node), lowLink(neighbor)) - - if (lowLink(neighbor) > discoveryTime(node)) { - links.find(l => l.fromOpId == node && l.toOpId == neighbor).foreach(bridges.add) - } - } else if (!parent.get(node).contains(neighbor)) { - lowLink(node) = Math.min(lowLink(node), discoveryTime(neighbor)) - } - } - } + val weakBridges = mutable.Set[PhysicalLink]() + val componentsBefore = this.dag.componentTraverser().size + + for (edge <- this.dag.edges) { + val tempGraph = this.dag.clone() + tempGraph -= edge + + val componentsAfter = tempGraph.componentTraverser().size - for (node <- this.dag.nodes.map(_.outer) if !discoveryTime.contains(node)) { - dfs(node) + if (componentsAfter > componentsBefore) { + weakBridges.add(edge.physicalLink) + } } - bridges.toSet + weakBridges.toSet } - /** * A link is a bridge if removal of that link would increase the number of (weakly) connected components in the DAG. * Assuming pipelining a link is more desirable than materializing it, and optimal physical plan always pipelines From 8405cb50bb70f6935ea74ca3d163561cdd235e72 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Mon, 3 Mar 2025 00:51:42 -0800 Subject: [PATCH 4/8] update --- .../ics/amber/compiler/model/LogicalLink.scala | 7 +------ .../ics/amber/compiler/model/LogicalPlan.scala | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala index ab9e79d89f..bf9217128b 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalLink.scala @@ -3,16 +3,13 @@ package edu.uci.ics.amber.compiler.model import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity import edu.uci.ics.amber.core.workflow.PortIdentity -import scalax.collection.OneOrMore -import scalax.collection.generic.{AbstractDiEdge, MultiEdge} case class LogicalLink( @JsonProperty("fromOpId") fromOpId: OperatorIdentity, fromPortId: PortIdentity, @JsonProperty("toOpId") toOpId: OperatorIdentity, toPortId: PortIdentity -) extends AbstractDiEdge[OperatorIdentity](fromOpId, toOpId) - with MultiEdge { +) { @JsonCreator def this( @JsonProperty("fromOpId") fromOpId: String, @@ -22,6 +19,4 @@ case class LogicalLink( ) = { this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId) } - - override def extendKeyBy: OneOrMore[Any] = OneOrMore.one((fromPortId, toPortId)) } diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala index c1d0c1b7e0..0080ec9e11 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala @@ -1,11 +1,14 @@ package edu.uci.ics.amber.compiler.model import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.compiler.model.LogicalPlan.LogicalEdge import edu.uci.ics.amber.core.storage.FileResolver import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity import edu.uci.ics.amber.core.workflow.PortIdentity import edu.uci.ics.amber.operator.LogicalOp import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc +import scalax.collection.OneOrMore +import scalax.collection.generic.{AbstractDiEdge, MultiEdge} import scalax.collection.mutable.Graph import scala.collection.mutable.ArrayBuffer @@ -13,13 +16,20 @@ import scala.util.{Failure, Success, Try} object LogicalPlan { + case class LogicalEdge(logicalLink: LogicalLink) + extends AbstractDiEdge(logicalLink.fromOpId, logicalLink.toOpId) + with MultiEdge { + override def extendKeyBy: OneOrMore[Any] = + OneOrMore.one((logicalLink.fromPortId, logicalLink.toPortId)) + } + private def toScalaDAG( operatorList: List[LogicalOp], links: List[LogicalLink] - ): Graph[OperatorIdentity, LogicalLink] = { - val workflowDag = Graph.empty[OperatorIdentity, LogicalLink]() + ): Graph[OperatorIdentity, LogicalEdge] = { + val workflowDag = Graph.empty[OperatorIdentity, LogicalEdge]() operatorList.foreach(op => workflowDag.add(op.operatorIdentifier)) - links.foreach(l => workflowDag.add(l)) + links.foreach(l => workflowDag.add(LogicalEdge(l))) workflowDag } @@ -39,7 +49,7 @@ case class LogicalPlan( private lazy val operatorMap: Map[OperatorIdentity, LogicalOp] = operators.map(op => (op.operatorIdentifier, op)).toMap - private lazy val scalaDAG: Graph[OperatorIdentity, LogicalLink] = + private lazy val scalaDAG: Graph[OperatorIdentity, LogicalEdge] = LogicalPlan.toScalaDAG(operators, links) def getTopologicalOpIds: Iterator[OperatorIdentity] = { From 27317210c2aebe46b0558dbe40e8f172d4864c51 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Mon, 3 Mar 2025 00:54:12 -0800 Subject: [PATCH 5/8] Update PhysicalPlan.scala --- .../edu/uci/ics/amber/core/workflow/PhysicalPlan.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 6559d6ec05..5a2f5fcda7 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -38,10 +38,10 @@ case class PhysicalPlan( // the dag will be re-computed again once it reaches the coordinator. @transient lazy val dag: Graph[PhysicalOpIdentity, PhysicalEdge] = { - val jgraphtDag = Graph.empty[PhysicalOpIdentity, PhysicalEdge]() - operatorMap.foreach(op => jgraphtDag.add(op._1)) - links.foreach(l => jgraphtDag.add(l)) - jgraphtDag + val scalaDAG = Graph.empty[PhysicalOpIdentity, PhysicalEdge]() + operatorMap.foreach(op => scalaDAG.add(op._1)) + links.foreach(l => scalaDAG.add(l)) + scalaDAG } @transient lazy val maxChains: Set[Set[PhysicalLink]] = this.getMaxChains From 657f5fddde6f9be9cb6f4ab2137b71ff43734b04 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Mon, 3 Mar 2025 01:05:28 -0800 Subject: [PATCH 6/8] Update PhysicalPlan.scala --- .../scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 5a2f5fcda7..e2ccd042c5 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -271,7 +271,7 @@ case class PhysicalPlan( * * @return All the maximal chains of this physical plan, where each chain is represented as a set of links. */ - def getMaxChains: Set[Set[PhysicalLink]] = { + private def getMaxChains: Set[Set[PhysicalLink]] = { val allChains = mutable.Set[Set[PhysicalLink]]() /** From 6738e67b31a931277d55a1bc7ec232e4c0502852 Mon Sep 17 00:00:00 2001 From: Shengquan Ni Date: Mon, 3 Mar 2025 13:21:41 -0800 Subject: [PATCH 7/8] Update PhysicalPlan.scala --- .../scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index e2ccd042c5..71f3ba6310 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -285,10 +285,8 @@ case class PhysicalPlan( allChains += pathToLinks(path) } - // If current has no successors, see if we've formed a valid chain val successors = this.dag.get(current).outNeighbors.map(_.outer) - - // Otherwise, expand to each successor + successors.foreach { next => // Avoid cycles by checking if we've already visited 'next' if (!path.contains(next)) { From 3612a4ddc067224c0908695c4f491fca2d127dfa Mon Sep 17 00:00:00 2001 From: Shengquan Ni Date: Mon, 3 Mar 2025 13:22:54 -0800 Subject: [PATCH 8/8] Update PhysicalPlan.scala --- .../scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 71f3ba6310..e679ec35e6 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -227,7 +227,6 @@ case class PhysicalPlan( /** * Computes the bridges (cut-edges) in the given directed graph using Tarjan's Algorithm. * A bridge is an edge whose removal increases the number of connected components. - * This method runs in O(V + E) time complexity. * * @return A set of PhysicalLinks representing the bridges in the graph. */ @@ -286,7 +285,7 @@ case class PhysicalPlan( } val successors = this.dag.get(current).outNeighbors.map(_.outer) - + successors.foreach { next => // Avoid cycles by checking if we've already visited 'next' if (!path.contains(next)) {