From d85ce7aa768c21faf1f0ca114c0cb2e6e7cc2848 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 29 Dec 2024 18:11:21 -0800 Subject: [PATCH] Remove logical schema propagation (#3177) Schema propagation is now handled in the physical plan, ensuring that every port, both external and internal, has an associated schema. As a result, schema propagation in the logical plan is no longer necessary. In addition, the workflow context was previously attached to logical operators to supply user information so that schema propagation could resolve file names. This is also no longer needed: files are now resolved through an explicit call during compilation, so WorkflowContext no longer needs to be set on logical operators. --- .../ExecutionReconfigurationService.scala | 1 - .../uci/ics/texera/workflow/LogicalPlan.scala | 123 ----------- .../uci/ics/texera/workflow/LogicalPort.scala | 25 --- .../workflow/SinkInjectionTransformer.scala | 1 - .../texera/workflow/WorkflowCompiler.scala | 6 +- .../workflow/SchemaPropagationSpec.scala | 199 ------------------ .../ics/amber/compiler/WorkflowCompiler.scala | 3 +- .../amber/compiler/model/LogicalPlan.scala | 106 ---------- .../uci/ics/amber/operator/LogicalOp.scala | 49 +---- .../operator/hashJoin/HashJoinOpDesc.scala | 2 +- .../sentiment/SentimentAnalysisOpDesc.scala | 4 +- .../source/scan/ScanSourceOpDesc.scala | 5 - .../scan/csv/CSVScanSourceOpDescSpec.scala | 8 - 13 files changed, 11 insertions(+), 521 deletions(-) delete mode 100644 core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala delete mode 100644 core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala delete mode 100644 core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala index 393fd4b7433..9fa342d470a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala @@ -55,7 +55,6 @@ class ExecutionReconfigurationService( // they are not actually performed until the workflow is resumed def modifyOperatorLogic(modifyLogicRequest: ModifyLogicRequest): TexeraWebSocketEvent = { val newOp = modifyLogicRequest.operator - newOp.setContext(workflow.context) val opId = newOp.operatorIdentifier val currentOp = workflow.logicalPlan.getOperator(opId) val reconfiguredPhysicalOp = diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala index 27d2e504ddf..46bbb441cbe 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala @@ -2,19 +2,14 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.FileResolver -import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.PortIdentity import 
edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.util.SupplierUtil import java.util -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.SetHasAsScala import scala.util.{Failure, Success, Try} @@ -48,7 +43,6 @@ object LogicalPlan { ): LogicalPlan = { LogicalPlan(pojo.operators, pojo.links) } - } case class LogicalPlan( @@ -64,22 +58,13 @@ case class LogicalPlan( def getTopologicalOpIds: util.Iterator[OperatorIdentity] = jgraphtDag.iterator() - def getOperator(opId: String): LogicalOp = operatorMap(OperatorIdentity(opId)) - def getOperator(opId: OperatorIdentity): LogicalOp = operatorMap(opId) - def getSourceOperatorIds: List[OperatorIdentity] = - operatorMap.keys.filter(op => jgraphtDag.inDegreeOf(op) == 0).toList - def getTerminalOperatorIds: List[OperatorIdentity] = operatorMap.keys .filter(op => jgraphtDag.outDegreeOf(op) == 0) .toList - def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = { - jgraphtDag.getAncestors(opId).asScala.toSet - } - def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = { jgraphtDag .incomingEdgesOf(opId) @@ -88,64 +73,10 @@ case class LogicalPlan( .toList } - def addOperator(op: LogicalOp): LogicalPlan = { - // TODO: fix schema for the new operator - this.copy(operators :+ op, links) - } - - def removeOperator(opId: OperatorIdentity): LogicalPlan = { - this.copy( - operators.filter(o => o.operatorIdentifier != opId), - links.filter(l => l.fromOpId != opId && l.toOpId != opId) - ) - } - - def addLink( - fromOpId: OperatorIdentity, - fromPortId: PortIdentity, - toOpId: OperatorIdentity, - toPortId: PortIdentity - ): LogicalPlan = { - val newLink = LogicalLink( - fromOpId, - fromPortId, - toOpId, - toPortId - ) - val newLinks = links :+ newLink - this.copy(operators, newLinks) - } - - def removeLink(linkToRemove: LogicalLink): LogicalPlan = { - this.copy(operators, links.filter(l => l != linkToRemove)) - } - - def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - val downstream = new mutable.ArrayBuffer[LogicalOp] - jgraphtDag - .outgoingEdgesOf(opId) - .forEach(e => downstream += operatorMap(e.toOpId)) - downstream.toList - } - - def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { - links.filter(l => l.fromOpId == opId) - } - def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { links.filter(l => l.toOpId == opId) } - def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = { - operators - .map(operator => { - operator.operatorIdentifier -> operator.operatorInfo.inputPorts.map(inputPort => - operator.inputPortToSchemaMapping.get(inputPort.id) - ) - }) - .toMap - } - /** * Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi * @@ -180,58 +111,4 @@ case class LogicalPlan( case _ => // Skip non-ScanSourceOpDesc operators } } - - def propagateWorkflowSchema( - context: WorkflowContext, - errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] - ): Unit = { - - operators.foreach(operator => { - if (operator.getContext == null) { - operator.setContext(context) - } - }) - - // propagate output schema following topological order - val topologicalOrderIterator = jgraphtDag.iterator() - topologicalOrderIterator.forEachRemaining(opId => { - val op = getOperator(opId) - val inputSchemas: Array[Option[Schema]] = if (op.isInstanceOf[SourceOperatorDescriptor]) { - Array() - } else { - 
op.operatorInfo.inputPorts - .flatMap(inputPort => { - links - .filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id) - .map(link => { - val outputSchemaOpt = - getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId) - if (outputSchemaOpt.isDefined) { - op.inputPortToSchemaMapping(inputPort.id) = outputSchemaOpt.get - } - outputSchemaOpt - }) - }) - .toArray - } - - if (!inputSchemas.contains(None)) { - Try(op.getOutputSchemas(inputSchemas.map(_.get))) match { - case Success(outputSchemas) => - op.operatorInfo.outputPorts.foreach(outputPort => - op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id) - ) - assert(outputSchemas.length == op.operatorInfo.outputPorts.length) - case Failure(err) => - logger.error("got error", err) - errorList match { - case Some(list) => list.append((opId, err)) - case None => // Throw the error if no errorList is provided - throw err - } - } - - } - }) - } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala deleted file mode 100644 index c33f29aa729..00000000000 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala +++ /dev/null @@ -1,25 +0,0 @@ -package edu.uci.ics.texera.workflow - -import edu.uci.ics.amber.virtualidentity.OperatorIdentity - -case object LogicalPort { - def apply(operatorIdentity: OperatorIdentity, portOrdinal: Integer): LogicalPort = { - LogicalPort(operatorIdentity.id, portOrdinal) - } - - def apply( - operatorIdentity: OperatorIdentity, - portOrdinal: Integer, - portName: String - ): LogicalPort = { - LogicalPort(operatorIdentity.id, portOrdinal, portName) - } -} - -case class LogicalPort( - operatorID: String, - portOrdinal: Integer = 0, - portName: String = "" -) { - def operatorId: OperatorIdentity = OperatorIdentity(operatorID) -} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala deleted file mode 100644 index 8b137891791..00000000000 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala +++ /dev/null @@ -1 +0,0 @@ - diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index da13af7e53a..615cc937dbf 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -37,7 +37,6 @@ class WorkflowCompiler( logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { val logicalOp = logicalPlan.getOperator(logicalOpId) - logicalOp.setContext(context) val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId) subPlan @@ -165,10 +164,7 @@ class WorkflowCompiler( // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(None) - // 3. Propagate the schema to get the input & output schemas for each port of each operator - logicalPlan.propagateWorkflowSchema(context, None) - - // 4. expand the logical plan to the physical plan, and assign storage + // 3. 
expand the logical plan to the physical plan, and assign storage val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) Workflow(context, logicalPlan, physicalPlan) diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala deleted file mode 100644 index cb77ae266aa..00000000000 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala +++ /dev/null @@ -1,199 +0,0 @@ -package edu.uci.ics.texera.workflow - -import edu.uci.ics.amber.core.tuple.{AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, WorkflowContext} -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.metadata.OperatorInfo -import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} -import org.apache.arrow.util.Preconditions -import org.scalatest.BeforeAndAfter -import org.scalatest.flatspec.AnyFlatSpec - -class SchemaPropagationSpec extends AnyFlatSpec with BeforeAndAfter { - - private abstract class TempTestSourceOpDesc extends SourceOperatorDescriptor { - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo("", "", "", List(InputPort()), List(OutputPort())) - } - - private class TempTestSinkOpDesc extends LogicalOp { - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo("", "", "", List(InputPort()), List(OutputPort())) - - override def getOutputSchema(schemas: Array[Schema]): Schema = { - Preconditions.checkArgument(schemas.length == 1) - schemas(0) - } - } - - it should "propagate workflow schema with multiple input and output ports" in { - // build the following workflow DAG: - // trainingData ---\ /----> mlVizSink - // testingData ----> mlTrainingOp--< - // inferenceData ---------------------> mlInferenceOp --> inferenceSink - - val dataSchema = Schema.builder().add("dataCol", AttributeType.INTEGER).build() - val trainingScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("trainingScan") - - override def sourceSchema(): Schema = dataSchema - } - - val testingScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("testingScan") - - override def sourceSchema(): Schema = dataSchema - } - - val inferenceScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("inferenceScan") - - override def sourceSchema(): Schema = dataSchema - } - - val mlModelSchema = Schema.builder().add("model", AttributeType.STRING).build() - val mlVizSchema = Schema.builder().add("visualization", AttributeType.STRING).build() - - val mlTrainingOp = new LogicalOp() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlTrainingOp") - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? 
- - override def operatorInfo: OperatorInfo = - OperatorInfo( - "", - "", - "", - List( - InputPort(displayName = "training"), - InputPort(PortIdentity(0), displayName = "testing") - ), - List( - OutputPort(displayName = "visualization"), - OutputPort(PortIdentity(1), displayName = "model") - ) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = ??? - - override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] = { - Preconditions.checkArgument(schemas.length == 2) - Preconditions.checkArgument(schemas.distinct.length == 1) - Array(mlVizSchema, mlModelSchema) - } - } - - val mlInferOp = new LogicalOp() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlInferOp") - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo( - "", - "", - "", - List(InputPort(displayName = "model"), InputPort(PortIdentity(1), displayName = "data")), - List(OutputPort(displayName = "data")) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = ??? - - override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] = { - Preconditions.checkArgument(schemas.length == 2) - Array(schemas(1)) - } - } - - val mlVizSink = new TempTestSinkOpDesc { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlVizSink") - } - - val inferenceSink = new TempTestSinkOpDesc { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("inferenceSink") - } - - val operators = List( - trainingScan, - testingScan, - inferenceScan, - mlTrainingOp, - mlInferOp, - mlVizSink, - inferenceSink - ) - - val links = List( - LogicalLink( - trainingScan.operatorIdentifier, - PortIdentity(), - mlTrainingOp.operatorIdentifier, - PortIdentity() - ), - LogicalLink( - testingScan.operatorIdentifier, - PortIdentity(), - mlTrainingOp.operatorIdentifier, - PortIdentity(1) - ), - LogicalLink( - inferenceScan.operatorIdentifier, - PortIdentity(), - mlInferOp.operatorIdentifier, - PortIdentity(1) - ), - LogicalLink( - mlTrainingOp.operatorIdentifier, - PortIdentity(), - mlVizSink.operatorIdentifier, - PortIdentity(0) - ), - LogicalLink( - mlTrainingOp.operatorIdentifier, - PortIdentity(1), - mlInferOp.operatorIdentifier, - PortIdentity() - ), - LogicalLink( - mlInferOp.operatorIdentifier, - PortIdentity(), - inferenceSink.operatorIdentifier, - PortIdentity() - ) - ) - - val ctx = new WorkflowContext() - val logicalPlan = LogicalPlan(operators, links) - logicalPlan.propagateWorkflowSchema(ctx, None) - val schemaResult = logicalPlan.getInputSchemaMap - - assert(schemaResult(mlTrainingOp.operatorIdentifier).head.get.equals(dataSchema)) - assert(schemaResult(mlTrainingOp.operatorIdentifier)(1).get.equals(dataSchema)) - - assert(schemaResult(mlInferOp.operatorIdentifier).head.get.equals(mlModelSchema)) - assert(schemaResult(mlInferOp.operatorIdentifier)(1).get.equals(dataSchema)) - - assert(schemaResult(mlVizSink.operatorIdentifier).head.get.equals(mlVizSchema)) - assert(schemaResult(inferenceSink.operatorIdentifier).head.get.equals(dataSchema)) - - } - -} diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala index 8a23c681ef7..4a50f33f806 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala +++ 
b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala @@ -110,7 +110,6 @@ class WorkflowCompiler( logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { val logicalOp = logicalPlan.getOperator(logicalOpId) - logicalOp.setContext(context) val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId) subPlan @@ -166,7 +165,7 @@ class WorkflowCompiler( // 1. convert the pojo to logical plan val logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) - // - resolve the file name in each scan source operator + // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(Some(errorList)) // 3. expand the logical plan to the physical plan diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala index c1a23c00ace..8b599176cd7 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala @@ -67,38 +67,11 @@ case class LogicalPlan( def getOperator(opId: OperatorIdentity): LogicalOp = operatorMap(opId) - def getSourceOperatorIds: List[OperatorIdentity] = - operatorMap.keys.filter(op => jgraphtDag.inDegreeOf(op) == 0).toList - - def getTerminalOperatorIds: List[OperatorIdentity] = - operatorMap.keys - .filter(op => jgraphtDag.outDegreeOf(op) == 0) - .toList - - def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = { - jgraphtDag.getAncestors(opId).asScala.toSet - } - - def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - jgraphtDag - .incomingEdgesOf(opId) - .asScala - .map(e => operatorMap(e.fromOpId)) - .toList - } - def addOperator(op: LogicalOp): LogicalPlan = { // TODO: fix schema for the new operator this.copy(operators :+ op, links) } - def removeOperator(opId: OperatorIdentity): LogicalPlan = { - this.copy( - operators.filter(o => o.operatorIdentifier != opId), - links.filter(l => l.fromOpId != opId && l.toOpId != opId) - ) - } - def addLink( fromOpId: OperatorIdentity, fromPortId: PortIdentity, @@ -115,36 +88,10 @@ case class LogicalPlan( this.copy(operators, newLinks) } - def removeLink(linkToRemove: LogicalLink): LogicalPlan = { - this.copy(operators, links.filter(l => l != linkToRemove)) - } - - def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - val downstream = new mutable.ArrayBuffer[LogicalOp] - jgraphtDag - .outgoingEdgesOf(opId) - .forEach(e => downstream += operatorMap(e.toOpId)) - downstream.toList - } - - def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { - links.filter(l => l.fromOpId == opId) - } - def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { links.filter(l => l.toOpId == opId) } - def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = { - operators - .map(operator => { - operator.operatorIdentifier -> operator.operatorInfo.inputPorts.map(inputPort => - operator.inputPortToSchemaMapping.get(inputPort.id) - ) - }) - .toMap - } - /** * Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi * @param errorList if given, put errors during resolving to it @@ -170,57 +117,4 @@ case class LogicalPlan( case _ => // Skip non-ScanSourceOpDesc operators } } - - def propagateWorkflowSchema( - context: WorkflowContext, - 
errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] - ): Unit = { - - operators.foreach(operator => { - if (operator.getContext == null) { - operator.setContext(context) - } - }) - - // propagate output schema following topological order - val topologicalOrderIterator = jgraphtDag.iterator() - topologicalOrderIterator.forEachRemaining(opId => { - val op = getOperator(opId) - val inputSchemas: Array[Option[Schema]] = if (op.isInstanceOf[SourceOperatorDescriptor]) { - Array() - } else { - op.operatorInfo.inputPorts - .flatMap(inputPort => { - links - .filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id) - .map(link => { - val outputSchemaOpt = - getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId) - if (outputSchemaOpt.isDefined) { - op.inputPortToSchemaMapping(inputPort.id) = outputSchemaOpt.get - } - outputSchemaOpt - }) - }) - .toArray - } - - if (!inputSchemas.contains(None)) { - Try(op.getOutputSchemas(inputSchemas.map(_.get))) match { - case Success(outputSchemas) => - op.operatorInfo.outputPorts.foreach(outputPort => - op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id) - ) - assert(outputSchemas.length == op.operatorInfo.outputPorts.length) - case Failure(err) => - logger.error("got error", err) - errorList match { - case Some(list) => list.append((opId, err)) - case None => - } - } - - } - }) - } } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 925fc5314a0..e374fac80c7 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.executor.OperatorExecutor import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan} import edu.uci.ics.amber.operator.aggregate.AggregateOpDesc import edu.uci.ics.amber.operator.cartesianProduct.CartesianProductOpDesc import edu.uci.ics.amber.operator.dictionary.DictionaryMatcherOpDesc @@ -37,35 +37,7 @@ import edu.uci.ics.amber.operator.randomksampling.RandomKSamplingOpDesc import edu.uci.ics.amber.operator.regex.RegexOpDesc import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.amber.operator.sentiment.SentimentAnalysisOpDesc -import edu.uci.ics.amber.operator.sklearn.{ - SklearnAdaptiveBoostingOpDesc, - SklearnBaggingOpDesc, - SklearnBernoulliNaiveBayesOpDesc, - SklearnComplementNaiveBayesOpDesc, - SklearnDecisionTreeOpDesc, - SklearnDummyClassifierOpDesc, - SklearnExtraTreeOpDesc, - SklearnExtraTreesOpDesc, - SklearnGaussianNaiveBayesOpDesc, - SklearnGradientBoostingOpDesc, - SklearnKNNOpDesc, - SklearnLinearRegressionOpDesc, - SklearnLinearSVMOpDesc, - SklearnLogisticRegressionCVOpDesc, - SklearnLogisticRegressionOpDesc, - SklearnMultiLayerPerceptronOpDesc, - SklearnMultinomialNaiveBayesOpDesc, - SklearnNearestCentroidOpDesc, - SklearnPassiveAggressiveOpDesc, - SklearnPerceptronOpDesc, - SklearnPredictionOpDesc, - SklearnProbabilityCalibrationOpDesc, - SklearnRandomForestOpDesc, - SklearnRidgeCVOpDesc, - SklearnRidgeOpDesc, - SklearnSDGOpDesc, - SklearnSVMOpDesc -} +import 
edu.uci.ics.amber.operator.sklearn._ import edu.uci.ics.amber.operator.sort.SortOpDesc import edu.uci.ics.amber.operator.sortPartitions.SortPartitionsOpDesc import edu.uci.ics.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc @@ -75,6 +47,7 @@ import edu.uci.ics.amber.operator.source.apis.twitter.v2.{ } import edu.uci.ics.amber.operator.source.fetcher.URLFetcherOpDesc import edu.uci.ics.amber.operator.source.scan.FileScanSourceOpDesc +import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.json.JSONLScanSourceOpDesc @@ -121,7 +94,6 @@ import edu.uci.ics.amber.operator.visualization.ternaryPlot.TernaryPlotOpDesc import edu.uci.ics.amber.operator.visualization.urlviz.UrlVizOpDesc import edu.uci.ics.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import edu.uci.ics.amber.operator.visualization.wordCloud.WordCloudOpDesc -import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.PortIdentity import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} @@ -306,14 +278,11 @@ trait StateTransferFunc ) abstract class LogicalOp extends PortDescriptor with Serializable { - @JsonIgnore - private var context: WorkflowContext = _ - @JsonProperty(PropertyNameConstants.OPERATOR_ID) private var operatorId: String = getClass.getSimpleName + "-" + UUID.randomUUID.toString @JsonProperty(PropertyNameConstants.OPERATOR_VERSION) - var operatorVersion: String = getOperatorVersion() + var operatorVersion: String = getOperatorVersion @JsonIgnore val inputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() @@ -334,7 +303,7 @@ abstract class LogicalOp extends PortDescriptor with Serializable { workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalPlan = { - new PhysicalPlan( + PhysicalPlan( operators = Set(getPhysicalOp(workflowId, executionId)), links = Set.empty ) @@ -344,7 +313,7 @@ abstract class LogicalOp extends PortDescriptor with Serializable { def getOutputSchema(schemas: Array[Schema]): Schema - private def getOperatorVersion(): String = { + private def getOperatorVersion: String = { val path = "core/amber/src/main/scala/" val operatorPath = path + this.getClass.getPackage.getName.replace(".", "/") OPVersion.getVersion(this.getClass.getSimpleName, operatorPath) @@ -361,12 +330,6 @@ abstract class LogicalOp extends PortDescriptor with Serializable { override def toString: String = ToStringBuilder.reflectionToString(this) - def getContext: WorkflowContext = this.context - - def setContext(workflowContext: WorkflowContext): Unit = { - this.context = workflowContext - } - def setOperatorId(id: String): Unit = { operatorId = id } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala index 3bb144b70a8..c40d736d135 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala @@ -121,7 +121,7 @@ class HashJoinOpDesc[K] extends LogicalOp { ) ) - new PhysicalPlan( + PhysicalPlan( 
operators = Set(buildPhysicalOp, probePhysicalOp), links = Set( PhysicalLink( diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala index a77c67b9143..27a988d9f37 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala @@ -57,8 +57,8 @@ class SentimentAnalysisOpDesc extends MapOpDesc { ) } - override def operatorInfo = - new OperatorInfo( + override def operatorInfo: OperatorInfo = + OperatorInfo( "Sentiment Analysis", "analysis the sentiment of a text using machine learning", OperatorGroupConstants.MACHINE_LEARNING_GROUP, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala index cb779ce96ee..80aee74cf63 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonPropertyD import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor import edu.uci.ics.amber.workflow.OutputPort @@ -54,10 +53,6 @@ abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { inferSchema() } - override def setContext(workflowContext: WorkflowContext): Unit = { - super.setContext(workflowContext) - } - override def operatorInfo: OperatorInfo = { OperatorInfo( userFriendlyName = s"${fileTypeName.getOrElse("Unknown")} File Scan", diff --git a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala index 5d4be121d08..06dd412b8b9 100644 --- a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala +++ b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala @@ -2,7 +2,6 @@ package edu.uci.ics.amber.operator.source.scan.csv import edu.uci.ics.amber.core.storage.FileResolver import edu.uci.ics.amber.core.tuple.{AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.workflow.PortIdentity @@ -11,7 +10,6 @@ import org.scalatest.flatspec.AnyFlatSpec class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { - val workflowContext = new WorkflowContext() var csvScanSourceOpDesc: CSVScanSourceOpDesc = _ var parallelCsvScanSourceOpDesc: ParallelCSVScanSourceOpDesc = _ before { @@ -28,7 +26,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { 
parallelCsvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesSmallCsvPath) parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = true - parallelCsvScanSourceOpDesc.setContext(workflowContext) parallelCsvScanSourceOpDesc.setFileUri( FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) ) @@ -45,7 +42,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesHeaderlessSmallCsvPath) parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = false - parallelCsvScanSourceOpDesc.setContext(workflowContext) parallelCsvScanSourceOpDesc.setFileUri( FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) ) @@ -62,7 +58,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesSmallMultiLineCsvPath) csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = true - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -77,7 +72,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesHeaderlessSmallCsvPath) csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -93,7 +87,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { Some(TestOperators.CountrySalesSmallMultiLineCustomDelimiterCsvPath) csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -109,7 +102,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { Some(TestOperators.CountrySalesSmallMultiLineCustomDelimiterCsvPath) csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) assert(