Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove logical schema propagation #3177

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class ExecutionReconfigurationService(
// they are not actually performed until the workflow is resumed
def modifyOperatorLogic(modifyLogicRequest: ModifyLogicRequest): TexeraWebSocketEvent = {
val newOp = modifyLogicRequest.operator
newOp.setContext(workflow.context)
val opId = newOp.operatorIdentifier
val currentOp = workflow.logicalPlan.getOperator(opId)
val reconfiguredPhysicalOp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ package edu.uci.ics.texera.workflow

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.FileResolver
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor
import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.PortIdentity
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.util.SupplierUtil

import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.SetHasAsScala
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -48,7 +43,6 @@ object LogicalPlan {
): LogicalPlan = {
LogicalPlan(pojo.operators, pojo.links)
}

}

case class LogicalPlan(
Expand All @@ -64,22 +58,13 @@ case class LogicalPlan(

def getTopologicalOpIds: util.Iterator[OperatorIdentity] = jgraphtDag.iterator()

def getOperator(opId: String): LogicalOp = operatorMap(OperatorIdentity(opId))

def getOperator(opId: OperatorIdentity): LogicalOp = operatorMap(opId)

def getSourceOperatorIds: List[OperatorIdentity] =
operatorMap.keys.filter(op => jgraphtDag.inDegreeOf(op) == 0).toList

def getTerminalOperatorIds: List[OperatorIdentity] =
operatorMap.keys
.filter(op => jgraphtDag.outDegreeOf(op) == 0)
.toList

def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = {
jgraphtDag.getAncestors(opId).asScala.toSet
}

def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = {
jgraphtDag
.incomingEdgesOf(opId)
Expand All @@ -88,64 +73,10 @@ case class LogicalPlan(
.toList
}

def addOperator(op: LogicalOp): LogicalPlan = {
// TODO: fix schema for the new operator
this.copy(operators :+ op, links)
}

def removeOperator(opId: OperatorIdentity): LogicalPlan = {
this.copy(
operators.filter(o => o.operatorIdentifier != opId),
links.filter(l => l.fromOpId != opId && l.toOpId != opId)
)
}

def addLink(
fromOpId: OperatorIdentity,
fromPortId: PortIdentity,
toOpId: OperatorIdentity,
toPortId: PortIdentity
): LogicalPlan = {
val newLink = LogicalLink(
fromOpId,
fromPortId,
toOpId,
toPortId
)
val newLinks = links :+ newLink
this.copy(operators, newLinks)
}

def removeLink(linkToRemove: LogicalLink): LogicalPlan = {
this.copy(operators, links.filter(l => l != linkToRemove))
}

def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = {
val downstream = new mutable.ArrayBuffer[LogicalOp]
jgraphtDag
.outgoingEdgesOf(opId)
.forEach(e => downstream += operatorMap(e.toOpId))
downstream.toList
}

def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] = {
links.filter(l => l.fromOpId == opId)
}

def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] = {
links.filter(l => l.toOpId == opId)
}

def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = {
operators
.map(operator => {
operator.operatorIdentifier -> operator.operatorInfo.inputPorts.map(inputPort =>
operator.inputPortToSchemaMapping.get(inputPort.id)
)
})
.toMap
}

/**
* Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi
*
Expand Down Expand Up @@ -180,58 +111,4 @@ case class LogicalPlan(
case _ => // Skip non-ScanSourceOpDesc operators
}
}

def propagateWorkflowSchema(
context: WorkflowContext,
errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
): Unit = {

operators.foreach(operator => {
if (operator.getContext == null) {
operator.setContext(context)
}
})

// propagate output schema following topological order
val topologicalOrderIterator = jgraphtDag.iterator()
topologicalOrderIterator.forEachRemaining(opId => {
val op = getOperator(opId)
val inputSchemas: Array[Option[Schema]] = if (op.isInstanceOf[SourceOperatorDescriptor]) {
Array()
} else {
op.operatorInfo.inputPorts
.flatMap(inputPort => {
links
.filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id)
.map(link => {
val outputSchemaOpt =
getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId)
if (outputSchemaOpt.isDefined) {
op.inputPortToSchemaMapping(inputPort.id) = outputSchemaOpt.get
}
outputSchemaOpt
})
})
.toArray
}

if (!inputSchemas.contains(None)) {
Try(op.getOutputSchemas(inputSchemas.map(_.get))) match {
case Success(outputSchemas) =>
op.operatorInfo.outputPorts.foreach(outputPort =>
op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id)
)
assert(outputSchemas.length == op.operatorInfo.outputPorts.length)
case Failure(err) =>
logger.error("got error", err)
errorList match {
case Some(list) => list.append((opId, err))
case None => // Throw the error if no errorList is provided
throw err
}
}

}
})
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class WorkflowCompiler(
logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId =>
Try {
val logicalOp = logicalPlan.getOperator(logicalOpId)
logicalOp.setContext(context)

val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)
subPlan
Expand Down Expand Up @@ -165,10 +164,7 @@ class WorkflowCompiler(
// 2. resolve the file name in each scan source operator
logicalPlan.resolveScanSourceOpFileName(None)

// 3. Propagate the schema to get the input & output schemas for each port of each operator
logicalPlan.propagateWorkflowSchema(context, None)

// 4. expand the logical plan to the physical plan, and assign storage
// 3. expand the logical plan to the physical plan, and assign storage
val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)

Workflow(context, logicalPlan, physicalPlan)
Expand Down
Loading
Loading