diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala index 393fd4b7433..9fa342d470a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionReconfigurationService.scala @@ -55,7 +55,6 @@ class ExecutionReconfigurationService( // they are not actually performed until the workflow is resumed def modifyOperatorLogic(modifyLogicRequest: ModifyLogicRequest): TexeraWebSocketEvent = { val newOp = modifyLogicRequest.operator - newOp.setContext(workflow.context) val opId = newOp.operatorIdentifier val currentOp = workflow.logicalPlan.getOperator(opId) val reconfiguredPhysicalOp = diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala index 27d2e504ddf..46bbb441cbe 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPlan.scala @@ -2,19 +2,14 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.FileResolver -import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor import edu.uci.ics.amber.operator.source.scan.ScanSourceOpDesc import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.PortIdentity import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.util.SupplierUtil import java.util -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.SetHasAsScala import scala.util.{Failure, Success, Try} @@ -48,7 +43,6 @@ object LogicalPlan { ): LogicalPlan = { LogicalPlan(pojo.operators, pojo.links) } - } case class LogicalPlan( @@ -64,22 +58,13 @@ case class LogicalPlan( def getTopologicalOpIds: util.Iterator[OperatorIdentity] = jgraphtDag.iterator() - def getOperator(opId: String): LogicalOp = operatorMap(OperatorIdentity(opId)) - def getOperator(opId: OperatorIdentity): LogicalOp = operatorMap(opId) - def getSourceOperatorIds: List[OperatorIdentity] = - operatorMap.keys.filter(op => jgraphtDag.inDegreeOf(op) == 0).toList - def getTerminalOperatorIds: List[OperatorIdentity] = operatorMap.keys .filter(op => jgraphtDag.outDegreeOf(op) == 0) .toList - def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = { - jgraphtDag.getAncestors(opId).asScala.toSet - } - def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = { jgraphtDag .incomingEdgesOf(opId) @@ -88,64 +73,10 @@ case class LogicalPlan( .toList } - def addOperator(op: LogicalOp): LogicalPlan = { - // TODO: fix schema for the new operator - this.copy(operators :+ op, links) - } - - def removeOperator(opId: OperatorIdentity): LogicalPlan = { - this.copy( - operators.filter(o => o.operatorIdentifier != opId), - links.filter(l => l.fromOpId != opId && l.toOpId != opId) - ) - } - - def addLink( - fromOpId: OperatorIdentity, - fromPortId: PortIdentity, - toOpId: OperatorIdentity, - toPortId: PortIdentity - ): LogicalPlan = { - val newLink = LogicalLink( - fromOpId, - fromPortId, - toOpId, - toPortId - ) - val newLinks = links :+ newLink - this.copy(operators, newLinks) - } - - def removeLink(linkToRemove: LogicalLink): LogicalPlan = { - this.copy(operators, links.filter(l => l != linkToRemove)) - } - - def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - val downstream = new mutable.ArrayBuffer[LogicalOp] - jgraphtDag - .outgoingEdgesOf(opId) - .forEach(e => downstream += operatorMap(e.toOpId)) - downstream.toList - } - - def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { - links.filter(l => l.fromOpId == opId) - } - def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { links.filter(l => l.toOpId == opId) } - def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = { - operators - .map(operator => { - operator.operatorIdentifier -> operator.operatorInfo.inputPorts.map(inputPort => - operator.inputPortToSchemaMapping.get(inputPort.id) - ) - }) - .toMap - } - /** * Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi * @@ -180,58 +111,4 @@ case class LogicalPlan( case _ => // Skip non-ScanSourceOpDesc operators } } - - def propagateWorkflowSchema( - context: WorkflowContext, - errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] - ): Unit = { - - operators.foreach(operator => { - if (operator.getContext == null) { - operator.setContext(context) - } - }) - - // propagate output schema following topological order - val topologicalOrderIterator = jgraphtDag.iterator() - topologicalOrderIterator.forEachRemaining(opId => { - val op = getOperator(opId) - val inputSchemas: Array[Option[Schema]] = if (op.isInstanceOf[SourceOperatorDescriptor]) { - Array() - } else { - op.operatorInfo.inputPorts - .flatMap(inputPort => { - links - .filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id) - .map(link => { - val outputSchemaOpt = - getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId) - if (outputSchemaOpt.isDefined) { - op.inputPortToSchemaMapping(inputPort.id) = outputSchemaOpt.get - } - outputSchemaOpt - }) - }) - .toArray - } - - if (!inputSchemas.contains(None)) { - Try(op.getOutputSchemas(inputSchemas.map(_.get))) match { - case Success(outputSchemas) => - op.operatorInfo.outputPorts.foreach(outputPort => - op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id) - ) - assert(outputSchemas.length == op.operatorInfo.outputPorts.length) - case Failure(err) => - logger.error("got error", err) - errorList match { - case Some(list) => list.append((opId, err)) - case None => // Throw the error if no errorList is provided - throw err - } - } - - } - }) - } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala deleted file mode 100644 index c33f29aa729..00000000000 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/LogicalPort.scala +++ /dev/null @@ -1,25 +0,0 @@ -package edu.uci.ics.texera.workflow - -import edu.uci.ics.amber.virtualidentity.OperatorIdentity - -case object LogicalPort { - def apply(operatorIdentity: OperatorIdentity, portOrdinal: Integer): LogicalPort = { - LogicalPort(operatorIdentity.id, portOrdinal) - } - - def apply( - operatorIdentity: OperatorIdentity, - portOrdinal: Integer, - portName: String - ): LogicalPort = { - LogicalPort(operatorIdentity.id, portOrdinal, portName) - } -} - -case class LogicalPort( - operatorID: String, - portOrdinal: Integer = 0, - portName: String = "" -) { - def operatorId: OperatorIdentity = OperatorIdentity(operatorID) -} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala deleted file mode 100644 index 8b137891791..00000000000 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala +++ /dev/null @@ -1 +0,0 @@ - diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index da13af7e53a..615cc937dbf 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -37,7 +37,6 @@ class WorkflowCompiler( logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { val logicalOp = logicalPlan.getOperator(logicalOpId) - logicalOp.setContext(context) val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId) subPlan @@ -165,10 +164,7 @@ class WorkflowCompiler( // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(None) - // 3. Propagate the schema to get the input & output schemas for each port of each operator - logicalPlan.propagateWorkflowSchema(context, None) - - // 4. expand the logical plan to the physical plan, and assign storage + // 3. expand the logical plan to the physical plan, and assign storage val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) Workflow(context, logicalPlan, physicalPlan) diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala deleted file mode 100644 index cb77ae266aa..00000000000 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala +++ /dev/null @@ -1,199 +0,0 @@ -package edu.uci.ics.texera.workflow - -import edu.uci.ics.amber.core.tuple.{AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, WorkflowContext} -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.metadata.OperatorInfo -import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} -import org.apache.arrow.util.Preconditions -import org.scalatest.BeforeAndAfter -import org.scalatest.flatspec.AnyFlatSpec - -class SchemaPropagationSpec extends AnyFlatSpec with BeforeAndAfter { - - private abstract class TempTestSourceOpDesc extends SourceOperatorDescriptor { - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo("", "", "", List(InputPort()), List(OutputPort())) - } - - private class TempTestSinkOpDesc extends LogicalOp { - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo("", "", "", List(InputPort()), List(OutputPort())) - - override def getOutputSchema(schemas: Array[Schema]): Schema = { - Preconditions.checkArgument(schemas.length == 1) - schemas(0) - } - } - - it should "propagate workflow schema with multiple input and output ports" in { - // build the following workflow DAG: - // trainingData ---\ /----> mlVizSink - // testingData ----> mlTrainingOp--< - // inferenceData ---------------------> mlInferenceOp --> inferenceSink - - val dataSchema = Schema.builder().add("dataCol", AttributeType.INTEGER).build() - val trainingScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("trainingScan") - - override def sourceSchema(): Schema = dataSchema - } - - val testingScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("testingScan") - - override def sourceSchema(): Schema = dataSchema - } - - val inferenceScan = new TempTestSourceOpDesc() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("inferenceScan") - - override def sourceSchema(): Schema = dataSchema - } - - val mlModelSchema = Schema.builder().add("model", AttributeType.STRING).build() - val mlVizSchema = Schema.builder().add("visualization", AttributeType.STRING).build() - - val mlTrainingOp = new LogicalOp() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlTrainingOp") - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo( - "", - "", - "", - List( - InputPort(displayName = "training"), - InputPort(PortIdentity(0), displayName = "testing") - ), - List( - OutputPort(displayName = "visualization"), - OutputPort(PortIdentity(1), displayName = "model") - ) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = ??? - - override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] = { - Preconditions.checkArgument(schemas.length == 2) - Preconditions.checkArgument(schemas.distinct.length == 1) - Array(mlVizSchema, mlModelSchema) - } - } - - val mlInferOp = new LogicalOp() { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlInferOp") - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = ??? - - override def operatorInfo: OperatorInfo = - OperatorInfo( - "", - "", - "", - List(InputPort(displayName = "model"), InputPort(PortIdentity(1), displayName = "data")), - List(OutputPort(displayName = "data")) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = ??? - - override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] = { - Preconditions.checkArgument(schemas.length == 2) - Array(schemas(1)) - } - } - - val mlVizSink = new TempTestSinkOpDesc { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("mlVizSink") - } - - val inferenceSink = new TempTestSinkOpDesc { - override def operatorIdentifier: OperatorIdentity = OperatorIdentity("inferenceSink") - } - - val operators = List( - trainingScan, - testingScan, - inferenceScan, - mlTrainingOp, - mlInferOp, - mlVizSink, - inferenceSink - ) - - val links = List( - LogicalLink( - trainingScan.operatorIdentifier, - PortIdentity(), - mlTrainingOp.operatorIdentifier, - PortIdentity() - ), - LogicalLink( - testingScan.operatorIdentifier, - PortIdentity(), - mlTrainingOp.operatorIdentifier, - PortIdentity(1) - ), - LogicalLink( - inferenceScan.operatorIdentifier, - PortIdentity(), - mlInferOp.operatorIdentifier, - PortIdentity(1) - ), - LogicalLink( - mlTrainingOp.operatorIdentifier, - PortIdentity(), - mlVizSink.operatorIdentifier, - PortIdentity(0) - ), - LogicalLink( - mlTrainingOp.operatorIdentifier, - PortIdentity(1), - mlInferOp.operatorIdentifier, - PortIdentity() - ), - LogicalLink( - mlInferOp.operatorIdentifier, - PortIdentity(), - inferenceSink.operatorIdentifier, - PortIdentity() - ) - ) - - val ctx = new WorkflowContext() - val logicalPlan = LogicalPlan(operators, links) - logicalPlan.propagateWorkflowSchema(ctx, None) - val schemaResult = logicalPlan.getInputSchemaMap - - assert(schemaResult(mlTrainingOp.operatorIdentifier).head.get.equals(dataSchema)) - assert(schemaResult(mlTrainingOp.operatorIdentifier)(1).get.equals(dataSchema)) - - assert(schemaResult(mlInferOp.operatorIdentifier).head.get.equals(mlModelSchema)) - assert(schemaResult(mlInferOp.operatorIdentifier)(1).get.equals(dataSchema)) - - assert(schemaResult(mlVizSink.operatorIdentifier).head.get.equals(mlVizSchema)) - assert(schemaResult(inferenceSink.operatorIdentifier).head.get.equals(dataSchema)) - - } - -} diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala index 8a23c681ef7..4a50f33f806 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala @@ -110,7 +110,6 @@ class WorkflowCompiler( logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { val logicalOp = logicalPlan.getOperator(logicalOpId) - logicalOp.setContext(context) val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId) subPlan @@ -166,7 +165,7 @@ class WorkflowCompiler( // 1. convert the pojo to logical plan val logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) - // - resolve the file name in each scan source operator + // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(Some(errorList)) // 3. expand the logical plan to the physical plan diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala index c1a23c00ace..8b599176cd7 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/model/LogicalPlan.scala @@ -67,38 +67,11 @@ case class LogicalPlan( def getOperator(opId: OperatorIdentity): LogicalOp = operatorMap(opId) - def getSourceOperatorIds: List[OperatorIdentity] = - operatorMap.keys.filter(op => jgraphtDag.inDegreeOf(op) == 0).toList - - def getTerminalOperatorIds: List[OperatorIdentity] = - operatorMap.keys - .filter(op => jgraphtDag.outDegreeOf(op) == 0) - .toList - - def getAncestorOpIds(opId: OperatorIdentity): Set[OperatorIdentity] = { - jgraphtDag.getAncestors(opId).asScala.toSet - } - - def getUpstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - jgraphtDag - .incomingEdgesOf(opId) - .asScala - .map(e => operatorMap(e.fromOpId)) - .toList - } - def addOperator(op: LogicalOp): LogicalPlan = { // TODO: fix schema for the new operator this.copy(operators :+ op, links) } - def removeOperator(opId: OperatorIdentity): LogicalPlan = { - this.copy( - operators.filter(o => o.operatorIdentifier != opId), - links.filter(l => l.fromOpId != opId && l.toOpId != opId) - ) - } - def addLink( fromOpId: OperatorIdentity, fromPortId: PortIdentity, @@ -115,36 +88,10 @@ case class LogicalPlan( this.copy(operators, newLinks) } - def removeLink(linkToRemove: LogicalLink): LogicalPlan = { - this.copy(operators, links.filter(l => l != linkToRemove)) - } - - def getDownstreamOps(opId: OperatorIdentity): List[LogicalOp] = { - val downstream = new mutable.ArrayBuffer[LogicalOp] - jgraphtDag - .outgoingEdgesOf(opId) - .forEach(e => downstream += operatorMap(e.toOpId)) - downstream.toList - } - - def getDownstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { - links.filter(l => l.fromOpId == opId) - } - def getUpstreamLinks(opId: OperatorIdentity): List[LogicalLink] = { links.filter(l => l.toOpId == opId) } - def getInputSchemaMap: Map[OperatorIdentity, List[Option[Schema]]] = { - operators - .map(operator => { - operator.operatorIdentifier -> operator.operatorInfo.inputPorts.map(inputPort => - operator.inputPortToSchemaMapping.get(inputPort.id) - ) - }) - .toMap - } - /** * Resolve all user-given filename for the scan source operators to URIs, and call op.setFileUri to set the URi * @param errorList if given, put errors during resolving to it @@ -170,57 +117,4 @@ case class LogicalPlan( case _ => // Skip non-ScanSourceOpDesc operators } } - - def propagateWorkflowSchema( - context: WorkflowContext, - errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] - ): Unit = { - - operators.foreach(operator => { - if (operator.getContext == null) { - operator.setContext(context) - } - }) - - // propagate output schema following topological order - val topologicalOrderIterator = jgraphtDag.iterator() - topologicalOrderIterator.forEachRemaining(opId => { - val op = getOperator(opId) - val inputSchemas: Array[Option[Schema]] = if (op.isInstanceOf[SourceOperatorDescriptor]) { - Array() - } else { - op.operatorInfo.inputPorts - .flatMap(inputPort => { - links - .filter(link => link.toOpId == op.operatorIdentifier && link.toPortId == inputPort.id) - .map(link => { - val outputSchemaOpt = - getOperator(link.fromOpId).outputPortToSchemaMapping.get(link.fromPortId) - if (outputSchemaOpt.isDefined) { - op.inputPortToSchemaMapping(inputPort.id) = outputSchemaOpt.get - } - outputSchemaOpt - }) - }) - .toArray - } - - if (!inputSchemas.contains(None)) { - Try(op.getOutputSchemas(inputSchemas.map(_.get))) match { - case Success(outputSchemas) => - op.operatorInfo.outputPorts.foreach(outputPort => - op.outputPortToSchemaMapping(outputPort.id) = outputSchemas(outputPort.id.id) - ) - assert(outputSchemas.length == op.operatorInfo.outputPorts.length) - case Failure(err) => - logger.error("got error", err) - errorList match { - case Some(list) => list.append((opId, err)) - case None => - } - } - - } - }) - } } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 925fc5314a0..e374fac80c7 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.executor.OperatorExecutor import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan} import edu.uci.ics.amber.operator.aggregate.AggregateOpDesc import edu.uci.ics.amber.operator.cartesianProduct.CartesianProductOpDesc import edu.uci.ics.amber.operator.dictionary.DictionaryMatcherOpDesc @@ -37,35 +37,7 @@ import edu.uci.ics.amber.operator.randomksampling.RandomKSamplingOpDesc import edu.uci.ics.amber.operator.regex.RegexOpDesc import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.amber.operator.sentiment.SentimentAnalysisOpDesc -import edu.uci.ics.amber.operator.sklearn.{ - SklearnAdaptiveBoostingOpDesc, - SklearnBaggingOpDesc, - SklearnBernoulliNaiveBayesOpDesc, - SklearnComplementNaiveBayesOpDesc, - SklearnDecisionTreeOpDesc, - SklearnDummyClassifierOpDesc, - SklearnExtraTreeOpDesc, - SklearnExtraTreesOpDesc, - SklearnGaussianNaiveBayesOpDesc, - SklearnGradientBoostingOpDesc, - SklearnKNNOpDesc, - SklearnLinearRegressionOpDesc, - SklearnLinearSVMOpDesc, - SklearnLogisticRegressionCVOpDesc, - SklearnLogisticRegressionOpDesc, - SklearnMultiLayerPerceptronOpDesc, - SklearnMultinomialNaiveBayesOpDesc, - SklearnNearestCentroidOpDesc, - SklearnPassiveAggressiveOpDesc, - SklearnPerceptronOpDesc, - SklearnPredictionOpDesc, - SklearnProbabilityCalibrationOpDesc, - SklearnRandomForestOpDesc, - SklearnRidgeCVOpDesc, - SklearnRidgeOpDesc, - SklearnSDGOpDesc, - SklearnSVMOpDesc -} +import edu.uci.ics.amber.operator.sklearn._ import edu.uci.ics.amber.operator.sort.SortOpDesc import edu.uci.ics.amber.operator.sortPartitions.SortPartitionsOpDesc import edu.uci.ics.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc @@ -75,6 +47,7 @@ import edu.uci.ics.amber.operator.source.apis.twitter.v2.{ } import edu.uci.ics.amber.operator.source.fetcher.URLFetcherOpDesc import edu.uci.ics.amber.operator.source.scan.FileScanSourceOpDesc +import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.json.JSONLScanSourceOpDesc @@ -121,7 +94,6 @@ import edu.uci.ics.amber.operator.visualization.ternaryPlot.TernaryPlotOpDesc import edu.uci.ics.amber.operator.visualization.urlviz.UrlVizOpDesc import edu.uci.ics.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import edu.uci.ics.amber.operator.visualization.wordCloud.WordCloudOpDesc -import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.PortIdentity import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} @@ -306,14 +278,11 @@ trait StateTransferFunc ) abstract class LogicalOp extends PortDescriptor with Serializable { - @JsonIgnore - private var context: WorkflowContext = _ - @JsonProperty(PropertyNameConstants.OPERATOR_ID) private var operatorId: String = getClass.getSimpleName + "-" + UUID.randomUUID.toString @JsonProperty(PropertyNameConstants.OPERATOR_VERSION) - var operatorVersion: String = getOperatorVersion() + var operatorVersion: String = getOperatorVersion @JsonIgnore val inputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() @@ -334,7 +303,7 @@ abstract class LogicalOp extends PortDescriptor with Serializable { workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalPlan = { - new PhysicalPlan( + PhysicalPlan( operators = Set(getPhysicalOp(workflowId, executionId)), links = Set.empty ) @@ -344,7 +313,7 @@ abstract class LogicalOp extends PortDescriptor with Serializable { def getOutputSchema(schemas: Array[Schema]): Schema - private def getOperatorVersion(): String = { + private def getOperatorVersion: String = { val path = "core/amber/src/main/scala/" val operatorPath = path + this.getClass.getPackage.getName.replace(".", "/") OPVersion.getVersion(this.getClass.getSimpleName, operatorPath) @@ -361,12 +330,6 @@ abstract class LogicalOp extends PortDescriptor with Serializable { override def toString: String = ToStringBuilder.reflectionToString(this) - def getContext: WorkflowContext = this.context - - def setContext(workflowContext: WorkflowContext): Unit = { - this.context = workflowContext - } - def setOperatorId(id: String): Unit = { operatorId = id } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala index 3bb144b70a8..c40d736d135 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/hashJoin/HashJoinOpDesc.scala @@ -121,7 +121,7 @@ class HashJoinOpDesc[K] extends LogicalOp { ) ) - new PhysicalPlan( + PhysicalPlan( operators = Set(buildPhysicalOp, probePhysicalOp), links = Set( PhysicalLink( diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala index a77c67b9143..27a988d9f37 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sentiment/SentimentAnalysisOpDesc.scala @@ -57,8 +57,8 @@ class SentimentAnalysisOpDesc extends MapOpDesc { ) } - override def operatorInfo = - new OperatorInfo( + override def operatorInfo: OperatorInfo = + OperatorInfo( "Sentiment Analysis", "analysis the sentiment of a text using machine learning", OperatorGroupConstants.MACHINE_LEARNING_GROUP, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala index cb779ce96ee..80aee74cf63 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/ScanSourceOpDesc.scala @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonPropertyD import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor import edu.uci.ics.amber.workflow.OutputPort @@ -54,10 +53,6 @@ abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { inferSchema() } - override def setContext(workflowContext: WorkflowContext): Unit = { - super.setContext(workflowContext) - } - override def operatorInfo: OperatorInfo = { OperatorInfo( userFriendlyName = s"${fileTypeName.getOrElse("Unknown")} File Scan", diff --git a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala index 5d4be121d08..06dd412b8b9 100644 --- a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala +++ b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala @@ -2,7 +2,6 @@ package edu.uci.ics.amber.operator.source.scan.csv import edu.uci.ics.amber.core.storage.FileResolver import edu.uci.ics.amber.core.tuple.{AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.workflow.PortIdentity @@ -11,7 +10,6 @@ import org.scalatest.flatspec.AnyFlatSpec class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { - val workflowContext = new WorkflowContext() var csvScanSourceOpDesc: CSVScanSourceOpDesc = _ var parallelCsvScanSourceOpDesc: ParallelCSVScanSourceOpDesc = _ before { @@ -28,7 +26,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesSmallCsvPath) parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = true - parallelCsvScanSourceOpDesc.setContext(workflowContext) parallelCsvScanSourceOpDesc.setFileUri( FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) ) @@ -45,7 +42,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesHeaderlessSmallCsvPath) parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = false - parallelCsvScanSourceOpDesc.setContext(workflowContext) parallelCsvScanSourceOpDesc.setFileUri( FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) ) @@ -62,7 +58,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesSmallMultiLineCsvPath) csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = true - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -77,7 +72,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.fileName = Some(TestOperators.CountrySalesHeaderlessSmallCsvPath) csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -93,7 +87,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { Some(TestOperators.CountrySalesSmallMultiLineCustomDelimiterCsvPath) csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -109,7 +102,6 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { Some(TestOperators.CountrySalesSmallMultiLineCustomDelimiterCsvPath) csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false - csvScanSourceOpDesc.setContext(workflowContext) csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) assert(