diff --git a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto index 5d86ee18728..7a2cab21aab 100644 --- a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto +++ b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/workflow.proto @@ -27,6 +27,7 @@ message OutputPort { PortIdentity id = 1 [(scalapb.field).no_box = true]; string displayName = 2; bool blocking = 3; + string storageLocation = 4; } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala index 8e48e7405d5..a5a419a7c9b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ControllerProcessor.scala @@ -29,7 +29,8 @@ class ControllerProcessor( () => this.workflowScheduler.getNextRegions, workflowExecution, controllerConfig, - asyncRPCClient + asyncRPCClient, + opResultStorage ) private val initializer = new ControllerAsyncRPCHandlerInitializer(this) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala index 38af9cd6332..b92b014be81 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/deploysemantics/PhysicalOp.scala @@ -26,6 +26,9 @@ import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{ import edu.uci.ics.amber.engine.common.VirtualIdentityUtils import edu.uci.ics.amber.engine.common.virtualidentity._ import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} +import edu.uci.ics.texera.workflow.common.WorkflowContext +import edu.uci.ics.texera.workflow.common.operators.LogicalOp +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import edu.uci.ics.texera.workflow.common.workflow._ import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} @@ -440,6 +443,49 @@ case class PhysicalOp( } } + def assignOutputPortStorages( + logicalOp: LogicalOp, + context: WorkflowContext, + opResultStorageOptional: Option[OpResultStorage] + ): PhysicalOp = { + opResultStorageOptional match { + case Some(opResultStorage: OpResultStorage) => + logicalOp.outputPorts.zipWithIndex.foldLeft(this) { (currentOp, portWithIndex) => + { + if ( + portWithIndex._1.hasStorage && currentOp.outputPorts + .exists(pred => pred._1.id == portWithIndex._2) + ) { + val correspondingPortId = + currentOp.outputPorts.keys.filter(portId => portId.id == portWithIndex._2).head + val existingContent = outputPorts(correspondingPortId) + val storageKey = s"${currentOp.id}_outPort_${correspondingPortId.id}" + val storageType = OpResultStorage.defaultStorageMode + val createdStorageReader = opResultStorage.createPortStorage( + s"${context.executionId}_", + storageKey, + storageType + ) + existingContent._3 match { + case Left(_) => + case Right(schema) => createdStorageReader.setSchema(schema) + } + currentOp.copy(outputPorts = + outputPorts.updated( + correspondingPortId, + existingContent.copy(_1 = existingContent._1.copy(storageLocation = storageKey)) + ) + ) + } else { + currentOp + } + } + } + case _ => this + } + + } + /** * returns all output links. Optionally, if a specific portId is provided, returns the links connected to that portId. */ diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 6635d429bba..69ea0eb2bc4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -14,6 +14,7 @@ import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter import scala.collection.mutable @@ -135,6 +136,21 @@ class OutputManager( networkOutputBuffers((link, partitioner.allReceivers(bucketIndex))).addTuple(tuple) } } + + // Save to storage + + (outputPortId match { + case Some(portId) => ports.filter(_._1 == portId) + case None => ports + }).foreach(kv => { + val portId = kv._1 + kv._2.storage match { + case Some(storageWriter) => + val tuple = tupleLike.enforceSchema(getPort(portId).schema) + storageWriter.putOne(tuple) + case None => + } + }) } /** @@ -172,13 +188,16 @@ class OutputManager( }) } - def addPort(portId: PortIdentity, schema: Schema): Unit = { + def addPort(portId: PortIdentity, schema: Schema, storage: Option[SinkStorageWriter]): Unit = { // each port can only be added and initialized once. if (this.ports.contains(portId)) { return } - this.ports(portId) = WorkerPort(schema) - + this.ports(portId) = WorkerPort(schema, storage = storage) + this.ports(portId).storage match { + case Some(storageWriter) => storageWriter.open() + case None => + } } def getPort(portId: PortIdentity): WorkerPort = ports(portId) @@ -193,6 +212,15 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def closeOutputStorages(): Unit = { + this.ports.values.foreach(workerPort => { + workerPort.storage match { + case Some(storageWriter) => storageWriter.close() + case None => + } + }) + } + def getSingleOutputPortIdentity: PortIdentity = { assert(ports.size == 1) ports.head._1 diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala index 2144b3bc595..dc995a52f54 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerPort.scala @@ -2,11 +2,13 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer import edu.uci.ics.amber.engine.common.virtualidentity.ChannelIdentity import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter import scala.collection.mutable case class WorkerPort( schema: Schema, // TODO: change it to manage the actual AmberFIFOChannel instead of Boolean - channels: mutable.HashMap[ChannelIdentity, Boolean] = mutable.HashMap() + channels: mutable.HashMap[ChannelIdentity, Boolean] = mutable.HashMap(), + storage: Option[SinkStorageWriter] = None ) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 231eff2ee1d..73f40539b9b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -22,13 +22,15 @@ import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER import edu.uci.ics.amber.engine.common.workflow.PhysicalLink import edu.uci.ics.texera.web.workflowruntimestate.WorkflowAggregatedState +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import scala.collection.Seq class RegionExecutionCoordinator( region: Region, workflowExecution: WorkflowExecution, asyncRPCClient: AsyncRPCClient, - controllerConfig: ControllerConfig + controllerConfig: ControllerConfig, + opResultStorage: OpResultStorage ) { def execute(actorService: AkkaActorService): Future[Unit] = { @@ -144,23 +146,36 @@ class RegionExecutionCoordinator( val inputPortMapping = physicalOp.inputPorts .flatMap { case (inputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> schema) + Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> (schema, None)) case _ => None } val outputPortMapping = physicalOp.outputPorts .flatMap { - case (outputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, outputPortId, input = false) -> schema) + case (outputPortId, (outputPort, _, Right(schema))) => + Some( + GlobalPortIdentity( + physicalOp.id, + outputPortId, + input = false + ) -> (schema, outputPort.storageLocation match { + case "" => None + case location => Some(opResultStorage.getPortStorage(location).getStorageWriter) + case _ => None + }) + ) case _ => None } inputPortMapping ++ outputPortMapping } .flatMap { - case (globalPortId, schema) => + case (globalPortId, (schema, storage)) => resourceConfig.operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId).map { workerId => asyncRPCClient - .send(AssignPort(globalPortId.portId, globalPortId.input, schema), workerId) + .send( + AssignPort(globalPortId.portId, globalPortId.input, schema, storage), + workerId + ) } } .toSeq diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 7f76932cfd0..5e2811c2ce6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -7,6 +7,7 @@ import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig import edu.uci.ics.amber.engine.architecture.controller.execution.WorkflowExecution import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient import edu.uci.ics.amber.engine.common.workflow.PhysicalLink +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import scala.collection.mutable @@ -14,7 +15,8 @@ class WorkflowExecutionCoordinator( getNextRegions: () => Set[Region], workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, - asyncRPCClient: AsyncRPCClient + asyncRPCClient: AsyncRPCClient, + opResultStorage: OpResultStorage ) extends LazyLogging { private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer() @@ -41,7 +43,8 @@ class WorkflowExecutionCoordinator( region, workflowExecution, asyncRPCClient, - controllerConfig + controllerConfig, + opResultStorage ) regionExecutionCoordinators(region.id) }) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index be6115e7718..c6f27378189 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -145,6 +145,7 @@ class DataProcessor( outputManager.emitEndOfUpstream() // Send Completed signal to worker actor. executor.close() + outputManager.closeOutputStorages() adaptiveBatchingMonitor.stopAdaptiveBatching() stateManager.transitTo(COMPLETED) logger.info( diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala index c0352215680..be390b9114b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/controlcommands/ControlCommandConvertUtils.scala @@ -40,7 +40,7 @@ object ControlCommandConvertUtils { ResumeWorkerV2() case OpenExecutor() => OpenExecutorV2() - case AssignPort(portId, input, schema) => + case AssignPort(portId, input, schema, _) => AssignPortV2(portId, input, schema.toRawSchema) case AddPartitioning(tag: PhysicalLink, partitioning: Partitioning) => AddPartitioningV2(tag, partitioning) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index f094880438e..7128e794315 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -5,13 +5,15 @@ import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHa import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.workflow.common.tuple.schema.Schema +import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter object AssignPortHandler { final case class AssignPort( portId: PortIdentity, input: Boolean, - schema: Schema + schema: Schema, + storage: Option[SinkStorageWriter] ) extends ControlCommand[Unit] } @@ -22,7 +24,7 @@ trait AssignPortHandler { if (msg.input) { dp.inputManager.addPort(msg.portId, msg.schema) } else { - dp.outputManager.addPort(msg.portId, msg.schema) + dp.outputManager.addPort(msg.portId, msg.schema, msg.storage) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala index 66a1ac28b00..13a3aca64c5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/PortDescriptor.scala @@ -9,7 +9,8 @@ case class PortDescription( allowMultiInputs: Boolean, isDynamicPort: Boolean, partitionRequirement: PartitionInfo, - dependencies: List[Int] = List.empty + dependencies: List[Int] = List.empty, + hasStorage: Boolean ) trait PortDescriptor { @@ -17,5 +18,5 @@ trait PortDescriptor { var inputPorts: List[PortDescription] = null @JsonProperty(required = false) - var outputPorts: List[PortDescription] = null + var outputPorts: List[PortDescription] = List.empty } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala index efd914494c4..e24897d8891 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/OpResultStorage.scala @@ -26,6 +26,47 @@ class OpResultStorage extends Serializable with LazyLogging { val cache: ConcurrentHashMap[OperatorIdentity, SinkStorageReader] = new ConcurrentHashMap[OperatorIdentity, SinkStorageReader]() + val portStorage: ConcurrentHashMap[String, SinkStorageReader] = + new ConcurrentHashMap[String, SinkStorageReader]() + + def getPortStorage(key: String): SinkStorageReader = { + portStorage.get(key) + } + + def createPortStorage( + executionId: String = "", + key: String, + mode: String + ): SinkStorageReader = { + val storage: SinkStorageReader = + if (mode == "memory") { + new MemoryStorage + } else { + try { + new MongoDBSinkStorage(executionId + key) + } catch { + case t: Throwable => + logger.warn("Failed to create mongo storage", t) + logger.info(s"Fall back to memory storage for $key") + // fall back to memory + new MemoryStorage + } + } + portStorage.put(key, storage) + storage + } + + def containsPortStorage(key: String): Boolean = { + portStorage.contains(key) + } + + def removePortStorage(key: String): Unit = { + if (portStorage.contains(key)) { + portStorage.get(key).clear() + } + portStorage.remove(key) + } + /** * Retrieve the result of an operator from OpResultStorage * @param key The key used for storage and retrieval. @@ -81,6 +122,8 @@ class OpResultStorage extends Serializable with LazyLogging { def close(): Unit = { cache.forEach((_, sinkStorageReader) => sinkStorageReader.clear()) cache.clear() + portStorage.forEach((_, sinkStorageReader) => sinkStorageReader.clear()) + portStorage.clear() } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala index df16fe76be1..a5d60f0e2d8 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/PhysicalPlan.scala @@ -10,6 +10,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{ } import edu.uci.ics.amber.engine.common.workflow.PhysicalLink import edu.uci.ics.texera.workflow.common.WorkflowContext +import edu.uci.ics.texera.workflow.common.storage.OpResultStorage import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.alg.shortestpath.AllDirectedPaths import org.jgrapht.graph.DirectedAcyclicGraph @@ -20,7 +21,11 @@ import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHa object PhysicalPlan { - def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = { + def apply( + context: WorkflowContext, + logicalPlan: LogicalPlan, + opResultStorage: Option[OpResultStorage] = None + ): PhysicalPlan = { var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty) @@ -49,7 +54,11 @@ object PhysicalPlan { val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id) // Add the operator to the physical plan - physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema()) + physicalPlan = physicalPlan.addOperator( + physicalOp + .propagateSchema() + .assignOutputPortStorages(logicalOp, context, opResultStorage) + ) // Add all the links to the physical plan physicalPlan = (externalLinks ++ internalLinks) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala index df60b4d3326..bb84abb97e7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala @@ -85,7 +85,7 @@ class WorkflowCompiler( ) // the PhysicalPlan with topology expanded. - val physicalPlan = PhysicalPlan(context, rewrittenLogicalPlan) + val physicalPlan = PhysicalPlan(context, rewrittenLogicalPlan, Some(opResultStorage)) Workflow( context, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java index 2b43aa23383..9c02e91a8ef 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java @@ -49,7 +49,7 @@ public OperatorInfo operatorInfo() { "Performs a filter operation", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, true, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java index b73c5b32eb1..74bdd5b44de 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java @@ -110,7 +110,7 @@ public OperatorInfo operatorInfo() { "View the results", OperatorGroupConstants.UTILITY_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java index a624419b920..91400637466 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java @@ -77,7 +77,7 @@ public OperatorInfo operatorInfo() { "Cast between types", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java index f3196d95973..b4757c8f61f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java @@ -100,7 +100,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in Python script", OperatorGroupConstants.PYTHON_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, true, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java index 58fe4977211..8c3b96812bb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/r/RUDFSourceOpDesc.java @@ -95,7 +95,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in R script", OperatorGroupConstants.R_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, ""))).toList(), false, false, false, diff --git a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala index 6a495c698b3..a1d336b70cb 100644 --- a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala +++ b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/OutputPort.scala @@ -9,7 +9,8 @@ package edu.uci.ics.amber.engine.common.workflow final case class OutputPort( id: edu.uci.ics.amber.engine.common.workflow.PortIdentity = edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance, displayName: _root_.scala.Predef.String = "", - blocking: _root_.scala.Boolean = false + blocking: _root_.scala.Boolean = false, + storageLocation: _root_.scala.Predef.String = "" ) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[OutputPort] { @transient private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0 @@ -36,6 +37,13 @@ final case class OutputPort( __size += _root_.com.google.protobuf.CodedOutputStream.computeBoolSize(3, __value) } }; + + { + val __value = storageLocation + if (!__value.isEmpty) { + __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(4, __value) + } + }; __size } override def serializedSize: _root_.scala.Int = { @@ -67,10 +75,17 @@ final case class OutputPort( _output__.writeBool(3, __v) } }; + { + val __v = storageLocation + if (!__v.isEmpty) { + _output__.writeString(4, __v) + } + }; } def withId(__v: edu.uci.ics.amber.engine.common.workflow.PortIdentity): OutputPort = copy(id = __v) def withDisplayName(__v: _root_.scala.Predef.String): OutputPort = copy(displayName = __v) def withBlocking(__v: _root_.scala.Boolean): OutputPort = copy(blocking = __v) + def withStorageLocation(__v: _root_.scala.Predef.String): OutputPort = copy(storageLocation = __v) def getFieldByNumber(__fieldNumber: _root_.scala.Int): _root_.scala.Any = { (__fieldNumber: @_root_.scala.unchecked) match { case 1 => { @@ -85,6 +100,10 @@ final case class OutputPort( val __t = blocking if (__t != false) __t else null } + case 4 => { + val __t = storageLocation + if (__t != "") __t else null + } } } def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = { @@ -93,6 +112,7 @@ final case class OutputPort( case 1 => id.toPMessage case 2 => _root_.scalapb.descriptors.PString(displayName) case 3 => _root_.scalapb.descriptors.PBoolean(blocking) + case 4 => _root_.scalapb.descriptors.PString(storageLocation) } } def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToSingleLineUnicodeString(this) @@ -106,6 +126,7 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en var __id: _root_.scala.Option[edu.uci.ics.amber.engine.common.workflow.PortIdentity] = _root_.scala.None var __displayName: _root_.scala.Predef.String = "" var __blocking: _root_.scala.Boolean = false + var __storageLocation: _root_.scala.Predef.String = "" var _done__ = false while (!_done__) { val _tag__ = _input__.readTag() @@ -117,13 +138,16 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en __displayName = _input__.readStringRequireUtf8() case 24 => __blocking = _input__.readBool() + case 34 => + __storageLocation = _input__.readStringRequireUtf8() case tag => _input__.skipField(tag) } } edu.uci.ics.amber.engine.common.workflow.OutputPort( id = __id.getOrElse(edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance), displayName = __displayName, - blocking = __blocking + blocking = __blocking, + storageLocation = __storageLocation ) } implicit def messageReads: _root_.scalapb.descriptors.Reads[edu.uci.ics.amber.engine.common.workflow.OutputPort] = _root_.scalapb.descriptors.Reads{ @@ -132,7 +156,8 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en edu.uci.ics.amber.engine.common.workflow.OutputPort( id = __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[edu.uci.ics.amber.engine.common.workflow.PortIdentity]).getOrElse(edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance), displayName = __fieldsMap.get(scalaDescriptor.findFieldByNumber(2).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), - blocking = __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Boolean]).getOrElse(false) + blocking = __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Boolean]).getOrElse(false), + storageLocation = __fieldsMap.get(scalaDescriptor.findFieldByNumber(4).get).map(_.as[_root_.scala.Predef.String]).getOrElse("") ) case _ => throw new RuntimeException("Expected PMessage") } @@ -150,24 +175,29 @@ object OutputPort extends scalapb.GeneratedMessageCompanion[edu.uci.ics.amber.en lazy val defaultInstance = edu.uci.ics.amber.engine.common.workflow.OutputPort( id = edu.uci.ics.amber.engine.common.workflow.PortIdentity.defaultInstance, displayName = "", - blocking = false + blocking = false, + storageLocation = "" ) implicit class OutputPortLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, edu.uci.ics.amber.engine.common.workflow.OutputPort]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, edu.uci.ics.amber.engine.common.workflow.OutputPort](_l) { def id: _root_.scalapb.lenses.Lens[UpperPB, edu.uci.ics.amber.engine.common.workflow.PortIdentity] = field(_.id)((c_, f_) => c_.copy(id = f_)) def displayName: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.displayName)((c_, f_) => c_.copy(displayName = f_)) def blocking: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Boolean] = field(_.blocking)((c_, f_) => c_.copy(blocking = f_)) + def storageLocation: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.storageLocation)((c_, f_) => c_.copy(storageLocation = f_)) } final val ID_FIELD_NUMBER = 1 final val DISPLAYNAME_FIELD_NUMBER = 2 final val BLOCKING_FIELD_NUMBER = 3 + final val STORAGELOCATION_FIELD_NUMBER = 4 def of( id: edu.uci.ics.amber.engine.common.workflow.PortIdentity, displayName: _root_.scala.Predef.String, - blocking: _root_.scala.Boolean + blocking: _root_.scala.Boolean, + storageLocation: _root_.scala.Predef.String ): _root_.edu.uci.ics.amber.engine.common.workflow.OutputPort = _root_.edu.uci.ics.amber.engine.common.workflow.OutputPort( id, displayName, - blocking + blocking, + storageLocation ) // @@protoc_insertion_point(GeneratedMessageCompanion[edu.uci.ics.amber.engine.common.OutputPort]) } diff --git a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala index 2d092df178b..fc94034104b 100644 --- a/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala +++ b/core/amber/src/main/scalapb/edu/uci/ics/amber/engine/common/workflow/WorkflowProto.scala @@ -26,15 +26,16 @@ object WorkflowProto extends _root_.scalapb.GeneratedFileObject { 29tbW9uLlBvcnRJZGVudGl0eUIK4j8HEgJpZPABAVICaWQSMgoLZGlzcGxheU5hbWUYAiABKAlCEOI/DRILZGlzcGxheU5hbWVSC 2Rpc3BsYXlOYW1lEj4KD2FsbG93TXVsdGlMaW5rcxgDIAEoCEIU4j8REg9hbGxvd011bHRpTGlua3NSD2FsbG93TXVsdGlMaW5rc xJkCgxkZXBlbmRlbmNpZXMYBCADKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvcnRJZGVudGl0eUIR4j8OE - gxkZXBlbmRlbmNpZXNSDGRlcGVuZGVuY2llcyK2AQoKT3V0cHV0UG9ydBJJCgJpZBgBIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyL + gxkZXBlbmRlbmNpZXNSDGRlcGVuZGVuY2llcyL2AQoKT3V0cHV0UG9ydBJJCgJpZBgBIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyL mVuZ2luZS5jb21tb24uUG9ydElkZW50aXR5QgriPwcSAmlk8AEBUgJpZBIyCgtkaXNwbGF5TmFtZRgCIAEoCUIQ4j8NEgtkaXNwb - GF5TmFtZVILZGlzcGxheU5hbWUSKQoIYmxvY2tpbmcYAyABKAhCDeI/ChIIYmxvY2tpbmdSCGJsb2NraW5nIo4DCgxQaHlzaWNhb - ExpbmsSYQoIZnJvbU9wSWQYASABKAsyMy5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBoeXNpY2FsT3BJZGVudGl0e - UIQ4j8NEghmcm9tT3BJZPABAVIIZnJvbU9wSWQSYQoKZnJvbVBvcnRJZBgCIAEoCzItLmVkdS51Y2kuaWNzLmFtYmVyLmVuZ2luZ - S5jb21tb24uUG9ydElkZW50aXR5QhLiPw8SCmZyb21Qb3J0SWTwAQFSCmZyb21Qb3J0SWQSWwoGdG9PcElkGAMgASgLMjMuZWR1L - nVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5QaHlzaWNhbE9wSWRlbnRpdHlCDuI/CxIGdG9PcElk8AEBUgZ0b09wSWQSWwoId - G9Qb3J0SWQYBCABKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvcnRJZGVudGl0eUIQ4j8NEgh0b1BvcnRJZ - PABAVIIdG9Qb3J0SWRCCeI/BkgAWAB4AGIGcHJvdG8z""" + GF5TmFtZVILZGlzcGxheU5hbWUSKQoIYmxvY2tpbmcYAyABKAhCDeI/ChIIYmxvY2tpbmdSCGJsb2NraW5nEj4KD3N0b3JhZ2VMb + 2NhdGlvbhgEIAEoCUIU4j8REg9zdG9yYWdlTG9jYXRpb25SD3N0b3JhZ2VMb2NhdGlvbiKOAwoMUGh5c2ljYWxMaW5rEmEKCGZyb + 21PcElkGAEgASgLMjMuZWR1LnVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5QaHlzaWNhbE9wSWRlbnRpdHlCEOI/DRIIZnJvb + U9wSWTwAQFSCGZyb21PcElkEmEKCmZyb21Qb3J0SWQYAiABKAsyLS5lZHUudWNpLmljcy5hbWJlci5lbmdpbmUuY29tbW9uLlBvc + nRJZGVudGl0eUIS4j8PEgpmcm9tUG9ydElk8AEBUgpmcm9tUG9ydElkElsKBnRvT3BJZBgDIAEoCzIzLmVkdS51Y2kuaWNzLmFtY + mVyLmVuZ2luZS5jb21tb24uUGh5c2ljYWxPcElkZW50aXR5Qg7iPwsSBnRvT3BJZPABAVIGdG9PcElkElsKCHRvUG9ydElkGAQgA + SgLMi0uZWR1LnVjaS5pY3MuYW1iZXIuZW5naW5lLmNvbW1vbi5Qb3J0SWRlbnRpdHlCEOI/DRIIdG9Qb3J0SWTwAQFSCHRvUG9yd + ElkQgniPwZIAFgAeABiBnByb3RvMw==""" ).mkString) lazy val scalaDescriptor: _root_.scalapb.descriptors.FileDescriptor = { val scalaProto = com.google.protobuf.descriptor.FileDescriptorProto.parseFrom(ProtoBytes) diff --git a/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index d28a991d70e..ca2a430c8fc 100644 --- a/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/core/gui/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -26,6 +26,7 @@ import { exhaustiveGuard } from "../../../common/util/switch"; import { WorkflowStatusService } from "../workflow-status/workflow-status.service"; import { isDefined } from "../../../common/util/predicate"; import { intersection } from "../../../common/util/set"; +import { PortDescription } from "../../types/workflow-common.interface"; // TODO: change this declaration export const FORM_DEBOUNCE_TIME_MS = 150; @@ -169,6 +170,7 @@ export class ExecuteWorkflowService { this.workflowActionService.getTexeraGraph(), targetOperatorId ); + console.log(logicalPlan); this.resetExecutionState(); this.workflowStatusService.resetStatus(); this.sendExecutionRequest(executionName, logicalPlan); @@ -376,7 +378,19 @@ export class ExecuteWorkflowService { intersection(operatorIds, workflowGraph.getOperatorsMarkedForReuseResult()) ); - return { operators, links, opsToViewResult, opsToReuseResult }; + const terminalOpIds: string[] = Array.from(operatorIds).filter(opId => !links.some(link => link.fromOpId === opId)); + + const opsNeedingOutputStorage = new Set(opsToViewResult.concat(terminalOpIds)); + + const operatorsWithPortStorage: LogicalOperator[] = operators.map(op => ({ + ...op, + outputPorts: (op.outputPorts as PortDescription[]).map(outputPort => ({ + ...outputPort, + hasStorage: opsNeedingOutputStorage.has(op.operatorID), + })), + })); + + return { operators: operatorsWithPortStorage, links, opsToViewResult, opsToReuseResult }; } public getWorkerIds(operatorId: string): ReadonlyArray { diff --git a/core/gui/src/app/workspace/types/workflow-common.interface.ts b/core/gui/src/app/workspace/types/workflow-common.interface.ts index 69ef962ea01..9c9506120c3 100644 --- a/core/gui/src/app/workspace/types/workflow-common.interface.ts +++ b/core/gui/src/app/workspace/types/workflow-common.interface.ts @@ -40,6 +40,7 @@ export interface PortDescription isDynamicPort?: boolean; partitionRequirement?: PartitionInfo; dependencies?: { id: number; internal: boolean }[]; + hasStorage?: boolean; }> {} export interface OperatorPredicate