Skip to content

Commit

Permalink
Remove logical schema propagation (#3177)
Browse files Browse the repository at this point in the history
Schema propagation is now handled in the physical plan to ensure all
ports, both external and internal, have an associated schema. As a
result, schema propagation in the logical plan is no longer necessary.

In addition, the workflow context was previously used to supply user
information so that schema propagation could resolve file names. This is
also no longer needed, because file names are now resolved through an
explicit call as a step of compilation. As a result, WorkflowContext no
longer needs to be set on each logical operator.
  • Loading branch information
Yicong-Huang authored Dec 30, 2024
1 parent 5267fec commit d85ce7a
Show file tree
Hide file tree
Showing 13 changed files with 11 additions and 521 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class ExecutionReconfigurationService(
// they are not actually performed until the workflow is resumed
def modifyOperatorLogic(modifyLogicRequest: ModifyLogicRequest): TexeraWebSocketEvent = {
val newOp = modifyLogicRequest.operator
newOp.setContext(workflow.context)
val opId = newOp.operatorIdentifier
val currentOp = workflow.logicalPlan.getOperator(opId)
val reconfiguredPhysicalOp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ package edu.uci.ics.texera.workflow

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.FileResolver
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor
import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.PortIdentity
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.util.SupplierUtil

import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.SetHasAsScala
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -48,7 +43,6 @@ object LogicalPlan {
): LogicalPlan = {
LogicalPlan(pojo.operators, pojo.links)
}

}

case class LogicalPlan(
Expand All @@ -64,22 +58,13 @@ case class LogicalPlan(

/**
 * Returns the iterator produced by the underlying DAG over its operator ids.
 * The plan relies on this iterator to visit operators in topological order.
 */
def getTopologicalOpIds: util.Iterator[OperatorIdentity] = {
  jgraphtDag.iterator()
}

/**
 * Looks up an operator by the raw string form of its id.
 * The string is wrapped into an [[OperatorIdentity]] before the lookup.
 */
def getOperator(opId: String): LogicalOp = getOperator(OperatorIdentity(opId))

/**
 * Looks up an operator by its identity in `operatorMap`.
 * NOTE(review): presumably fails for an unknown id, depending on the type of
 * `operatorMap` (not visible here) -- confirm.
 */
def getOperator(opId: OperatorIdentity): LogicalOp = {
  operatorMap(opId)
}

/** Ids of the operators that have no incoming edge in the DAG. */
def getSourceOperatorIds: List[OperatorIdentity] = {
  val hasNoIncomingEdge = (opId: OperatorIdentity) => jgraphtDag.inDegreeOf(opId) == 0
  operatorMap.keys.filter(hasNoIncomingEdge).toList
}

/** Ids of the operators that have no outgoing edge in the DAG. */
def getTerminalOperatorIds: List[OperatorIdentity] = {
  val hasNoOutgoingEdge = (opId: OperatorIdentity) => jgraphtDag.outDegreeOf(opId) == 0
  operatorMap.keys.filter(hasNoOutgoingEdge).toList
}

/**
 * Collects the ids reported by the DAG as ancestors of the given operator
 * and returns them as an immutable Scala set.
 *
 * @param opId the operator whose ancestors are requested
 */
def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = {
  val ancestors = jgraphtDag.getAncestors(opId)
  ancestors.asScala.toSet
}

def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = {
jgraphtDag
.incomingEdgesOf(opId)
Expand All @@ -88,64 +73,10 @@ case class LogicalPlan(
.toList
}

/**
 * Returns a copy of this plan with `op` appended to the operator list.
 * The links are carried over unchanged; this plan itself is not modified.
 *
 * @param op the operator to append
 */
def addOperator(op: LogicalOp): LogicalPlan = {
  // TODO: fix schema for the new operator
  val extendedOperators = operators :+ op
  copy(extendedOperators, links)
}

/**
 * Returns a copy of this plan without the operator identified by `opId`
 * and without any link that starts from or ends at that operator.
 *
 * @param opId identity of the operator to drop
 */
def removeOperator(opId: OperatorIdentity): LogicalPlan = {
  val remainingOperators = operators.filterNot(_.operatorIdentifier == opId)
  // a link is dropped as soon as either of its endpoints is the removed operator
  val remainingLinks = links.filterNot(link => link.fromOpId == opId || link.toOpId == opId)
  copy(remainingOperators, remainingLinks)
}

/**
 * Returns a copy of this plan with one more link appended, connecting the
 * given output port of `fromOpId` to the given input port of `toOpId`.
 * The operator list is carried over unchanged.
 */
def addLink(
    fromOpId: OperatorIdentity,
    fromPortId: PortIdentity,
    toOpId: OperatorIdentity,
    toPortId: PortIdentity
): LogicalPlan = {
  copy(operators, links :+ LogicalLink(fromOpId, fromPortId, toOpId, toPortId))
}

/**
 * Returns a copy of this plan in which every link equal to `linkToRemove`
 * has been dropped. The operator list is carried over unchanged.
 */
def removeLink(linkToRemove: LogicalLink): LogicalPlan = {
  val keptLinks = links.filterNot(_ == linkToRemove)
  copy(operators, keptLinks)
}

/**
 * Returns the operators at the target end of every outgoing DAG edge of `opId`,
 * one entry per edge.
 *
 * @param opId the operator whose direct successors are requested
 */
def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = {
  // convert to a List before mapping so that one entry per edge is kept,
  // even when several edges lead to the same operator
  val outgoingEdges = jgraphtDag.outgoingEdgesOf(opId).asScala.toList
  outgoingEdges.map(edge => operatorMap(edge.toOpId))
}

/** All links of this plan whose source operator is `opId`. */
def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] =
  links.filter(_.fromOpId == opId)

/** All links of this plan whose target operator is `opId`. */
def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] =
  links.filter(_.toOpId == opId)

/**
 * Builds, for every operator, the list of schemas currently recorded for its
 * input ports, in the order of `operatorInfo.inputPorts`. A port without a
 * recorded schema contributes `None`.
 */
def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = {
  operators.map { op =>
    val schemaPerInputPort =
      op.operatorInfo.inputPorts.map(port => op.inputPortToSchemaMapping.get(port.id))
    op.operatorIdentifier -> schemaPerInputPort
  }.toMap
}

/**
* Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi
*
Expand Down Expand Up @@ -180,58 +111,4 @@ case class LogicalPlan(
case _ => // Skip non-ScanSourceOpDesc operators
}
}

/**
 * Propagates schemas through the plan, visiting operators in the order given
 * by the DAG iterator.
 *
 * For every operator the schemas of its input ports are read from the output
 * port mappings of the linked upstream operators and recorded in the
 * operator's `inputPortToSchemaMapping`. When every linked input has a schema,
 * the operator's output schemas are computed and recorded in its
 * `outputPortToSchemaMapping`; otherwise the operator is skipped.
 *
 * @param context   workflow context; set on every operator that has none yet
 * @param errorList when defined, failures of `getOutputSchemas` are appended to
 *                  it together with the operator id; when `None`, the first
 *                  such failure is rethrown
 */
def propagateWorkflowSchema(
    context: WorkflowContext,
    errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
): Unit = {

  // only operators without a context receive the given one
  operators.foreach { operator =>
    if (operator.getContext == null) {
      operator.setContext(context)
    }
  }

  // propagate output schema following topological order
  jgraphtDag.iterator().forEachRemaining { opId =>
    val op = getOperator(opId)
    val inputSchemas: Array[Option[Schema]] = op match {
      // source operators take no input schemas
      case _: SourceOperatorDescriptor => Array()
      case _ =>
        op.operatorInfo.inputPorts
          .flatMap { inputPort =>
            links
              .filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id)
              .map { link =>
                val outputSchemaOpt =
                  getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId)
                // remember the upstream schema on this operator's input port, if known
                outputSchemaOpt.foreach { schema =>
                  op.inputPortToSchemaMapping(inputPort.id) = schema
                }
                outputSchemaOpt
              }
          }
          .toArray
    }

    // skip the operator while any linked input schema is still unknown
    if (inputSchemas.forall(_.isDefined)) {
      Try(op.getOutputSchemas(inputSchemas.flatten)) match {
        case Success(outputSchemas) =>
          // check the arity before indexing, so that a mismatch is reported by
          // this assertion rather than by an out-of-bounds access below
          assert(outputSchemas.length == op.operatorInfo.outputPorts.length)
          op.operatorInfo.outputPorts.foreach { outputPort =>
            op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id)
          }
        case Failure(err) =>
          logger.error("got error", err)
          errorList match {
            case Some(list) => list.append((opId, err))
            case None => // Throw the error if no errorList is provided
              throw err
          }
      }
    }
  }
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class WorkflowCompiler(
logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId =>
Try {
val logicalOp = logicalPlan.getOperator(logicalOpId)
logicalOp.setContext(context)

val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)
subPlan
Expand Down Expand Up @@ -165,10 +164,7 @@ class WorkflowCompiler(
// 2. resolve the file name in each scan source operator
logicalPlan.resolveScanSourceOpFileName(None)

// 3. Propagate the schema to get the input & output schemas for each port of each operator
logicalPlan.propagateWorkflowSchema(context, None)

// 4. expand the logical plan to the physical plan, and assign storage
// 3. expand the logical plan to the physical plan, and assign storage
val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)

Workflow(context, logicalPlan, physicalPlan)
Expand Down
Loading

0 comments on commit d85ce7a

Please sign in to comment.