From c111c9a9332871750d7c88413768f912e8a04096 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 21 May 2024 10:06:27 -0700 Subject: [PATCH 1/6] Add storage on output ports of operators with sinks added in the logical plan. --- .../deploysemantics/PhysicalOp.scala | 87 ++++++++++--------- .../messaginglayer/OutputManager.scala | 36 ++++++-- .../RegionExecutionCoordinator.scala | 3 +- .../scheduling/RegionPlanGenerator.scala | 9 ++ .../architecture/worker/DataProcessor.scala | 1 + .../ControlCommandConvertUtils.scala | 2 +- .../InitializeExecutorHandler.scala | 6 +- .../workflow/common/operators/LogicalOp.scala | 83 ++++++------------ .../common/storage/OpResultStorage.scala | 44 ++++++++++ .../common/workflow/PhysicalPlan.scala | 2 +- .../workflow/SinkInjectionTransformer.scala | 3 + .../workflow/WorkflowCacheRewriter.scala | 18 +++- 12 files changed, 186 insertions(+), 108 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala index 41531cb01fd..7d4f413074a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala @@ -5,32 +5,23 @@ import akka.remote.RemoteScope import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.engine.architecture.common.AkkaActorService import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution -import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.{ - OpExecInitInfo, - OpExecInitInfoWithCode -} -import edu.uci.ics.amber.engine.architecture.deploysemantics.locationpreference.{ - AddressInfo, - LocationPreference, - PreferController, - RoundRobinPreference -} +import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.{OpExecInitInfo, OpExecInitInfoWithCode} +import edu.uci.ics.amber.engine.architecture.deploysemantics.locationpreference.{AddressInfo, LocationPreference, PreferController, RoundRobinPreference} import edu.uci.ics.amber.engine.architecture.pythonworker.PythonWorkflowWorker import edu.uci.ics.amber.engine.architecture.scheduling.config.OperatorConfig import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker -import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{ - FaultToleranceConfig, - StateRestoreConfig, - WorkerReplayInitialization -} +import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{FaultToleranceConfig, StateRestoreConfig, WorkerReplayInitialization} import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.virtualidentity._ import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import edu.uci.ics.texera.workflow.common.workflow._ +import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageReader, SinkStorageWriter} import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} import org.jgrapht.traverse.TopologicalOrderIterator +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -158,34 +149,35 @@ object PhysicalOp { } case class PhysicalOp( - // the identifier of this PhysicalOp - id: PhysicalOpIdentity, - // the workflow id number - workflowId: WorkflowIdentity, - // the execution id number - executionId: ExecutionIdentity, - // information regarding initializing an operator executor instance - opExecInitInfo: OpExecInitInfo, - // preference of parallelism - parallelizable: Boolean = true, - // preference of worker placement - locationPreference: Option[LocationPreference] = None, - // requirement of partition policy (hash/range/single/none) on inputs - partitionRequirement: List[Option[PartitionInfo]] = List(), - // derive the output partition info given the input partitions - // if not specified, by default the output partition is the same as input partition - derivePartition: List[PartitionInfo] => PartitionInfo = inputParts => inputParts.head, - // input/output ports of the physical operator - // for operators with multiple input/output ports: must set these variables properly - inputPorts: Map[PortIdentity, (InputPort, List[PhysicalLink], Either[Throwable, Schema])] = + // the identifier of this PhysicalOp + id: PhysicalOpIdentity, + // the workflow id number + workflowId: WorkflowIdentity, + // the execution id number + executionId: ExecutionIdentity, + // information regarding initializing an operator executor instance + opExecInitInfo: OpExecInitInfo, + // preference of parallelism + parallelizable: Boolean = true, + // preference of worker placement + locationPreference: Option[LocationPreference] = None, + // requirement of partition policy (hash/range/single/none) on inputs + partitionRequirement: List[Option[PartitionInfo]] = List(), + // derive the output partition info given the input partitions + // if not specified, by default the output partition is the same as input partition + derivePartition: List[PartitionInfo] => PartitionInfo = inputParts => inputParts.head, + // input/output ports of the physical operator + // for operators with multiple input/output ports: must set these variables properly + inputPorts: Map[PortIdentity, (InputPort, List[PhysicalLink], Either[Throwable, Schema])] = Map.empty, - outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = + outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = Map.empty, - // schema propagation function - propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), - isOneToManyOp: Boolean = false, - // hint for number of workers - suggestedWorkerNum: Option[Int] = None + outputPortStorages: Map[PortIdentity, SinkStorageWriter] = Map.empty, + // schema propagation function + propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), + isOneToManyOp: Boolean = false, + // hint for number of workers + suggestedWorkerNum: Option[Int] = None ) extends LazyLogging { // all the "dependee" links are also blocking @@ -440,6 +432,19 @@ case class PhysicalOp( } } + def setOutputPortStorage(outputPortId: PortIdentity, storageWriter: SinkStorageWriter): PhysicalOp = { + this.copy(outputPortStorages = outputPortStorages.updated(outputPortId, storageWriter)) + } + + // TODO: Assuming logical and physical have the same ports for now. + // Need to include mapping from logical port to physical port eventually. + def setOutputPortStorages(logicalStorageMap: mutable.Map[PortIdentity, SinkStorageReader]): PhysicalOp = { + logicalStorageMap.foldLeft(this) { (currentOp, entry) => + val (outputPortId, storageReader) = entry + currentOp.setOutputPortStorage(outputPortId, storageReader.getStorageWriter) + } + } + /** * returns all output links. Optionally, if a specific portId is provided, returns the links connected to that portId. */ diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 6635d429bba..137eb0001ee 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -1,10 +1,6 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer -import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, - getBatchSize, - toPartitioner -} +import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} import edu.uci.ics.amber.engine.architecture.sendsemantics.partitioners._ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._ import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} @@ -14,7 +10,9 @@ import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageReader, SinkStorageWriter} +import scala.collection.immutable.Map import scala.collection.mutable object OutputManager { @@ -93,6 +91,8 @@ class OutputManager( private val ports: mutable.HashMap[PortIdentity, WorkerPort] = mutable.HashMap() + private var portStorages: Map[PortIdentity, SinkStorageWriter] = _ + private val networkOutputBuffers = mutable.HashMap[(PhysicalLink, ActorVirtualIdentity), NetworkOutputBuffer]() @@ -135,6 +135,16 @@ class OutputManager( networkOutputBuffers((link, partitioner.allReceivers(bucketIndex))).addTuple(tuple) } } + + // Save to storage + (outputPortId match { + case Some(portId) => portStorages.filter(_._1 == portId) + case None => portStorages + }).foreach { + case (portId, storageWriter) => + val tuple = tupleLike.enforceSchema(getPort(portId).schema) + storageWriter.putOne(tuple) + } } /** @@ -174,6 +184,11 @@ class OutputManager( def addPort(portId: PortIdentity, schema: Schema): Unit = { // each port can only be added and initialized once. + portStorages.filter(_._1 == portId).foreach{ + kv => { + kv._2.open() + } + } if (this.ports.contains(portId)) { return } @@ -183,6 +198,10 @@ class OutputManager( def getPort(portId: PortIdentity): WorkerPort = ports(portId) + def setPortStorage(portStorageMap: Map[PortIdentity, SinkStorageWriter]): Unit = { + this.portStorages = portStorageMap + } + def hasUnfinishedOutput: Boolean = outputIterator.hasNext def finalizeOutput(): Unit = { @@ -193,6 +212,13 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def closeOutputStorages(): Unit = { + this.portStorages.foreach { + case (_, storageWriter) => + storageWriter.close() + } + } + def getSingleOutputPortIdentity: PortIdentity = { assert(ports.size == 1) ports.head._1 diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 8c245834ebf..b7ad789b34f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -135,7 +135,8 @@ class RegionExecutionCoordinator( InitializeExecutor( workerConfigs.length, physicalOp.opExecInitInfo, - physicalOp.isSourceOperator + physicalOp.isSourceOperator, + physicalOp.outputPortStorages ), workerId ) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala index 34e0c80f32c..eb6c527767a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala @@ -138,6 +138,15 @@ abstract class RegionPlanGenerator( var newPhysicalPlan = physicalPlan .removeLink(physicalLink) + newPhysicalPlan = newPhysicalPlan.setOperator(fromOp.setOutputPortStorage( + fromPortId, + opResultStorage.createPortStorage( + s"${workflowContext.executionId}_", + getMatIdFromPhysicalLink(physicalLink), + OpResultStorage.defaultStorageMode + ).getStorageWriter + )) + // create cache writer and link val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index dd4f4f60e1d..446436351a3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -145,6 +145,7 @@ class DataProcessor( outputManager.emitEndOfUpstream() // Send Completed signal to worker actor. executor.close() + outputManager.closeOutputStorages() adaptiveBatchingMonitor.stopAdaptiveBatching() stateManager.transitTo(COMPLETED) logger.info( diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala index 1f4dda35884..5f1978ca783 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala @@ -51,7 +51,7 @@ object ControlCommandConvertUtils { QueryStatisticsV2() case QueryCurrentInputTuple() => QueryCurrentInputTupleV2() - case InitializeExecutor(_, opExecInitInfo, isSource) => + case InitializeExecutor(_, opExecInitInfo, isSource, _) => InitializeExecutorV2( opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0)._1, isSource diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index b1c8feca067..c637ac2ed22 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -6,12 +6,15 @@ import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerIniti import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.InitializeExecutorHandler.InitializeExecutor import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand +import edu.uci.ics.amber.engine.common.workflow.PortIdentity +import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageWriter} object InitializeExecutorHandler { final case class InitializeExecutor( totalWorkerCount: Int, opExecInitInfo: OpExecInitInfo, - isSource: Boolean + isSource: Boolean, + portStorages: Map[PortIdentity, SinkStorageWriter] ) extends ControlCommand[Unit] } @@ -26,6 +29,7 @@ trait InitializeExecutorHandler { VirtualIdentityUtils.getWorkerIndex(actorId), msg.totalWorkerCount ) + dp.outputManager.setPortStorage(msg.portStorages) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala index cf0607f4c81..0a35c59f99a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala @@ -1,20 +1,10 @@ package edu.uci.ics.texera.workflow.common.operators import com.fasterxml.jackson.annotation.JsonSubTypes.Type -import com.fasterxml.jackson.annotation.{ - JsonIgnore, - JsonProperty, - JsonPropertyDescription, - JsonSubTypes, - JsonTypeInfo -} +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonPropertyDescription, JsonSubTypes, JsonTypeInfo} import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.engine.architecture.deploysemantics.PhysicalOp -import edu.uci.ics.amber.engine.common.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} +import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.web.OPversion import edu.uci.ics.texera.workflow.common.metadata.{OperatorInfo, PropertyNameConstants} @@ -34,54 +24,19 @@ import edu.uci.ics.texera.workflow.operators.intersect.IntersectOpDesc import edu.uci.ics.texera.workflow.operators.intervalJoin.IntervalJoinOpDesc import edu.uci.ics.texera.workflow.operators.keywordSearch.KeywordSearchOpDesc import edu.uci.ics.texera.workflow.operators.limit.LimitOpDesc -import edu.uci.ics.texera.workflow.operators.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import edu.uci.ics.texera.workflow.operators.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import edu.uci.ics.texera.workflow.operators.projection.ProjectionOpDesc import edu.uci.ics.texera.workflow.operators.randomksampling.RandomKSamplingOpDesc import edu.uci.ics.texera.workflow.operators.regex.RegexOpDesc import edu.uci.ics.texera.workflow.operators.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.texera.workflow.operators.sentiment.SentimentAnalysisOpDesc import edu.uci.ics.texera.workflow.operators.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.texera.workflow.operators.sklearn.{ - SklearnAdaptiveBoostingOpDesc, - SklearnBaggingOpDesc, - SklearnBernoulliNaiveBayesOpDesc, - SklearnComplementNaiveBayesOpDesc, - SklearnDecisionTreeOpDesc, - SklearnDummyClassifierOpDesc, - SklearnExtraTreeOpDesc, - SklearnExtraTreesOpDesc, - SklearnGaussianNaiveBayesOpDesc, - SklearnGradientBoostingOpDesc, - SklearnKNNOpDesc, - SklearnLinearRegressionOpDesc, - SklearnLinearSVMOpDesc, - SklearnLogisticRegressionCVOpDesc, - SklearnLogisticRegressionOpDesc, - SklearnMultiLayerPerceptronOpDesc, - SklearnMultinomialNaiveBayesOpDesc, - SklearnNearestCentroidOpDesc, - SklearnPassiveAggressiveOpDesc, - SklearnPerceptronOpDesc, - SklearnPredictionOpDesc, - SklearnProbabilityCalibrationOpDesc, - SklearnRandomForestOpDesc, - SklearnRidgeCVOpDesc, - SklearnRidgeOpDesc, - SklearnSDGOpDesc, - SklearnSVMOpDesc -} +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageReader +import edu.uci.ics.texera.workflow.operators.sklearn.{SklearnAdaptiveBoostingOpDesc, SklearnBaggingOpDesc, SklearnBernoulliNaiveBayesOpDesc, SklearnComplementNaiveBayesOpDesc, SklearnDecisionTreeOpDesc, SklearnDummyClassifierOpDesc, SklearnExtraTreeOpDesc, SklearnExtraTreesOpDesc, SklearnGaussianNaiveBayesOpDesc, SklearnGradientBoostingOpDesc, SklearnKNNOpDesc, SklearnLinearRegressionOpDesc, SklearnLinearSVMOpDesc, SklearnLogisticRegressionCVOpDesc, SklearnLogisticRegressionOpDesc, SklearnMultiLayerPerceptronOpDesc, SklearnMultinomialNaiveBayesOpDesc, SklearnNearestCentroidOpDesc, SklearnPassiveAggressiveOpDesc, SklearnPerceptronOpDesc, SklearnPredictionOpDesc, SklearnProbabilityCalibrationOpDesc, SklearnRandomForestOpDesc, SklearnRidgeCVOpDesc, SklearnRidgeOpDesc, SklearnSDGOpDesc, SklearnSVMOpDesc} import edu.uci.ics.texera.workflow.operators.sort.SortOpDesc import edu.uci.ics.texera.workflow.operators.sortPartitions.SortPartitionsOpDesc import edu.uci.ics.texera.workflow.operators.source.apis.reddit.RedditSearchSourceOpDesc -import edu.uci.ics.texera.workflow.operators.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import edu.uci.ics.texera.workflow.operators.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import edu.uci.ics.texera.workflow.operators.source.fetcher.URLFetcherOpDesc import edu.uci.ics.texera.workflow.operators.source.scan.FileScanSourceOpDesc import edu.uci.ics.texera.workflow.operators.source.scan.csv.CSVScanSourceOpDesc @@ -96,12 +51,7 @@ import edu.uci.ics.texera.workflow.operators.symmetricDifference.SymmetricDiffer import edu.uci.ics.texera.workflow.operators.typecasting.TypeCastingOpDesc import edu.uci.ics.texera.workflow.operators.udf.java.JavaUDFOpDesc import edu.uci.ics.texera.workflow.operators.udf.python.source.PythonUDFSourceOpDescV2 -import edu.uci.ics.texera.workflow.operators.udf.python.{ - DualInputPortsPythonUDFOpDescV2, - PythonLambdaFunctionOpDesc, - PythonTableReducerOpDesc, - PythonUDFOpDescV2 -} +import edu.uci.ics.texera.workflow.operators.udf.python.{DualInputPortsPythonUDFOpDescV2, PythonLambdaFunctionOpDesc, PythonTableReducerOpDesc, PythonUDFOpDescV2} import edu.uci.ics.texera.workflow.operators.union.UnionOpDesc import edu.uci.ics.texera.workflow.operators.unneststring.UnnestStringOpDesc import edu.uci.ics.texera.workflow.operators.visualization.boxPlot.BoxPlotOpDesc @@ -294,6 +244,13 @@ abstract class LogicalOp extends PortDescriptor with Serializable { val inputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() @JsonIgnore val outputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() + + @JsonIgnore + val outputPortsWithStorage: mutable.Set[PortIdentity] = mutable.HashSet() + + @JsonIgnore + val outputPortToStorageMapping: mutable.Map[PortIdentity, SinkStorageReader] = mutable.HashMap() + def operatorIdentifier: OperatorIdentity = OperatorIdentity(operatorId) def getPhysicalOp( @@ -344,6 +301,18 @@ abstract class LogicalOp extends PortDescriptor with Serializable { operatorId = id } + def setOutputPortHasStorage(outputPortId: PortIdentity): Unit = { + this.outputPortsWithStorage.add(outputPortId) + } + + def setOutputPortStorage(outputPortId: PortIdentity, storage: SinkStorageReader): Unit = { + this.outputPortToStorageMapping(outputPortId) = storage + } + + def getOutputPortStorage(outputPortId: PortIdentity): SinkStorageReader = { + this.outputPortToStorageMapping(outputPortId) + } + def runtimeReconfiguration( workflowId: WorkflowIdentity, executionId: ExecutionIdentity, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala index efd914494c4..ef1576897e5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala @@ -26,6 +26,48 @@ class OpResultStorage extends Serializable with LazyLogging { val cache: ConcurrentHashMap[OperatorIdentity, SinkStorageReader] = new ConcurrentHashMap[OperatorIdentity, SinkStorageReader]() + val portStorage: ConcurrentHashMap[String, SinkStorageReader] = + new ConcurrentHashMap[String, SinkStorageReader]() + + def getPortStorage(key: String): SinkStorageReader = { + portStorage.get(key) + } + + def createPortStorage( + executionId: String = "", + key: String, + mode: String + ): SinkStorageReader = { + val storage: SinkStorageReader = + if (mode == "memory") { + new MemoryStorage + } else { + try { + new MongoDBSinkStorage(executionId + key) + } catch { + case t: Throwable => + logger.warn("Failed to create mongo storage", t) + logger.info(s"Fall back to memory storage for $key") + // fall back to memory + new MemoryStorage + } + } + portStorage.put(key, storage) + storage + } + + def containsPortStorage(key: String): Boolean = { + portStorage.contains(key) + } + + def removePortStorage(key: String): Unit = { + if (portStorage.contains(key)) { + portStorage.get(key).clear() + } + portStorage.remove(key) + } + + /** * Retrieve the result of an operator from OpResultStorage * @param key The key used for storage and retrieval. @@ -81,6 +123,8 @@ class OpResultStorage extends Serializable with LazyLogging { def close(): Unit = { cache.forEach((_, sinkStorageReader) => sinkStorageReader.clear()) cache.clear() + portStorage.forEach((_, sinkStorageReader)=>sinkStorageReader.clear()) + portStorage.clear() } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala index df16fe76be1..7a52f2dfd15 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala @@ -49,7 +49,7 @@ object PhysicalPlan { val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id) // Add the operator to the physical plan - physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema()) + physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema().setOutputPortStorages(logicalOp.outputPortToStorageMapping)) // Add all the links to the physical plan physicalPlan = (externalLinks ++ internalLinks) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala index 1e34daafac1..6394f2bc665 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala @@ -24,6 +24,9 @@ object SinkInjectionTransformer { operatorsToAddSink.foreach(opId => { val op = logicalPlan.getOperator(opId) op.operatorInfo.outputPorts.foreach(outPort => { + // Set storage on port + op.setOutputPortHasStorage(outPort.id) + // Add sink (TODO:deprecate) val sink = new ProgressiveSinkOpDesc() sink.setOperatorId("sink_" + opId.id) logicalPlan = logicalPlan diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala index 6dd8fd91b95..76282ae49c2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala @@ -127,7 +127,23 @@ object WorkflowCacheRewriter { } storage.get(storageKey) - case _ => + case op => { + op.outputPortsWithStorage.foreach(outPortId => { + val storageKey = s"${op.operatorIdentifier}_outPort_${outPortId.id}" + // TODO: Ignoring viz for now. + val storageType = OpResultStorage.defaultStorageMode + // TODO: Ignoring reuse for now. + op.setOutputPortStorage(outPortId, storage.createPortStorage( + s"${op.getContext.executionId}_", + storageKey, + storageType + )) + op.getOutputPortStorage(outPortId).setSchema( + op.outputPortToSchemaMapping(outPortId) + ) + // TODO: Update result JSON in metadata + }) + } } // update execution entry in MySQL to have pointers to the mongo collections resultsJSON.set("results", sinksPointers) From 7eef86f60700866cfda4ef1f61d2d40716f5e794 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 21 May 2024 12:59:35 -0700 Subject: [PATCH 2/6] Remove changes to RPG. --- .../architecture/scheduling/RegionPlanGenerator.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala index eb6c527767a..34e0c80f32c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlanGenerator.scala @@ -138,15 +138,6 @@ abstract class RegionPlanGenerator( var newPhysicalPlan = physicalPlan .removeLink(physicalLink) - newPhysicalPlan = newPhysicalPlan.setOperator(fromOp.setOutputPortStorage( - fromPortId, - opResultStorage.createPortStorage( - s"${workflowContext.executionId}_", - getMatIdFromPhysicalLink(physicalLink), - OpResultStorage.defaultStorageMode - ).getStorageWriter - )) - // create cache writer and link val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink) From 29123f99c4987653848d2679f49ccd6429d4fefe Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 21 May 2024 10:06:27 -0700 Subject: [PATCH 3/6] Fix lint and format. --- .../deploysemantics/PhysicalOp.scala | 84 +++++++++++-------- .../messaginglayer/OutputManager.scala | 14 ++-- .../common/storage/OpResultStorage.scala | 5 +- .../common/workflow/PhysicalPlan.scala | 6 +- .../workflow/WorkflowCacheRewriter.scala | 18 ++-- .../architecture/worker/WorkerSpec.scala | 7 +- 6 files changed, 83 insertions(+), 51 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala index 7fb75284a2d..e1a744feee2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala @@ -5,16 +5,27 @@ import akka.remote.RemoteScope import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.engine.architecture.common.AkkaActorService import edu.uci.ics.amber.engine.architecture.controller.execution.OperatorExecution -import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.{OpExecInitInfo, OpExecInitInfoWithCode} -import edu.uci.ics.amber.engine.architecture.deploysemantics.locationpreference.{AddressInfo, LocationPreference, PreferController, RoundRobinPreference} +import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.{ + OpExecInitInfo, + OpExecInitInfoWithCode +} +import edu.uci.ics.amber.engine.architecture.deploysemantics.locationpreference.{ + AddressInfo, + LocationPreference, + PreferController, + RoundRobinPreference +} import edu.uci.ics.amber.engine.architecture.pythonworker.PythonWorkflowWorker import edu.uci.ics.amber.engine.architecture.scheduling.config.OperatorConfig import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker -import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{FaultToleranceConfig, StateRestoreConfig, WorkerReplayInitialization} +import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{ + FaultToleranceConfig, + StateRestoreConfig, + WorkerReplayInitialization +} import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.virtualidentity._ import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} -import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import edu.uci.ics.texera.workflow.common.workflow._ import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageReader, SinkStorageWriter} @@ -149,35 +160,35 @@ object PhysicalOp { } case class PhysicalOp( - // the identifier of this PhysicalOp - id: PhysicalOpIdentity, - // the workflow id number - workflowId: WorkflowIdentity, - // the execution id number - executionId: ExecutionIdentity, - // information regarding initializing an operator executor instance - opExecInitInfo: OpExecInitInfo, - // preference of parallelism - parallelizable: Boolean = true, - // preference of worker placement - locationPreference: Option[LocationPreference] = None, - // requirement of partition policy (hash/range/single/none) on inputs - partitionRequirement: List[Option[PartitionInfo]] = List(), - // derive the output partition info given the input partitions - // if not specified, by default the output partition is the same as input partition - derivePartition: List[PartitionInfo] => PartitionInfo = inputParts => inputParts.head, - // input/output ports of the physical operator - // for operators with multiple input/output ports: must set these variables properly - inputPorts: Map[PortIdentity, (InputPort, List[PhysicalLink], Either[Throwable, Schema])] = + // the identifier of this PhysicalOp + id: PhysicalOpIdentity, + // the workflow id number + workflowId: WorkflowIdentity, + // the execution id number + executionId: ExecutionIdentity, + // information regarding initializing an operator executor instance + opExecInitInfo: OpExecInitInfo, + // preference of parallelism + parallelizable: Boolean = true, + // preference of worker placement + locationPreference: Option[LocationPreference] = None, + // requirement of partition policy (hash/range/single/none) on inputs + partitionRequirement: List[Option[PartitionInfo]] = List(), + // derive the output partition info given the input partitions + // if not specified, by default the output partition is the same as input partition + derivePartition: List[PartitionInfo] => PartitionInfo = inputParts => inputParts.head, + // input/output ports of the physical operator + // for operators with multiple input/output ports: must set these variables properly + inputPorts: Map[PortIdentity, (InputPort, List[PhysicalLink], Either[Throwable, Schema])] = Map.empty, - outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = + outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = Map.empty, - outputPortStorages: Map[PortIdentity, SinkStorageWriter] = Map.empty, - // schema propagation function - propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), - isOneToManyOp: Boolean = false, - // hint for number of workers - suggestedWorkerNum: Option[Int] = None + outputPortStorages: Map[PortIdentity, SinkStorageWriter] = Map.empty, + // schema propagation function + propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), + isOneToManyOp: Boolean = false, + // hint for number of workers + suggestedWorkerNum: Option[Int] = None ) extends LazyLogging { // all the "dependee" links are also blocking @@ -432,14 +443,19 @@ case class PhysicalOp( } } - def setOutputPortStorage(outputPortId: PortIdentity, storageWriter: SinkStorageWriter): PhysicalOp = { + def setOutputPortStorage( + outputPortId: PortIdentity, + storageWriter: SinkStorageWriter + ): PhysicalOp = { this.copy(outputPortStorages = outputPortStorages.updated(outputPortId, storageWriter)) } // TODO: Assuming logical and physical have the same ports for now. // Need to include mapping from logical port to physical port eventually. - def setOutputPortStorages(logicalStorageMap: mutable.Map[PortIdentity, SinkStorageReader]): PhysicalOp = { - logicalStorageMap.foldLeft(this) { (currentOp, entry) => + def setOutputPortStorages( + logicalStorageMap: mutable.Map[PortIdentity, SinkStorageReader] + ): PhysicalOp = { + logicalStorageMap.foldLeft(this) { (currentOp, entry) => val (outputPortId, storageReader) = entry currentOp.setOutputPortStorage(outputPortId, storageReader.getStorageWriter) } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 137eb0001ee..bbb75bbd56a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -1,6 +1,10 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer -import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} +import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{ + DPOutputIterator, + getBatchSize, + toPartitioner +} import edu.uci.ics.amber.engine.architecture.sendsemantics.partitioners._ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._ import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} @@ -10,7 +14,7 @@ import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.workflow.common.tuple.schema.Schema -import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageReader, SinkStorageWriter} +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter import scala.collection.immutable.Map import scala.collection.mutable @@ -139,7 +143,7 @@ class OutputManager( // Save to storage (outputPortId match { case Some(portId) => portStorages.filter(_._1 == portId) - case None => portStorages + case None => portStorages }).foreach { case (portId, storageWriter) => val tuple = tupleLike.enforceSchema(getPort(portId).schema) @@ -184,8 +188,8 @@ class OutputManager( def addPort(portId: PortIdentity, schema: Schema): Unit = { // each port can only be added and initialized once. - portStorages.filter(_._1 == portId).foreach{ - kv => { + portStorages.filter(_._1 == portId).foreach { kv => + { kv._2.open() } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala index ef1576897e5..e24897d8891 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala @@ -37,7 +37,7 @@ class OpResultStorage extends Serializable with LazyLogging { executionId: String = "", key: String, mode: String - ): SinkStorageReader = { + ): SinkStorageReader = { val storage: SinkStorageReader = if (mode == "memory") { new MemoryStorage @@ -67,7 +67,6 @@ class OpResultStorage extends Serializable with LazyLogging { portStorage.remove(key) } - /** * Retrieve the result of an operator from OpResultStorage * @param key The key used for storage and retrieval. @@ -123,7 +122,7 @@ class OpResultStorage extends Serializable with LazyLogging { def close(): Unit = { cache.forEach((_, sinkStorageReader) => sinkStorageReader.clear()) cache.clear() - portStorage.forEach((_, sinkStorageReader)=>sinkStorageReader.clear()) + portStorage.forEach((_, sinkStorageReader) => sinkStorageReader.clear()) portStorage.clear() } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala index 7a52f2dfd15..b40da52e255 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala @@ -49,7 +49,11 @@ object PhysicalPlan { val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id) // Add the operator to the physical plan - physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema().setOutputPortStorages(logicalOp.outputPortToStorageMapping)) + physicalPlan = physicalPlan.addOperator( + physicalOp + .propagateSchema() + .setOutputPortStorages(logicalOp.outputPortToStorageMapping) + ) // Add all the links to the physical plan physicalPlan = (externalLinks ++ internalLinks) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala index 76282ae49c2..98547fce963 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala @@ -133,14 +133,18 @@ object WorkflowCacheRewriter { // TODO: Ignoring viz for now. val storageType = OpResultStorage.defaultStorageMode // TODO: Ignoring reuse for now. - op.setOutputPortStorage(outPortId, storage.createPortStorage( - s"${op.getContext.executionId}_", - storageKey, - storageType - )) - op.getOutputPortStorage(outPortId).setSchema( - op.outputPortToSchemaMapping(outPortId) + op.setOutputPortStorage( + outPortId, + storage.createPortStorage( + s"${op.getContext.executionId}_", + storageKey, + storageType + ) ) + op.getOutputPortStorage(outPortId) + .setSchema( + op.outputPortToSchemaMapping(outPortId) + ) // TODO: Update result JSON in metadata }) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala index 99ed9c01590..3fadd42e39e 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala @@ -165,7 +165,12 @@ class WorkerSpec ) val initializeOperatorLogic = ControlInvocation( 4, - InitializeExecutor(1, OpExecInitInfo((_, _) => mockOpExecutor), isSource = false) + InitializeExecutor( + 1, + OpExecInitInfo((_, _) => mockOpExecutor), + isSource = false, + portStorages = null + ) ) sendControlToWorker( worker, From 64cc7ca27fe26a4196979df266e8140f4bd92640 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 28 May 2024 12:06:45 -0700 Subject: [PATCH 4/6] Add hasStorage to PortDescriptor. --- .../common/operators/PortDescriptor.scala | 3 ++- .../execute-workflow/execute-workflow.service.ts | 16 +++++++++++++++- .../workspace/types/workflow-common.interface.ts | 1 + 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala index 66a1ac28b00..99f04d0d9ce 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala @@ -9,7 +9,8 @@ case class PortDescription( allowMultiInputs: Boolean, isDynamicPort: Boolean, partitionRequirement: PartitionInfo, - dependencies: List[Int] = List.empty + dependencies: List[Int] = List.empty, + hasStorage: Boolean ) trait PortDescriptor { diff --git a/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index d28a991d70e..ca2a430c8fc 100644 --- a/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -26,6 +26,7 @@ import { exhaustiveGuard } from "../../../common/util/switch"; import { WorkflowStatusService } from "../workflow-status/workflow-status.service"; import { isDefined } from "../../../common/util/predicate"; import { intersection } from "../../../common/util/set"; +import { PortDescription } from "../../types/workflow-common.interface"; // TODO: change this declaration export const FORM_DEBOUNCE_TIME_MS = 150; @@ -169,6 +170,7 @@ export class ExecuteWorkflowService { this.workflowActionService.getTexeraGraph(), targetOperatorId ); + console.log(logicalPlan); this.resetExecutionState(); this.workflowStatusService.resetStatus(); this.sendExecutionRequest(executionName, logicalPlan); @@ -376,7 +378,19 @@ export class ExecuteWorkflowService { intersection(operatorIds, workflowGraph.getOperatorsMarkedForReuseResult()) ); - return { operators, links, opsToViewResult, opsToReuseResult }; + const terminalOpIds: string[] = Array.from(operatorIds).filter(opId => !links.some(link => link.fromOpId === opId)); + + const opsNeedingOutputStorage = new Set(opsToViewResult.concat(terminalOpIds)); + + const operatorsWithPortStorage: LogicalOperator[] = operators.map(op => ({ + ...op, + outputPorts: (op.outputPorts as PortDescription[]).map(outputPort => ({ + ...outputPort, + hasStorage: opsNeedingOutputStorage.has(op.operatorID), + })), + })); + + return { operators: operatorsWithPortStorage, links, opsToViewResult, opsToReuseResult }; } public getWorkerIds(operatorId: string): ReadonlyArray { diff --git a/core/gui/src/app/workspace/types/workflow-common.interface.ts b/core/gui/src/app/workspace/types/workflow-common.interface.ts index 69ef962ea01..9c9506120c3 100644 --- a/core/gui/src/app/workspace/types/workflow-common.interface.ts +++ b/core/gui/src/app/workspace/types/workflow-common.interface.ts @@ -40,6 +40,7 @@ export interface PortDescription isDynamicPort?: boolean; partitionRequirement?: PartitionInfo; dependencies?: { id: number; internal: boolean }[]; + hasStorage?: boolean; }> {} export interface OperatorPredicate From 839e63f96e608ddf2f8184de23f2b294e9cbf9e9 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Thu, 30 May 2024 11:20:04 -0700 Subject: [PATCH 5/6] assign storage in PhysicalPlan. --- .../ics/amber/engine/common/workflow.proto | 1 + .../controller/ControllerProcessor.scala | 3 +- .../deploysemantics/PhysicalOp.scala | 59 +++++++++++++------ .../messaginglayer/OutputManager.scala | 50 ++++++++-------- .../messaginglayer/WorkerPort.scala | 4 +- .../RegionExecutionCoordinator.scala | 30 +++++++--- .../WorkflowExecutionCoordinator.scala | 7 ++- .../ControlCommandConvertUtils.scala | 4 +- .../promisehandlers/AssignPortHandler.scala | 6 +- .../InitializeExecutorHandler.scala | 6 +- .../workflow/common/operators/LogicalOp.scala | 19 ------ .../common/operators/PortDescriptor.scala | 2 +- .../common/workflow/PhysicalPlan.scala | 9 ++- .../workflow/SinkInjectionTransformer.scala | 2 - .../workflow/WorkflowCacheRewriter.scala | 23 +------- .../common/workflow/WorkflowCompiler.scala | 2 +- .../filter/SpecializedFilterOpDesc.java | 2 +- .../sink/managed/ProgressiveSinkOpDesc.java | 2 +- .../typecasting/TypeCastingOpDesc.java | 2 +- .../source/PythonUDFSourceOpDescV2.java | 2 +- .../operators/udf/r/RUDFSourceOpDesc.java | 2 +- .../engine/common/workflow/OutputPort.scala | 42 +++++++++++-- .../common/workflow/WorkflowProto.scala | 17 +++--- 23 files changed, 166 insertions(+), 130 deletions(-) diff --git a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto index 5d86ee18728..7a2cab21aab 100644 --- a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto +++ b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto @@ -27,6 +27,7 @@ message OutputPort { PortIdentity id = 1 [(scalapb.field).no_box = true]; string displayName = 2; bool blocking = 3; + string storageLocation = 4; } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala index 8e48e7405d5..a5a419a7c9b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala @@ -29,7 +29,8 @@ class ControllerProcessor( () => this.workflowScheduler.getNextRegions, workflowExecution, controllerConfig, - asyncRPCClient + asyncRPCClient, + opResultStorage ) private val initializer = new ControllerAsyncRPCHandlerInitializer(this) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala index e1a744feee2..b92b014be81 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala @@ -26,13 +26,14 @@ import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{ import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.virtualidentity._ import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} +import edu.uci.ics.texera.workflow.common.WorkflowContext +import edu.uci.ics.texera.workflow.common.operators.LogicalOp +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import edu.uci.ics.texera.workflow.common.workflow._ -import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageReader, SinkStorageWriter} import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} import org.jgrapht.traverse.TopologicalOrderIterator -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -183,7 +184,6 @@ case class PhysicalOp( Map.empty, outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = Map.empty, - outputPortStorages: Map[PortIdentity, SinkStorageWriter] = Map.empty, // schema propagation function propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), isOneToManyOp: Boolean = false, @@ -443,22 +443,47 @@ case class PhysicalOp( } } - def setOutputPortStorage( - outputPortId: PortIdentity, - storageWriter: SinkStorageWriter + def assignOutputPortStorages( + logicalOp: LogicalOp, + context: WorkflowContext, + opResultStorageOptional: Option[OpResultStorage] ): PhysicalOp = { - this.copy(outputPortStorages = outputPortStorages.updated(outputPortId, storageWriter)) - } - - // TODO: Assuming logical and physical have the same ports for now. - // Need to include mapping from logical port to physical port eventually. - def setOutputPortStorages( - logicalStorageMap: mutable.Map[PortIdentity, SinkStorageReader] - ): PhysicalOp = { - logicalStorageMap.foldLeft(this) { (currentOp, entry) => - val (outputPortId, storageReader) = entry - currentOp.setOutputPortStorage(outputPortId, storageReader.getStorageWriter) + opResultStorageOptional match { + case Some(opResultStorage: OpResultStorage) => + logicalOp.outputPorts.zipWithIndex.foldLeft(this) { (currentOp, portWithIndex) => + { + if ( + portWithIndex._1.hasStorage && currentOp.outputPorts + .exists(pred => pred._1.id == portWithIndex._2) + ) { + val correspondingPortId = + currentOp.outputPorts.keys.filter(portId => portId.id == portWithIndex._2).head + val existingContent = outputPorts(correspondingPortId) + val storageKey = s"${currentOp.id}_outPort_${correspondingPortId.id}" + val storageType = OpResultStorage.defaultStorageMode + val createdStorageReader = opResultStorage.createPortStorage( + s"${context.executionId}_", + storageKey, + storageType + ) + existingContent._3 match { + case Left(_) => + case Right(schema) => createdStorageReader.setSchema(schema) + } + currentOp.copy(outputPorts = + outputPorts.updated( + correspondingPortId, + existingContent.copy(_1 = existingContent._1.copy(storageLocation = storageKey)) + ) + ) + } else { + currentOp + } + } + } + case _ => this } + } /** diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index bbb75bbd56a..69ea0eb2bc4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -16,7 +16,6 @@ import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter -import scala.collection.immutable.Map import scala.collection.mutable object OutputManager { @@ -95,8 +94,6 @@ class OutputManager( private val ports: mutable.HashMap[PortIdentity, WorkerPort] = mutable.HashMap() - private var portStorages: Map[PortIdentity, SinkStorageWriter] = _ - private val networkOutputBuffers = mutable.HashMap[(PhysicalLink, ActorVirtualIdentity), NetworkOutputBuffer]() @@ -141,14 +138,19 @@ class OutputManager( } // Save to storage + (outputPortId match { - case Some(portId) => portStorages.filter(_._1 == portId) - case None => portStorages - }).foreach { - case (portId, storageWriter) => - val tuple = tupleLike.enforceSchema(getPort(portId).schema) - storageWriter.putOne(tuple) - } + case Some(portId) => ports.filter(_._1 == portId) + case None => ports + }).foreach(kv => { + val portId = kv._1 + kv._2.storage match { + case Some(storageWriter) => + val tuple = tupleLike.enforceSchema(getPort(portId).schema) + storageWriter.putOne(tuple) + case None => + } + }) } /** @@ -186,26 +188,20 @@ class OutputManager( }) } - def addPort(portId: PortIdentity, schema: Schema): Unit = { + def addPort(portId: PortIdentity, schema: Schema, storage: Option[SinkStorageWriter]): Unit = { // each port can only be added and initialized once. - portStorages.filter(_._1 == portId).foreach { kv => - { - kv._2.open() - } - } if (this.ports.contains(portId)) { return } - this.ports(portId) = WorkerPort(schema) - + this.ports(portId) = WorkerPort(schema, storage = storage) + this.ports(portId).storage match { + case Some(storageWriter) => storageWriter.open() + case None => + } } def getPort(portId: PortIdentity): WorkerPort = ports(portId) - def setPortStorage(portStorageMap: Map[PortIdentity, SinkStorageWriter]): Unit = { - this.portStorages = portStorageMap - } - def hasUnfinishedOutput: Boolean = outputIterator.hasNext def finalizeOutput(): Unit = { @@ -217,10 +213,12 @@ class OutputManager( } def closeOutputStorages(): Unit = { - this.portStorages.foreach { - case (_, storageWriter) => - storageWriter.close() - } + this.ports.values.foreach(workerPort => { + workerPort.storage match { + case Some(storageWriter) => storageWriter.close() + case None => + } + }) } def getSingleOutputPortIdentity: PortIdentity = { diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala index 2144b3bc595..dc995a52f54 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala @@ -2,11 +2,13 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer import edu.uci.ics.amber.engine.common.virtualidentity.ChannelIdentity import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter import scala.collection.mutable case class WorkerPort( schema: Schema, // TODO: change it to manage the actual AmberFIFOChannel instead of Boolean - channels: mutable.HashMap[ChannelIdentity, Boolean] = mutable.HashMap() + channels: mutable.HashMap[ChannelIdentity, Boolean] = mutable.HashMap(), + storage: Option[SinkStorageWriter] = None ) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 7f9b45c2502..73f40539b9b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -22,13 +22,15 @@ import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER import edu.uci.ics.amber.engine.common.workflow.PhysicalLink import edu.uci.ics.texera.web.workflowruntimestate.WorkflowAggregatedState +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import scala.collection.Seq class RegionExecutionCoordinator( region: Region, workflowExecution: WorkflowExecution, asyncRPCClient: AsyncRPCClient, - controllerConfig: ControllerConfig + controllerConfig: ControllerConfig, + opResultStorage: OpResultStorage ) { def execute(actorService: AkkaActorService): Future[Unit] = { @@ -127,8 +129,7 @@ class RegionExecutionCoordinator( InitializeExecutor( workerConfigs.length, physicalOp.opExecInitInfo, - physicalOp.isSourceOperator, - physicalOp.outputPortStorages + physicalOp.isSourceOperator ), workerId ) @@ -145,23 +146,36 @@ class RegionExecutionCoordinator( val inputPortMapping = physicalOp.inputPorts .flatMap { case (inputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> schema) + Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> (schema, None)) case _ => None } val outputPortMapping = physicalOp.outputPorts .flatMap { - case (outputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, outputPortId, input = false) -> schema) + case (outputPortId, (outputPort, _, Right(schema))) => + Some( + GlobalPortIdentity( + physicalOp.id, + outputPortId, + input = false + ) -> (schema, outputPort.storageLocation match { + case "" => None + case location => Some(opResultStorage.getPortStorage(location).getStorageWriter) + case _ => None + }) + ) case _ => None } inputPortMapping ++ outputPortMapping } .flatMap { - case (globalPortId, schema) => + case (globalPortId, (schema, storage)) => resourceConfig.operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId).map { workerId => asyncRPCClient - .send(AssignPort(globalPortId.portId, globalPortId.input, schema), workerId) + .send( + AssignPort(globalPortId.portId, globalPortId.input, schema, storage), + workerId + ) } } .toSeq diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 7f76932cfd0..5e2811c2ce6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -7,6 +7,7 @@ import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig import edu.uci.ics.amber.engine.architecture.controller.execution.WorkflowExecution import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient import edu.uci.ics.amber.engine.common.workflow.PhysicalLink +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import scala.collection.mutable @@ -14,7 +15,8 @@ class WorkflowExecutionCoordinator( getNextRegions: () => Set[Region], workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, - asyncRPCClient: AsyncRPCClient + asyncRPCClient: AsyncRPCClient, + opResultStorage: OpResultStorage ) extends LazyLogging { private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer() @@ -41,7 +43,8 @@ class WorkflowExecutionCoordinator( region, workflowExecution, asyncRPCClient, - controllerConfig + controllerConfig, + opResultStorage ) regionExecutionCoordinators(region.id) }) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala index 2a36359136a..be390b9114b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala @@ -40,7 +40,7 @@ object ControlCommandConvertUtils { ResumeWorkerV2() case OpenExecutor() => OpenExecutorV2() - case AssignPort(portId, input, schema) => + case AssignPort(portId, input, schema, _) => AssignPortV2(portId, input, schema.toRawSchema) case AddPartitioning(tag: PhysicalLink, partitioning: Partitioning) => AddPartitioningV2(tag, partitioning) @@ -50,7 +50,7 @@ object ControlCommandConvertUtils { QueryStatisticsV2() case QueryCurrentInputTuple() => QueryCurrentInputTupleV2() - case InitializeExecutor(_, opExecInitInfo, isSource, _) => + case InitializeExecutor(_, opExecInitInfo, isSource) => val (code, language) = opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0) InitializeExecutorV2( code, diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index f094880438e..7128e794315 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -5,13 +5,15 @@ import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHa import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter object AssignPortHandler { final case class AssignPort( portId: PortIdentity, input: Boolean, - schema: Schema + schema: Schema, + storage: Option[SinkStorageWriter] ) extends ControlCommand[Unit] } @@ -22,7 +24,7 @@ trait AssignPortHandler { if (msg.input) { dp.inputManager.addPort(msg.portId, msg.schema) } else { - dp.outputManager.addPort(msg.portId, msg.schema) + dp.outputManager.addPort(msg.portId, msg.schema, msg.storage) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index c637ac2ed22..b1c8feca067 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -6,15 +6,12 @@ import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerIniti import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.InitializeExecutorHandler.InitializeExecutor import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand -import edu.uci.ics.amber.engine.common.workflow.PortIdentity -import edu.uci.ics.texera.workflow.operators.sink.storage.{SinkStorageWriter} object InitializeExecutorHandler { final case class InitializeExecutor( totalWorkerCount: Int, opExecInitInfo: OpExecInitInfo, - isSource: Boolean, - portStorages: Map[PortIdentity, SinkStorageWriter] + isSource: Boolean ) extends ControlCommand[Unit] } @@ -29,7 +26,6 @@ trait InitializeExecutorHandler { VirtualIdentityUtils.getWorkerIndex(actorId), msg.totalWorkerCount ) - dp.outputManager.setPortStorage(msg.portStorages) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala index 92dd2e13a20..cc4824e8b43 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala @@ -47,7 +47,6 @@ import edu.uci.ics.texera.workflow.operators.regex.RegexOpDesc import edu.uci.ics.texera.workflow.operators.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.texera.workflow.operators.sentiment.SentimentAnalysisOpDesc import edu.uci.ics.texera.workflow.operators.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageReader import edu.uci.ics.texera.workflow.operators.sklearn.{ SklearnAdaptiveBoostingOpDesc, SklearnBaggingOpDesc, @@ -300,12 +299,6 @@ abstract class LogicalOp extends PortDescriptor with Serializable { @JsonIgnore val outputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() - @JsonIgnore - val outputPortsWithStorage: mutable.Set[PortIdentity] = mutable.HashSet() - - @JsonIgnore - val outputPortToStorageMapping: mutable.Map[PortIdentity, SinkStorageReader] = mutable.HashMap() - def operatorIdentifier: OperatorIdentity = OperatorIdentity(operatorId) def getPhysicalOp( @@ -356,18 +349,6 @@ abstract class LogicalOp extends PortDescriptor with Serializable { operatorId = id } - def setOutputPortHasStorage(outputPortId: PortIdentity): Unit = { - this.outputPortsWithStorage.add(outputPortId) - } - - def setOutputPortStorage(outputPortId: PortIdentity, storage: SinkStorageReader): Unit = { - this.outputPortToStorageMapping(outputPortId) = storage - } - - def getOutputPortStorage(outputPortId: PortIdentity): SinkStorageReader = { - this.outputPortToStorageMapping(outputPortId) - } - def runtimeReconfiguration( workflowId: WorkflowIdentity, executionId: ExecutionIdentity, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala index 99f04d0d9ce..13a3aca64c5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala @@ -18,5 +18,5 @@ trait PortDescriptor { var inputPorts: List[PortDescription] = null @JsonProperty(required = false) - var outputPorts: List[PortDescription] = null + var outputPorts: List[PortDescription] = List.empty } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala index b40da52e255..a5d60f0e2d8 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala @@ -10,6 +10,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{ } import edu.uci.ics.amber.engine.common.workflow.PhysicalLink import edu.uci.ics.texera.workflow.common.WorkflowContext +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.alg.shortestpath.AllDirectedPaths import org.jgrapht.graph.DirectedAcyclicGraph @@ -20,7 +21,11 @@ import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHa object PhysicalPlan { - def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = { + def apply( + context: WorkflowContext, + logicalPlan: LogicalPlan, + opResultStorage: Option[OpResultStorage] = None + ): PhysicalPlan = { var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty) @@ -52,7 +57,7 @@ object PhysicalPlan { physicalPlan = physicalPlan.addOperator( physicalOp .propagateSchema() - .setOutputPortStorages(logicalOp.outputPortToStorageMapping) + .assignOutputPortStorages(logicalOp, context, opResultStorage) ) // Add all the links to the physical plan diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala index 6394f2bc665..97bf3181b7d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala @@ -24,8 +24,6 @@ object SinkInjectionTransformer { operatorsToAddSink.foreach(opId => { val op = logicalPlan.getOperator(opId) op.operatorInfo.outputPorts.foreach(outPort => { - // Set storage on port - op.setOutputPortHasStorage(outPort.id) // Add sink (TODO:deprecate) val sink = new ProgressiveSinkOpDesc() sink.setOperatorId("sink_" + opId.id) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala index 98547fce963..65aa4c2e385 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala @@ -126,28 +126,7 @@ object WorkflowCacheRewriter { sinksPointers.add(storageNode) } storage.get(storageKey) - - case op => { - op.outputPortsWithStorage.foreach(outPortId => { - val storageKey = s"${op.operatorIdentifier}_outPort_${outPortId.id}" - // TODO: Ignoring viz for now. - val storageType = OpResultStorage.defaultStorageMode - // TODO: Ignoring reuse for now. - op.setOutputPortStorage( - outPortId, - storage.createPortStorage( - s"${op.getContext.executionId}_", - storageKey, - storageType - ) - ) - op.getOutputPortStorage(outPortId) - .setSchema( - op.outputPortToSchemaMapping(outPortId) - ) - // TODO: Update result JSON in metadata - }) - } + case _ => } // update execution entry in MySQL to have pointers to the mongo collections resultsJSON.set("results", sinksPointers) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala index df60b4d3326..bb84abb97e7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala @@ -85,7 +85,7 @@ class WorkflowCompiler( ) // the PhysicalPlan with topology expanded. - val physicalPlan = PhysicalPlan(context, rewrittenLogicalPlan) + val physicalPlan = PhysicalPlan(context, rewrittenLogicalPlan, Some(opResultStorage)) Workflow( context, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java index 2b43aa23383..9c02e91a8ef 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java @@ -49,7 +49,7 @@ public OperatorInfo operatorInfo() { "Performs a filter operation", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, true, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java index b73c5b32eb1..74bdd5b44de 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java @@ -110,7 +110,7 @@ public OperatorInfo operatorInfo() { "View the results", OperatorGroupConstants.UTILITY_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java index a624419b920..91400637466 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java @@ -77,7 +77,7 @@ public OperatorInfo operatorInfo() { "Cast between types", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java index f3196d95973..b4757c8f61f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java @@ -100,7 +100,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in Python script", OperatorGroupConstants.PYTHON_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, true, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java index 58fe4977211..8c3b96812bb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java @@ -95,7 +95,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in R script", OperatorGroupConstants.R_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala index 6a495c698b3..a1d336b70cb 100644 --- a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala +++ b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala @@ -9,7 +9,8 @@ package edu.uci.ics.amber.engine.common.workflow final case class OutputPort( id: edu.uci.ics.amber.engine.common.workflow.PortIdentity = edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance, displayName: _root_.scala.Predef.String = "", - blocking: _root_.scala.Boolean = false + blocking: _root_.scala.Boolean = false, + storageLocation: _root_.scala.Predef.String = "" ) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[OutputPort] { @transient private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0 @@ -36,6 +37,13 @@ final case class OutputPort( __size += _root_.com.google.protobuf.CodedOutputStream.computeBoolSize(3, __value) } }; + + { + val __value = storageLocation + if (!__value.isEmpty) { + __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(4, __value) + } + }; __size } override def serializedSize: _root_.scala.Int = { @@ -67,10 +75,17 @@ final case class OutputPort( _output__.writeBool(3, __v) } }; + { + val __v = storageLocation + if (!__v.isEmpty) { + _output__.writeString(4, __v) + } + }; } def withId(__v: edu.uci.ics.amber.engine.common.workflow.PortIdentity): OutputPort = copy(id = __v) def withDisplayName(__v: _root_.scala.Predef.String): OutputPort = copy(displayName = __v) def withBlocking(__v: _root_.scala.Boolean): OutputPort = copy(blocking = __v) + def withStorageLocation(__v: _root_.scala.Predef.String): OutputPort = copy(storageLocation = __v) def getFieldByNumber(__fieldNumber: _root_.scala.Int): _root_.scala.Any = { (__fieldNumber: @_root_.scala.unchecked) match { case 1 => { @@ -85,6 +100,10 @@ final case class OutputPort( val __t = blocking if (__t != false) __t else null } + case 4 => { + val __t = storageLocation + if (__t != "") __t else null + } } } def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = { @@ -93,6 +112,7 @@ final case class OutputPort( case 1 => id.toPMessage case 2 => _root_.scalapb.descriptors.PString(displayName) case 3 => _root_.scalapb.descriptors.PBoolean(blocking) + case 4 => _root_.scalapb.descriptors.PString(storageLocation) } } def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToSingleLineUnicodeString(this) @@ -106,6 +126,7 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en var __id: _root_.scala.Option[edu.uci.ics.amber.engine.common.workflow.PortIdentity] = _root_.scala.None var __displayName: _root_.scala.Predef.String = "" var __blocking: _root_.scala.Boolean = false + var __storageLocation: _root_.scala.Predef.String = "" var _done__ = false while (!_done__) { val _tag__ = _input__.readTag() @@ -117,13 +138,16 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en __displayName = _input__.readStringRequireUtf8() case 24 => __blocking = _input__.readBool() + case 34 => + __storageLocation = _input__.readStringRequireUtf8() case tag => _input__.skipField(tag) } } edu.uci.ics.amber.engine.common.workflow.OutputPort( id = __id.getOrElse(edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance), displayName = __displayName, - blocking = __blocking + blocking = __blocking, + storageLocation = __storageLocation ) } implicit def messageReads: _root_.scalapb.descriptors.Reads[edu.uci.ics.amber.engine.common.workflow.OutputPort] = _root_.scalapb.descriptors.Reads{ @@ -132,7 +156,8 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en edu.uci.ics.amber.engine.common.workflow.OutputPort( id = __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[edu.uci.ics.amber.engine.common.workflow.PortIdentity]).getOrElse(edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance), displayName = __fieldsMap.get(scalaDescriptor.findFieldByNumber(2).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), - blocking = __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Boolean]).getOrElse(false) + blocking = __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Boolean]).getOrElse(false), + storageLocation = __fieldsMap.get(scalaDescriptor.findFieldByNumber(4).get).map(_.as[_root_.scala.Predef.String]).getOrElse("") ) case _ => throw new RuntimeException("Expected PMessage") } @@ -150,24 +175,29 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en lazy val defaultInstance = edu.uci.ics.amber.engine.common.workflow.OutputPort( id = edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance, displayName = "", - blocking = false + blocking = false, + storageLocation = "" ) implicit class OutputPortLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, edu.uci.ics.amber.engine.common.workflow.OutputPort]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, edu.uci.ics.amber.engine.common.workflow.OutputPort](_l) { def id: _root_.scalapb.lenses.Lens[UpperPB, edu.uci.ics.amber.engine.common.workflow.PortIdentity] = field(_.id)((c_, f_) => c_.copy(id = f_)) def displayName: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.displayName)((c_, f_) => c_.copy(displayName = f_)) def blocking: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Boolean] = field(_.blocking)((c_, f_) => c_.copy(blocking = f_)) + def storageLocation: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.storageLocation)((c_, f_) => c_.copy(storageLocation = f_)) } final val ID_FIELD_NUMBER = 1 final val DISPLAYNAME_FIELD_NUMBER = 2 final val BLOCKING_FIELD_NUMBER = 3 + final val STORAGELOCATION_FIELD_NUMBER = 4 def of( id: edu.uci.ics.amber.engine.common.workflow.PortIdentity, displayName: _root_.scala.Predef.String, - blocking: _root_.scala.Boolean + blocking: _root_.scala.Boolean, + storageLocation: _root_.scala.Predef.String ): _root_.edu.uci.ics.amber.engine.common.workflow.OutputPort = _root_.edu.uci.ics.amber.engine.common.workflow.OutputPort( id, displayName, - blocking + blocking, + storageLocation ) // @@protoc_insertion_point(GeneratedMessageCompanion[edu.uci.ics.amber.engine.common.OutputPort]) } diff --git a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala index 2d092df178b..fc94034104b 100644 --- a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala +++ b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala @@ -26,15 +26,16 @@ object WorkflowProto extends _root_.scalapb.GeneratedFileObject { 29tbW9uLlBvcnRJZGVudGl0eUIK4j8HEgJpZPABAVICaWQSMgoLZGlzcGxheU5hbWUYAiABKAlCEOI/DRILZGlzcGxheU5hbWVSC 2Rpc3BsYXlOYW1lEj4KD2FsbG93TXVsdGlMaW5rcxgDIAEoCEIU4j8REg9hbGxvd011bHRpTGlua3NSD2FsbG93TXVsdGlMaW5rc xJkCgxkZXBlbmRlbmNpZXMYBCADKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvcnRJZGVudGl0eUIR4j8OE - gxkZXBlbmRlbmNpZXNSDGRlcGVuZGVuY2llcyK2AQoKT3V0cHV0UG9ydBJJCgJpZBgBIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyL + gxkZXBlbmRlbmNpZXNSDGRlcGVuZGVuY2llcyL2AQoKT3V0cHV0UG9ydBJJCgJpZBgBIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyL mVuZ2luZS5jb21tb24uUG9ydElkZW50aXR5QgriPwcSAmlk8AEBUgJpZBIyCgtkaXNwbGF5TmFtZRgCIAEoCUIQ4j8NEgtkaXNwb - GF5TmFtZVILZGlzcGxheU5hbWUSKQoIYmxvY2tpbmcYAyABKAhCDeI/ChIIYmxvY2tpbmdSCGJsb2NraW5nIo4DCgxQaHlzaWNhb - ExpbmsSYQoIZnJvbU9wSWQYASABKAsyMy5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBoeXNpY2FsT3BJZGVudGl0e - UIQ4j8NEghmcm9tT3BJZPABAVIIZnJvbU9wSWQSYQoKZnJvbVBvcnRJZBgCIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyLmVuZ2luZ - S5jb21tb24uUG9ydElkZW50aXR5QhLiPw8SCmZyb21Qb3J0SWTwAQFSCmZyb21Qb3J0SWQSWwoGdG9PcElkGAMgASgLMjMuZWR1L - nVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5QaHlzaWNhbE9wSWRlbnRpdHlCDuI/CxIGdG9PcElk8AEBUgZ0b09wSWQSWwoId - G9Qb3J0SWQYBCABKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvcnRJZGVudGl0eUIQ4j8NEgh0b1BvcnRJZ - PABAVIIdG9Qb3J0SWRCCeI/BkgAWAB4AGIGcHJvdG8z""" + GF5TmFtZVILZGlzcGxheU5hbWUSKQoIYmxvY2tpbmcYAyABKAhCDeI/ChIIYmxvY2tpbmdSCGJsb2NraW5nEj4KD3N0b3JhZ2VMb + 2NhdGlvbhgEIAEoCUIU4j8REg9zdG9yYWdlTG9jYXRpb25SD3N0b3JhZ2VMb2NhdGlvbiKOAwoMUGh5c2ljYWxMaW5rEmEKCGZyb + 21PcElkGAEgASgLMjMuZWR1LnVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5QaHlzaWNhbE9wSWRlbnRpdHlCEOI/DRIIZnJvb + U9wSWTwAQFSCGZyb21PcElkEmEKCmZyb21Qb3J0SWQYAiABKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvc + nRJZGVudGl0eUIS4j8PEgpmcm9tUG9ydElk8AEBUgpmcm9tUG9ydElkElsKBnRvT3BJZBgDIAEoCzIzLmVkdS51Y2kuaWNzLmFtY + mVyLmVuZ2luZS5jb21tb24uUGh5c2ljYWxPcElkZW50aXR5Qg7iPwsSBnRvT3BJZPABAVIGdG9PcElkElsKCHRvUG9ydElkGAQgA + SgLMi0uZWR1LnVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5Qb3J0SWRlbnRpdHlCEOI/DRIIdG9Qb3J0SWTwAQFSCHRvUG9yd + ElkQgniPwZIAFgAeABiBnByb3RvMw==""" ).mkString) lazy val scalaDescriptor: _root_.scalapb.descriptors.FileDescriptor = { val scalaProto = com.google.protobuf.descriptor.FileDescriptorProto.parseFrom(ProtoBytes) From 582a8dbc8db7a222bde4e8e4eb8236a7019887a9 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Thu, 30 May 2024 11:40:44 -0700 Subject: [PATCH 6/6] revert unrelated changes. --- .../ics/texera/workflow/common/operators/LogicalOp.scala | 1 - .../common/workflow/SinkInjectionTransformer.scala | 1 - .../workflow/common/workflow/WorkflowCacheRewriter.scala | 1 + .../ics/amber/engine/architecture/worker/WorkerSpec.scala | 7 +------ 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala index cc4824e8b43..0136d740c8c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala @@ -298,7 +298,6 @@ abstract class LogicalOp extends PortDescriptor with Serializable { val inputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() @JsonIgnore val outputPortToSchemaMapping: mutable.Map[PortIdentity, Schema] = mutable.HashMap() - def operatorIdentifier: OperatorIdentity = OperatorIdentity(operatorId) def getPhysicalOp( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala index 97bf3181b7d..1e34daafac1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/SinkInjectionTransformer.scala @@ -24,7 +24,6 @@ object SinkInjectionTransformer { operatorsToAddSink.foreach(opId => { val op = logicalPlan.getOperator(opId) op.operatorInfo.outputPorts.foreach(outPort => { - // Add sink (TODO:deprecate) val sink = new ProgressiveSinkOpDesc() sink.setOperatorId("sink_" + opId.id) logicalPlan = logicalPlan diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala index 65aa4c2e385..6dd8fd91b95 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCacheRewriter.scala @@ -126,6 +126,7 @@ object WorkflowCacheRewriter { sinksPointers.add(storageNode) } storage.get(storageKey) + case _ => } // update execution entry in MySQL to have pointers to the mongo collections diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala index 3fadd42e39e..99ed9c01590 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala @@ -165,12 +165,7 @@ class WorkerSpec ) val initializeOperatorLogic = ControlInvocation( 4, - InitializeExecutor( - 1, - OpExecInitInfo((_, _) => mockOpExecutor), - isSource = false, - portStorages = null - ) + InitializeExecutor(1, OpExecInitInfo((_, _) => mockOpExecutor), isSource = false) ) sendControlToWorker( worker,