From 3c0c7d770310f83b9b83c612cd9952aa924f6d7a Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:24:39 -0800 Subject: [PATCH 01/20] working code --- .../web/service/ExecutionResultService.scala | 146 +++++++------- .../service/WorkflowExecutionService.scala | 2 +- .../texera/web/service/WorkflowService.scala | 2 +- .../workflow/SinkInjectionTransformer.scala | 72 ------- .../texera/workflow/WorkflowCompiler.scala | 181 +++++++++++------- .../protobuf/edu/uci/ics/amber/workflow.proto | 1 + .../core/storage/result/OpResultStorage.scala | 5 + .../amber/core/workflow/PhysicalPlan.scala | 4 + .../filter/SpecializedFilterOpDesc.java | 2 +- .../sink/managed/ProgressiveSinkOpDesc.java | 2 +- .../typecasting/TypeCastingOpDesc.java | 2 +- .../source/PythonUDFSourceOpDescV2.java | 2 +- .../operator/udf/r/RUDFSourceOpDesc.java | 2 +- 13 files changed, 199 insertions(+), 224 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 1c091600db7..3adc68027b4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -5,35 +5,20 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.StorageConfig -import edu.uci.ics.amber.core.storage.result.{ - MongoDocument, - OperatorResultMetadata, - ResultStorage, - WorkflowResultStore -} +import edu.uci.ics.amber.core.storage.result.{MongoDocument, OperatorResultMetadata, ResultStorage, WorkflowResultStore} import edu.uci.ics.amber.core.tuple.Tuple +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan} import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError} -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ - COMPLETED, - FAILED, - KILLED, - RUNNING -} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED, KILLED, RUNNING} import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime} -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.amber.virtualidentity.OperatorIdentity +import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.OutputPort.OutputMode import edu.uci.ics.texera.web.SubscriptionManager -import edu.uci.ics.texera.web.model.websocket.event.{ - PaginatedResultEvent, - TexeraWebSocketEvent, - WebResultUpdateEvent -} +import edu.uci.ics.texera.web.model.websocket.event.{PaginatedResultEvent, TexeraWebSocketEvent, WebResultUpdateEvent} import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} -import edu.uci.ics.texera.workflow.LogicalPlan import java.util.UUID import scala.collection.mutable @@ -63,12 +48,23 @@ object ExecutionResultService { * Produces the WebResultUpdate to send to frontend from a result update from the engine. 
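 * With this change the update is derived from the physical plan rather than from a
 * ProgressiveSinkOpDesc: the mode of the first non-internal output port of the logical
 * operator's physical operators selects the web output mode, and the tuples are read
 * back from the operator's result storage keyed by the logical operator id.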
*/ private def convertWebResultUpdate( - sink: ProgressiveSinkOpDesc, + workflowIdentity: WorkflowIdentity, + physicalOps: List[PhysicalOp], oldTupleCount: Int, newTupleCount: Int ): WebResultUpdate = { + val outputMode = physicalOps + .flatMap(op => op.outputPorts) + .filter({ + case (portId, (port, links, schema)) => !portId.internal + }) + .map({ + case (portId, (port, links, schema)) => port.mode + }) + .head + val webOutputMode: WebOutputMode = { - sink.getOutputMode match { + outputMode match { // currently, only table outputs are using these modes case OutputMode.SET_DELTA => SetDeltaMode() case OutputMode.SET_SNAPSHOT => PaginationMode() @@ -79,7 +75,7 @@ object ExecutionResultService { } val storage = - ResultStorage.getOpResultStorage(sink.getContext.workflowId).get(sink.getUpstreamId.get) + ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId) val webUpdate = webOutputMode match { case PaginationMode() => val numTuples = storage.getCount @@ -98,7 +94,7 @@ object ExecutionResultService { case _ => throw new RuntimeException( - "update mode combination not supported: " + (webOutputMode, sink.getOutputMode) + "update mode combination not supported: " + (webOutputMode, outputMode) ) } webUpdate @@ -150,18 +146,16 @@ object ExecutionResultService { * - send result update event to the frontend */ class ExecutionResultService( + workflowIdentity: WorkflowIdentity, val workflowStateStore: WorkflowStateStore ) extends SubscriptionManager with LazyLogging { - - var sinkOperators: mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc] = - mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc]() private val resultPullingFrequency = AmberConfig.executionResultPollingInSecs private var resultUpdateCancellable: Cancellable = _ def attachToExecution( stateStore: ExecutionStateStore, - logicalPlan: LogicalPlan, + physicalPlan: PhysicalPlan, client: AmberClient ): Unit = { @@ -181,7 +175,7 @@ class ExecutionResultService( 2.seconds, resultPullingFrequency.seconds ) { - onResultUpdate() + onResultUpdate(physicalPlan) } } } else { @@ -197,7 +191,7 @@ class ExecutionResultService( logger.info("Workflow execution terminated. 
Stop update results.") if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) { // immediately perform final update - onResultUpdate() + onResultUpdate(physicalPlan) } } }) @@ -225,18 +219,15 @@ class ExecutionResultService( case (opId, info) => val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata()) buf(opId.id) = ExecutionResultService.convertWebResultUpdate( - sinkOperators(opId), + workflowIdentity, + physicalPlan.getPhysicalOpsOfLogicalOp(opId), oldInfo.tupleCount, info.tupleCount ) - if ( - StorageConfig.resultStorageMode.toLowerCase == "mongodb" - && !opId.id.startsWith("sink") - ) { - val sinkOp = sinkOperators(opId) + if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") { val opStorage = ResultStorage - .getOpResultStorage(sinkOp.getContext.workflowId) - .get(sinkOp.getUpstreamId.get) + .getOpResultStorage(workflowIdentity) + .get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId) opStorage match { case mongoDocument: MongoDocument[Tuple] => val tableCatStats = mongoDocument.getCategoricalStats @@ -262,22 +253,12 @@ class ExecutionResultService( }) ) - // first clear all the results - sinkOperators.clear() +// // first clear all the results +// sinkOperators.clear() workflowStateStore.resultStore.updateState { _ => WorkflowResultStore() // empty result store } - // For operators connected to a sink and sinks, - // create result service so that the results can be displayed. - logicalPlan.getTerminalOperatorIds.map(sink => { - logicalPlan.getOperator(sink) match { - case sinkOp: ProgressiveSinkOpDesc => - sinkOperators += ((sinkOp.getUpstreamId.get, sinkOp)) - sinkOperators += ((sink, sinkOp)) - case other => // skip other non-texera-managed sinks, if any - } - }) } def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = { @@ -286,16 +267,12 @@ class ExecutionResultService( val opId = OperatorIdentity(request.operatorID) val paginationIterable = { - if (sinkOperators.contains(opId)) { - val sinkOp = sinkOperators(opId) - ResultStorage - .getOpResultStorage(sinkOp.getContext.workflowId) - .get(sinkOp.getUpstreamId.get) - .getRange(from, from + request.pageSize) - .to(Iterable) - } else { - Iterable.empty - } + ResultStorage + .getOpResultStorage(workflowIdentity) + .get(opId) + .getRange(from, from + request.pageSize) + .to(Iterable) + } val mappedResults = paginationIterable .map(tuple => tuple.asKeyValuePairJson()) @@ -306,23 +283,40 @@ class ExecutionResultService( PaginatedResultEvent.apply(request, mappedResults, attributes) } - private def onResultUpdate(): Unit = { + private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = { workflowStateStore.resultStore.updateState { _ => - val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = sinkOperators.map { - - case (id, sink) => - val count = ResultStorage - .getOpResultStorage(sink.getContext.workflowId) - .get(sink.getUpstreamId.get) - .getCount - .toInt - val mode = sink.getOutputMode - val changeDetector = - if (mode == OutputMode.SET_SNAPSHOT) { - UUID.randomUUID.toString - } else "" - (id, OperatorResultMetadata(count, changeDetector)) - }.toMap + val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = { + ResultStorage + .getOpResultStorage(workflowIdentity) + .getAllKeys + .filter(opId => !opId.id.startsWith("materialized")) + .map(opId => { + val count = ResultStorage + .getOpResultStorage(workflowIdentity) + .get(opId) + .getCount + .toInt + + // use the first output port's mode + val mode = physicalPlan + 
.getPhysicalOpsOfLogicalOp(opId) + .flatMap(physicalOp => physicalOp.outputPorts) + .filter({ + case (portId, (port, links, schema)) => + !portId.internal + }) + .map({ + case (portId, (port, links, schema)) => port.mode + }) + .head + val changeDetector = + if (mode == OutputMode.SET_SNAPSHOT) { + UUID.randomUUID.toString + } else "" + (opId, OperatorResultMetadata(count, changeDetector)) + }) + .toMap + } WorkflowResultStore(newInfo) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala index 26e5899c051..09015a0a00c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala @@ -114,7 +114,7 @@ class WorkflowExecutionService( executionConsoleService = new ExecutionConsoleService(client, executionStateStore, wsInput) logger.info("Starting the workflow execution.") - resultService.attachToExecution(executionStateStore, workflow.logicalPlan, client) + resultService.attachToExecution(executionStateStore, workflow.physicalPlan, client) executionStateStore.metadataStore.updateState(metadataStore => updateWorkflowState(READY, metadataStore) .withFatalErrors(Seq.empty) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index a4c3c0c0687..6f0b7868e54 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -77,7 +77,7 @@ class WorkflowService( val stateStore = new WorkflowStateStore() var executionService: BehaviorSubject[WorkflowExecutionService] = BehaviorSubject.create() - val resultService: ExecutionResultService = new ExecutionResultService(stateStore) + val resultService: ExecutionResultService = new ExecutionResultService(workflowId, stateStore) val exportService: ResultExportService = new ResultExportService(workflowId) val lifeCycleManager: WorkflowLifecycleManager = new WorkflowLifecycleManager( s"workflowId=$workflowId", diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala index 80f0c3e290f..e69de29bb2d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala @@ -1,72 +0,0 @@ -package edu.uci.ics.texera.workflow - -import edu.uci.ics.amber.operator.sink.SinkOpDesc -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.PortIdentity - -object SinkInjectionTransformer { - - def transform(opsToViewResult: List[String], oldPlan: LogicalPlan): LogicalPlan = { - var logicalPlan = oldPlan - - // for any terminal operator without a sink, add a sink - val nonSinkTerminalOps = logicalPlan.getTerminalOperatorIds.filter(opId => - !logicalPlan.getOperator(opId).isInstanceOf[SinkOpDesc] - ) - // for any operators marked as view result without a sink, add a sink - val viewResultOps = opsToViewResult - .map(idString => OperatorIdentity(idString)) - .filter(opId => !logicalPlan.getDownstreamOps(opId).exists(op => 
op.isInstanceOf[SinkOpDesc])) - - val operatorsToAddSink = (nonSinkTerminalOps ++ viewResultOps).toSet - operatorsToAddSink.foreach(opId => { - val op = logicalPlan.getOperator(opId) - op.operatorInfo.outputPorts.foreach(outPort => { - val sink = new ProgressiveSinkOpDesc() - sink.setOperatorId("sink_" + opId.id) - logicalPlan = logicalPlan - .addOperator(sink) - .addLink( - op.operatorIdentifier, - outPort.id, - sink.operatorIdentifier, - toPortId = PortIdentity() - ) - }) - }) - - // check precondition: all the terminal operators should be sinks - assert( - logicalPlan.getTerminalOperatorIds.forall(o => - logicalPlan.getOperator(o).isInstanceOf[SinkOpDesc] - ) - ) - - // for each sink: - // set the corresponding upstream ID and port - // set output mode based on the visualization operator before it - logicalPlan.getTerminalOperatorIds.foreach(sinkOpId => { - val sinkOp = logicalPlan.getOperator(sinkOpId).asInstanceOf[ProgressiveSinkOpDesc] - val upstream = logicalPlan.getUpstreamOps(sinkOpId).headOption - val edge = logicalPlan.links.find(l => - l.fromOpId == upstream.map(_.operatorIdentifier).orNull - && l.toOpId == sinkOpId - ) - assert(upstream.nonEmpty) - if (upstream.nonEmpty && edge.nonEmpty) { - // set upstream ID and port - sinkOp.setUpstreamId(upstream.get.operatorIdentifier) - sinkOp.setUpstreamPort(edge.get.fromPortId.id) - - // set output mode for visualization operator - val outputPort = - upstream.get.operatorInfo.outputPorts.find(port => port.id == edge.get.fromPortId).get - sinkOp.setOutputMode(outputPort.mode) - } - }) - - logicalPlan - } - -} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 9dd0bdc14f7..4498fd0daf4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,16 +1,17 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.PhysicalOp.getExternalPortSchemas -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc +import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SINGLE_SNAPSHOT -import edu.uci.ics.amber.workflow.PhysicalLink +import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_SNAPSHOT, SINGLE_SNAPSHOT} +import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} import edu.uci.ics.amber.workflowruntimestate.WorkflowFatalError import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService @@ -98,9 +99,15 @@ class WorkflowCompiler( // function to expand logical plan to physical plan private def expandLogicalPlan( logicalPlan: LogicalPlan, + logicalOpsToViewResult: List[String], errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] 
): PhysicalPlan = { + val terminalLogicalOps = logicalPlan.getTerminalOperatorIds + val toAddSink = (terminalLogicalOps ++ logicalOpsToViewResult).toSet var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty) + // create a JSON object that holds pointers to the workflow's results in Mongo + val resultsJSON = objectMapper.createObjectNode() + val sinksPointers = objectMapper.createArrayNode() logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { @@ -135,8 +142,91 @@ class WorkflowCompiler( .foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) } } }) + + // assign the view results + subPlan.topologicalIterator().map(subPlan.getOperator).flatMap { physicalOp => + physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort)) + }.filter({ + case (physicalOp, (_, (outputPort, _, _))) => toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal + }).foreach({ + case (physicalOp, (_, (outputPort, _, schema))) => + val storage = ResultStorage.getOpResultStorage(context.workflowId) + val storageKey = physicalOp.id.logicalOpId + // due to the size limit of single document in mongoDB (16MB) + // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. + val storageType = { + if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY + else OpResultStorage.defaultStorageMode + } + if (!storage.contains(storageKey)) { + // get the schema for result storage in certain mode + val sinkStorageSchema: Option[Schema] = + if (storageType == OpResultStorage.MONGODB) { + // use the output schema on the first output port as the schema for storage + Some(schema.right.get) + } else { + None + } + storage.create( + s"${context.executionId}_", + storageKey, + storageType, + sinkStorageSchema + ) + // add the sink collection name to the JSON array of sinks + val storageNode = objectMapper.createObjectNode() + storageNode.put("storageType", storageType) + storageNode.put("storageKey", s"${context.executionId}_$storageKey") + sinksPointers.add(storageNode) + } + + val sinkPhysicalOp = PhysicalOp.localPhysicalOp( + context.workflowId, + context.executionId, + OperatorIdentity("sink_" + storageKey.id), + OpExecInitInfo( + (idx, workers) => + new edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec( + outputPort.mode, + storageKey.id, + context.workflowId + ) + ) + ) + .withInputPorts(List(InputPort(PortIdentity()))) + .withOutputPorts(List(OutputPort(PortIdentity()))) + .withPropagateSchema( + SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { + // Get the first schema from inputSchemas + val inputSchema = inputSchemas.values.head + + // Define outputSchema based on outputMode + val outputSchema = if (outputPort.mode == SET_SNAPSHOT) { + if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + // input is insert/retract delta: remove the flag column in the output + Schema.builder() + .add(inputSchema) + .remove(ProgressiveUtils.insertRetractFlagAttr.getName) + .build() + } else { + // input is insert-only delta: output schema is the same as input schema + inputSchema + } + } else { + // SET_DELTA: output schema is the same as input schema + inputSchema + } + + // Create a Scala immutable Map + Map(PortIdentity() -> outputSchema) + }) + ) + val sinkLink = PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) + physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) + }) } match { case Success(_) => + case Failure(err) 
=> errorList match { case Some(list) => list.append((logicalOpId, err)) @@ -144,6 +234,13 @@ class WorkflowCompiler( } } ) + + // update execution entry in MySQL to have pointers to the mongo collections + resultsJSON.set("results", sinksPointers) + println("hello!!!" + resultsJSON.toString) + ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) { + _.setResult(resultsJSON.toString) + } physicalPlan } @@ -163,78 +260,24 @@ class WorkflowCompiler( // 1. convert the pojo to logical plan var logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) - // 2. Manipulate logical plan by: - // - inject sink - logicalPlan = SinkInjectionTransformer.transform( - logicalPlanPojo.opsToViewResult, - logicalPlan - ) +// // 2. Manipulate logical plan by: +// // - inject sink +// logicalPlan = SinkInjectionTransformer.transform( +// logicalPlanPojo.opsToViewResult, +// logicalPlan +// ) // - resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(None) // 3. Propagate the schema to get the input & output schemas for each port of each operator logicalPlan.propagateWorkflowSchema(context, None) - // 4. assign the sink storage using logical plan and expand the logical plan to the physical plan, - assignSinkStorage(logicalPlan, context) - val physicalPlan = expandLogicalPlan(logicalPlan, None) +// // 4. assign the sink storage using logical plan +// assignSinkStorage(logicalPlan, context) - Workflow(context, logicalPlan, physicalPlan) - } + // 5. expand the logical plan to the physical plan, + val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) - /** - * Once standalone compiler is done, move this function to the execution service, and change the 1st parameter from LogicalPlan to PhysicalPlan - */ - @Deprecated - def assignSinkStorage( - logicalPlan: LogicalPlan, - context: WorkflowContext, - reuseStorageSet: Set[OperatorIdentity] = Set() - ): Unit = { - val storage = ResultStorage.getOpResultStorage(context.workflowId) - // create a JSON object that holds pointers to the workflow's results in Mongo - val resultsJSON = objectMapper.createObjectNode() - val sinksPointers = objectMapper.createArrayNode() - // assign storage to texera-managed sinks before generating exec config - logicalPlan.operators.foreach { - case o @ (sink: ProgressiveSinkOpDesc) => - val storageKey = sink.getUpstreamId.getOrElse(o.operatorIdentifier) - // due to the size limit of single document in mongoDB (16MB) - // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. 
- val storageType = { - if (sink.getOutputMode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY - else OpResultStorage.defaultStorageMode - } - if (!reuseStorageSet.contains(storageKey) || !storage.contains(storageKey)) { - // get the schema for result storage in certain mode - val sinkStorageSchema: Option[Schema] = - if (storageType == OpResultStorage.MONGODB) { - // use the output schema on the first output port as the schema for storage - Some(o.outputPortToSchemaMapping.head._2) - } else { - None - } - storage.create( - s"${o.getContext.executionId}_", - storageKey, - storageType, - sinkStorageSchema - ) - // add the sink collection name to the JSON array of sinks - val storageNode = objectMapper.createObjectNode() - storageNode.put("storageType", storageType) - storageNode.put("storageKey", s"${o.getContext.executionId}_$storageKey") - sinksPointers.add(storageNode) - } - storage.get(storageKey) - - case _ => - } - // update execution entry in MySQL to have pointers to the mongo collections - resultsJSON.set("results", sinksPointers) - ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) { - _.setResult(resultsJSON.toString) - } + Workflow(context, logicalPlan, physicalPlan) } - } diff --git a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto index 0ee4c68d36a..bc7a9e0e7e9 100644 --- a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto +++ b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto @@ -40,6 +40,7 @@ message OutputPort { string displayName = 2; bool blocking = 3; OutputMode mode = 4; + string storageUrl = 5; } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala index 3485fb1285d..03ba5c58f3d 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala @@ -7,6 +7,7 @@ import edu.uci.ics.amber.core.tuple.{Schema, Tuple} import edu.uci.ics.amber.virtualidentity.OperatorIdentity import java.util.concurrent.ConcurrentHashMap +import scala.collection.convert.ImplicitConversions.`iterator asScala` object OpResultStorage { val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase @@ -98,4 +99,8 @@ class OpResultStorage extends Serializable with LazyLogging { cache.clear() } + def getAllKeys: Set[OperatorIdentity] = { + cache.keySet().iterator().toSet + } + } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index ce8070a1aa7..95681eb7711 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -126,6 +126,10 @@ case class PhysicalPlan( } + def getTerminalPhysicalOpIds(): Set[PhysicalOpIdentity] = { + operators.filter(op => dag.outDegreeOf(op.id) == 0).map(_.id) + } + @JsonIgnore def getOutputPartitionInfo( link: PhysicalLink, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java index 
ea35f40dee0..6c9c96703fa 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java @@ -48,7 +48,7 @@ public OperatorInfo operatorInfo() { "Performs a filter operation", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), false, false, true, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java index 1f63fb86ab4..00ce71efa4a 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java @@ -96,7 +96,7 @@ public OperatorInfo operatorInfo() { "View the results", OperatorGroupConstants.UTILITY_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), false, false, false, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java index 87d902f01f9..6324e279721 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java @@ -75,7 +75,7 @@ public OperatorInfo operatorInfo() { "Cast between types", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), false, false, false, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java index 944f171c397..f4766f57b1a 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java @@ -99,7 +99,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in Python script", OperatorGroupConstants.PYTHON_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, 
false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), false, false, true, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java index 2b5785d9468..972574bcc98 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java @@ -108,7 +108,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in R script", OperatorGroupConstants.R_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), false, false, false, From d41950781b9a3205043fa916159a1ad4cd43ad44 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:32:41 -0800 Subject: [PATCH 02/20] clean up --- .../texera/workflow/WorkflowCompiler.scala | 90 +------------------ 1 file changed, 3 insertions(+), 87 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 4498fd0daf4..3f7f7c48ae4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -4,7 +4,6 @@ import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.PhysicalOp.getExternalPortSchemas import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper @@ -12,7 +11,6 @@ import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.virtualidentity.OperatorIdentity import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_SNAPSHOT, SINGLE_SNAPSHOT} import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} -import edu.uci.ics.amber.workflowruntimestate.WorkflowFatalError import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService @@ -20,78 +18,6 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.util.{Failure, Success, Try} -object WorkflowCompiler { - // util function for extracting the error causes - private def getStackTraceWithAllCauses(err: Throwable, topLevel: Boolean = true): String = { - val header = if (topLevel) { - "Stack trace for developers: \n\n" - } else { - "\n\nCaused by:\n" - } - val message = header + err.toString + "\n" + err.getStackTrace.mkString("\n") - if (err.getCause != null) { - message + getStackTraceWithAllCauses(err.getCause, topLevel = false) - } else { - message - } - } - - // util 
function for convert the error list to error map, and report the error in log - private def collectInputSchemasOfSinks( - physicalPlan: PhysicalPlan, - errorList: ArrayBuffer[(OperatorIdentity, Throwable)] // Mandatory error list - ): Map[OperatorIdentity, Array[Schema]] = { - physicalPlan.operators - .filter(op => op.isSinkOperator) - .map { physicalOp => - physicalOp.id.logicalOpId -> getExternalPortSchemas( - physicalOp, - fromInput = true, - Some(errorList) - ).flatten.toArray - } - .toMap - } - - // Only collects the input schemas for the sink operator - private def collectInputSchemaFromPhysicalPlanForSink( - physicalPlan: PhysicalPlan, - errorList: ArrayBuffer[(OperatorIdentity, Throwable)] // Mandatory error list - ): Map[OperatorIdentity, List[Option[Schema]]] = { - val physicalInputSchemas = - physicalPlan.operators.filter(op => op.isSinkOperator).map { physicalOp => - // Process inputPorts and capture Throwable values in the errorList - physicalOp.id -> physicalOp.inputPorts.values - .filterNot(_._1.id.internal) - .map { - case (port, _, schema) => - schema match { - case Left(err) => - // Save the Throwable into the errorList - errorList.append((physicalOp.id.logicalOpId, err)) - port.id -> None // Use None for this port - case Right(validSchema) => - port.id -> Some(validSchema) // Use the valid schema - } - } - .toList // Convert to a list for further processing - } - - // Group the physical input schemas by their logical operator ID and consolidate the schemas - physicalInputSchemas - .groupBy(_._1.logicalOpId) - .view - .mapValues(_.flatMap(_._2).toList.sortBy(_._1.id).map(_._2)) - .toMap - } -} - -case class WorkflowCompilationResult( - physicalPlan: Option[PhysicalPlan], // if physical plan is none, the compilation is failed - operatorIdToInputSchemas: Map[OperatorIdentity, List[Option[Schema]]], - operatorIdToError: Map[OperatorIdentity, WorkflowFatalError] -) - class WorkflowCompiler( context: WorkflowContext ) extends LazyLogging { @@ -237,7 +163,6 @@ class WorkflowCompiler( // update execution entry in MySQL to have pointers to the mongo collections resultsJSON.set("results", sinksPointers) - println("hello!!!" + resultsJSON.toString) ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) { _.setResult(resultsJSON.toString) } @@ -258,24 +183,15 @@ class WorkflowCompiler( logicalPlanPojo: LogicalPlanPojo ): Workflow = { // 1. convert the pojo to logical plan - var logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) + val logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) -// // 2. Manipulate logical plan by: -// // - inject sink -// logicalPlan = SinkInjectionTransformer.transform( -// logicalPlanPojo.opsToViewResult, -// logicalPlan -// ) - // - resolve the file name in each scan source operator + // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(None) // 3. Propagate the schema to get the input & output schemas for each port of each operator logicalPlan.propagateWorkflowSchema(context, None) -// // 4. assign the sink storage using logical plan -// assignSinkStorage(logicalPlan, context) - - // 5. expand the logical plan to the physical plan, + // 4. 
expand the logical plan to the physical plan, and assign storage val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) Workflow(context, logicalPlan, physicalPlan) From 242d99f312e9ec694869baa85a721f2595825412 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:40:59 -0800 Subject: [PATCH 03/20] extract out helper --- .../texera/workflow/WorkflowCompiler.scala | 49 +++-------------- .../operator/SpecialPhysicalOpFactory.scala | 55 +++++++++++++++++++ 2 files changed, 63 insertions(+), 41 deletions(-) create mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 3f7f7c48ae4..b970fddb9f5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -7,6 +7,7 @@ import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper +import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.virtualidentity.OperatorIdentity import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_SNAPSHOT, SINGLE_SNAPSHOT} @@ -106,47 +107,13 @@ class WorkflowCompiler( sinksPointers.add(storageNode) } - val sinkPhysicalOp = PhysicalOp.localPhysicalOp( - context.workflowId, - context.executionId, - OperatorIdentity("sink_" + storageKey.id), - OpExecInitInfo( - (idx, workers) => - new edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec( - outputPort.mode, - storageKey.id, - context.workflowId - ) - ) - ) - .withInputPorts(List(InputPort(PortIdentity()))) - .withOutputPorts(List(OutputPort(PortIdentity()))) - .withPropagateSchema( - SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { - // Get the first schema from inputSchemas - val inputSchema = inputSchemas.values.head - - // Define outputSchema based on outputMode - val outputSchema = if (outputPort.mode == SET_SNAPSHOT) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { - // input is insert/retract delta: remove the flag column in the output - Schema.builder() - .add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr.getName) - .build() - } else { - // input is insert-only delta: output schema is the same as input schema - inputSchema - } - } else { - // SET_DELTA: output schema is the same as input schema - inputSchema - } - - // Create a Scala immutable Map - Map(PortIdentity() -> outputSchema) - }) - ) + val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( + context.workflowId, + context.executionId, + storageKey.id, + outputPort.mode, + isMaterialization = false + ) val sinkLink = PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) }) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala new file mode 
100644 index 00000000000..e4a393cd57b --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -0,0 +1,55 @@ +package edu.uci.ics.amber.operator + +import edu.uci.ics.amber.core.executor.OpExecInitInfo +import edu.uci.ics.amber.core.tuple.Schema +import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} +import edu.uci.ics.amber.operator.sink.ProgressiveUtils +import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec +import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} +import edu.uci.ics.amber.workflow.OutputPort.OutputMode +import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT +import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} + +object SpecialPhysicalOpFactory { + def newSinkPhysicalOp(workflowIdentity: WorkflowIdentity, executionIdentity: ExecutionIdentity, storageKey: String, outputMode: OutputMode, isMaterialization: Boolean = false): PhysicalOp = PhysicalOp.localPhysicalOp( + workflowIdentity, + executionIdentity, + OperatorIdentity("sink_" + storageKey), + OpExecInitInfo( + (idx, workers) => + new ProgressiveSinkOpExec( + outputMode, + storageKey, + workflowIdentity + ) + ) + ) + .withInputPorts(List(InputPort(PortIdentity()))) + .withOutputPorts(List(OutputPort(PortIdentity()))) + .withPropagateSchema( + SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { + // Get the first schema from inputSchemas + val inputSchema = inputSchemas.values.head + + // Define outputSchema based on outputMode + val outputSchema = if (outputMode == SET_SNAPSHOT) { + if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + // input is insert/retract delta: remove the flag column in the output + Schema.builder() + .add(inputSchema) + .remove(ProgressiveUtils.insertRetractFlagAttr.getName) + .build() + } else { + // input is insert-only delta: output schema is the same as input schema + inputSchema + } + } else { + // SET_DELTA: output schema is the same as input schema + inputSchema + } + + // Create a Scala immutable Map + Map(PortIdentity() -> outputSchema) + }) + ) +} From 9d670d81446e1e1891ca63ffb9d9ca4b0a739f4c Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:23:49 -0800 Subject: [PATCH 04/20] let ScheduleGenerator also use helper func to create sink physicalOp --- .../scheduling/ScheduleGenerator.scala | 57 +++++++------------ .../web/service/ExecutionResultService.scala | 8 ++- .../texera/workflow/WorkflowCompiler.scala | 6 +- .../core/storage/result/OpResultStorage.scala | 1 + .../operator/SpecialPhysicalOpFactory.scala | 4 +- 5 files changed, 33 insertions(+), 43 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 800d8344fd3..497be75409e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -3,17 +3,10 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import 
edu.uci.ics.amber.core.workflow.{ - PhysicalOp, - PhysicalPlan, - SchemaPropagationFunc, - WorkflowContext -} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex -import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ - DefaultResourceAllocator, - ExecutionClusterInfo -} +import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} +import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity} @@ -159,9 +152,7 @@ abstract class ScheduleGenerator( .removeLink(physicalLink) // create cache writer and link - val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get - val matWriterPhysicalOp: PhysicalOp = - createMatWriter(physicalLink, Array(matWriterInputSchema)) + val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink) val sourceToWriterLink = PhysicalLink( fromOp.id, @@ -173,6 +164,17 @@ abstract class ScheduleGenerator( .addOperator(matWriterPhysicalOp) .addLink(sourceToWriterLink) + // expect exactly one input port and one output port + val schema = newPhysicalPlan.getOperator(matWriterPhysicalOp.id).outputPorts(OutputPort().id)._3.toOption.get + ResultStorage + .getOpResultStorage(workflowContext.workflowId) + .create( + key = matWriterPhysicalOp.id.logicalOpId, + mode = OpResultStorage.defaultStorageMode, + schema = Some(schema) + ) + + // create cache reader and link val matReaderPhysicalOp: PhysicalOp = createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink) @@ -220,30 +222,15 @@ abstract class ScheduleGenerator( } private def createMatWriter( - physicalLink: PhysicalLink, - inputSchema: Array[Schema] - ): PhysicalOp = { - val matWriter = new ProgressiveSinkOpDesc() - matWriter.setContext(workflowContext) - matWriter.setOperatorId(s"materialized_${getMatIdFromPhysicalLink(physicalLink)}") - // expect exactly one input port and one output port - val schema = matWriter.getOutputSchema(inputSchema) - ResultStorage - .getOpResultStorage(workflowContext.workflowId) - .create( - key = matWriter.operatorIdentifier, - mode = OpResultStorage.defaultStorageMode, - schema = Some(schema) - ) - matWriter.setUpstreamId( - matWriter.operatorIdentifier - ) - - matWriter.getPhysicalOp( + physicalLink: PhysicalLink): PhysicalOp = { + val outputMode =physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode + val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}" + SpecialPhysicalOpFactory.newSinkPhysicalOp( workflowContext.workflowId, - workflowContext.executionId + workflowContext.executionId, + storageKey, + outputMode ) - } private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) = diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 3adc68027b4..bec361b69a2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -289,14 +289,16 @@ class ExecutionResultService( ResultStorage 
.getOpResultStorage(workflowIdentity) .getAllKeys - .filter(opId => !opId.id.startsWith("materialized")) - .map(opId => { + .filter(storageKey => storageKey.id.startsWith("sink_")) + .map(storageKey => { val count = ResultStorage .getOpResultStorage(workflowIdentity) - .get(opId) + .get(storageKey) .getCount .toInt + val opId = OperatorIdentity(storageKey.id.substring(5)) + // use the first output port's mode val mode = physicalPlan .getPhysicalOpsOfLogicalOp(opId) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index b970fddb9f5..72b0670c424 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -78,7 +78,8 @@ class WorkflowCompiler( }).foreach({ case (physicalOp, (_, (outputPort, _, schema))) => val storage = ResultStorage.getOpResultStorage(context.workflowId) - val storageKey = physicalOp.id.logicalOpId + val storageKey = OperatorIdentity("sink_" + physicalOp.id.logicalOpId.id) + // due to the size limit of single document in mongoDB (16MB) // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. val storageType = { @@ -111,8 +112,7 @@ class WorkflowCompiler( context.workflowId, context.executionId, storageKey.id, - outputPort.mode, - isMaterialization = false + outputPort.mode ) val sinkLink = PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala index 03ba5c58f3d..ab69d6f94d2 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala @@ -56,6 +56,7 @@ class OpResultStorage extends Serializable with LazyLogging { mode: String, schema: Option[Schema] = None ): VirtualDocument[Tuple] = { + val storage: VirtualDocument[Tuple] = if (mode == "memory") { new MemoryDocument[Tuple](key.id) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index e4a393cd57b..4d88c475717 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -11,10 +11,10 @@ import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} object SpecialPhysicalOpFactory { - def newSinkPhysicalOp(workflowIdentity: WorkflowIdentity, executionIdentity: ExecutionIdentity, storageKey: String, outputMode: OutputMode, isMaterialization: Boolean = false): PhysicalOp = PhysicalOp.localPhysicalOp( + def newSinkPhysicalOp(workflowIdentity: WorkflowIdentity, executionIdentity: ExecutionIdentity, storageKey: String, outputMode: OutputMode): PhysicalOp = PhysicalOp.localPhysicalOp( workflowIdentity, executionIdentity, - OperatorIdentity("sink_" + storageKey), + OperatorIdentity(storageKey), OpExecInitInfo( (idx, workers) => new 
ProgressiveSinkOpExec( From 69674b978d4ef232b1a47dac7fdd864c39d74582 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:40:16 -0800 Subject: [PATCH 05/20] remove sink desc --- .../scheduling/ScheduleGenerator.scala | 4 +- .../ics/amber/compiler/WorkflowCompiler.scala | 13 +- .../util/SinkInjectionTransformer.scala | 73 -------- .../uci/ics/amber/operator/LogicalOp.scala | 2 - .../ics/amber/operator/TestOperators.scala | 11 +- .../sink/managed/ProgressiveSinkOpDesc.java | 158 ------------------ 6 files changed, 6 insertions(+), 255 deletions(-) delete mode 100644 core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/util/SinkInjectionTransformer.scala delete mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 497be75409e..06c637d9135 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -2,14 +2,12 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} -import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec -import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity} +import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink} import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala index 6f32150311b..c77fa5536ab 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala @@ -63,7 +63,7 @@ object WorkflowCompiler { errorList: ArrayBuffer[(OperatorIdentity, Throwable)] // Mandatory error list ): Map[OperatorIdentity, List[Option[Schema]]] = { val physicalInputSchemas = - physicalPlan.operators.filter(op => !op.isSinkOperator).map { physicalOp => + physicalPlan.operators.map { physicalOp => // Process inputPorts and capture Throwable values in the errorList physicalOp.id -> physicalOp.inputPorts.values .filterNot(_._1.id.internal) @@ -164,19 +164,14 @@ class WorkflowCompiler( val errorList = new ArrayBuffer[(OperatorIdentity, Throwable)]() var opIdToInputSchema: Map[OperatorIdentity, List[Option[Schema]]] = Map() // 1. 
convert the pojo to logical plan - var logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) + val logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo) - // 2. Manipulate logical plan by: - // - inject sink - logicalPlan = SinkInjectionTransformer.transform( - logicalPlanPojo.opsToViewResult, - logicalPlan - ) // - resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(Some(errorList)) - // 3. expand the logical plan to the physical plan, + // 3. expand the logical plan to the physical plan val physicalPlan = expandLogicalPlan(logicalPlan, Some(errorList)) + if (errorList.isEmpty) { // no error during the expansion, then do: // - collect the input schema for each op diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/util/SinkInjectionTransformer.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/util/SinkInjectionTransformer.scala deleted file mode 100644 index 75d952590c0..00000000000 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/util/SinkInjectionTransformer.scala +++ /dev/null @@ -1,73 +0,0 @@ -package edu.uci.ics.amber.compiler.util - -import edu.uci.ics.amber.compiler.model.LogicalPlan -import edu.uci.ics.amber.operator.sink.SinkOpDesc -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.PortIdentity - -object SinkInjectionTransformer { - - def transform(opsToViewResult: List[String], oldPlan: LogicalPlan): LogicalPlan = { - var logicalPlan = oldPlan - - // for any terminal operator without a sink, add a sink - val nonSinkTerminalOps = logicalPlan.getTerminalOperatorIds.filter(opId => - !logicalPlan.getOperator(opId).isInstanceOf[SinkOpDesc] - ) - // for any operators marked as view result without a sink, add a sink - val viewResultOps = opsToViewResult - .map(idString => OperatorIdentity(idString)) - .filter(opId => !logicalPlan.getDownstreamOps(opId).exists(op => op.isInstanceOf[SinkOpDesc])) - - val operatorsToAddSink = (nonSinkTerminalOps ++ viewResultOps).toSet - operatorsToAddSink.foreach(opId => { - val op = logicalPlan.getOperator(opId) - op.operatorInfo.outputPorts.foreach(outPort => { - val sink = new ProgressiveSinkOpDesc() - sink.setOperatorId("sink_" + opId.id) - logicalPlan = logicalPlan - .addOperator(sink) - .addLink( - op.operatorIdentifier, - outPort.id, - sink.operatorIdentifier, - toPortId = PortIdentity() - ) - }) - }) - - // check precondition: all the terminal operators should be sinks - assert( - logicalPlan.getTerminalOperatorIds.forall(o => - logicalPlan.getOperator(o).isInstanceOf[SinkOpDesc] - ) - ) - - // for each sink: - // set the corresponding upstream ID and port - // set output mode based on the visualization operator before it - logicalPlan.getTerminalOperatorIds.foreach(sinkOpId => { - val sinkOp = logicalPlan.getOperator(sinkOpId).asInstanceOf[ProgressiveSinkOpDesc] - val upstream = logicalPlan.getUpstreamOps(sinkOpId).headOption - val edge = logicalPlan.links.find(l => - l.fromOpId == upstream.map(_.operatorIdentifier).orNull - && l.toOpId == sinkOpId - ) - assert(upstream.nonEmpty) - if (upstream.nonEmpty && edge.nonEmpty) { - // set upstream ID and port - sinkOp.setUpstreamId(upstream.get.operatorIdentifier) - sinkOp.setUpstreamPort(edge.get.fromPortId.id) - - // set output mode for visualization operator - val outputPort = - upstream.get.operatorInfo.outputPorts.find(port => port.id == 
edge.get.fromPortId).get - sinkOp.setOutputMode(outputPort.mode) - } - }) - - logicalPlan - } - -} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index ac88f098cd3..925fc5314a0 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -37,7 +37,6 @@ import edu.uci.ics.amber.operator.randomksampling.RandomKSamplingOpDesc import edu.uci.ics.amber.operator.regex.RegexOpDesc import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.amber.operator.sentiment.SentimentAnalysisOpDesc -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.sklearn.{ SklearnAdaptiveBoostingOpDesc, SklearnBaggingOpDesc, @@ -158,7 +157,6 @@ trait StateTransferFunc value = classOf[TwitterSearchSourceOpDesc], name = "TwitterSearch" ), - new Type(value = classOf[ProgressiveSinkOpDesc], name = "SimpleSink"), new Type(value = classOf[CandlestickChartOpDesc], name = "CandlestickChart"), new Type(value = classOf[SplitOpDesc], name = "Split"), new Type(value = classOf[ContourPlotOpDesc], name = "ContourPlot"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala index 890eed672f5..5d49ec9768f 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala @@ -1,14 +1,9 @@ package edu.uci.ics.amber.operator import edu.uci.ics.amber.core.storage.FileResolver -import edu.uci.ics.amber.operator.aggregate.{ - AggregateOpDesc, - AggregationFunction, - AggregationOperation -} +import edu.uci.ics.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction, AggregationOperation} import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc -import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.json.JSONLScanSourceOpDesc import edu.uci.ics.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc @@ -144,10 +139,6 @@ object TestOperators { asterixDBOp } - def sinkOpDesc(): ProgressiveSinkOpDesc = { - new ProgressiveSinkOpDesc() - } - def pythonOpDesc(): PythonUDFOpDescV2 = { val udf = new PythonUDFOpDescV2() udf.workers = 1 diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java deleted file mode 100644 index 00ce71efa4a..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpDesc.java +++ /dev/null @@ -1,158 +0,0 @@ -package edu.uci.ics.amber.operator.sink.managed; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.base.Preconditions; -import edu.uci.ics.amber.core.executor.OpExecInitInfo; -import edu.uci.ics.amber.core.executor.OperatorExecutor; -import edu.uci.ics.amber.core.tuple.Schema; -import edu.uci.ics.amber.core.workflow.PhysicalOp; -import 
edu.uci.ics.amber.core.workflow.SchemaPropagationFunc; -import edu.uci.ics.amber.operator.metadata.OperatorGroupConstants; -import edu.uci.ics.amber.operator.metadata.OperatorInfo; -import edu.uci.ics.amber.operator.sink.ProgressiveUtils; -import edu.uci.ics.amber.operator.sink.SinkOpDesc; -import edu.uci.ics.amber.operator.util.OperatorDescriptorUtils; -import edu.uci.ics.amber.virtualidentity.ExecutionIdentity; -import edu.uci.ics.amber.virtualidentity.OperatorIdentity; -import edu.uci.ics.amber.virtualidentity.WorkflowIdentity; -import edu.uci.ics.amber.workflow.InputPort; -import edu.uci.ics.amber.workflow.OutputPort; -import edu.uci.ics.amber.workflow.PortIdentity; -import scala.Option; -import scala.Tuple2; -import scala.collection.immutable.Map; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.function.Function; - - -import static java.util.Collections.singletonList; -import static scala.jdk.javaapi.CollectionConverters.asScala; - -public class ProgressiveSinkOpDesc extends SinkOpDesc { - - // use SET_SNAPSHOT as the default output mode - // this will be set internally by the workflow compiler - @JsonIgnore - private OutputPort.OutputMode outputMode = OutputPort.OutputMode$.MODULE$.fromValue(0); - - - // corresponding upstream operator ID and output port, will be set by workflow compiler - @JsonIgnore - private Option upstreamId = Option.empty(); - - @JsonIgnore - private Option upstreamPort = Option.empty(); - - @Override - public PhysicalOp getPhysicalOp(WorkflowIdentity workflowId, ExecutionIdentity executionId) { - - return PhysicalOp.localPhysicalOp( - workflowId, - executionId, - operatorIdentifier(), - OpExecInitInfo.apply( - (Function, OperatorExecutor> & java.io.Serializable) - worker -> new edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec(outputMode, this.getUpstreamId().get().id(), workflowId) - ) - ) - .withInputPorts(this.operatorInfo().inputPorts()) - .withOutputPorts(this.operatorInfo().outputPorts()) - .withPropagateSchema( - SchemaPropagationFunc.apply((Function, Map> & Serializable) inputSchemas -> { - // Initialize a Java HashMap - java.util.Map javaMap = new java.util.HashMap<>(); - - Schema inputSchema = inputSchemas.values().head(); - - // SET_SNAPSHOT: - Schema outputSchema; - if (this.outputMode.equals(OutputPort.OutputMode$.MODULE$.fromValue(0))) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr().getName())) { - // input is insert/retract delta: the flag column is removed in output - outputSchema = Schema.builder().add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr().getName()).build(); - } else { - // input is insert-only delta: output schema is the same as input schema - outputSchema = inputSchema; - } - } else { - // SET_DELTA: output schema is always the same as input schema - outputSchema = inputSchema; - } - - javaMap.put(operatorInfo().outputPorts().head().id(), outputSchema); - // Convert the Java Map to a Scala immutable Map - return OperatorDescriptorUtils.toImmutableMap(javaMap); - }) - ); - } - - @Override - public OperatorInfo operatorInfo() { - return new OperatorInfo( - "View Results", - "View the results", - OperatorGroupConstants.UTILITY_GROUP(), - asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), - false, - false, - false, - false); - } - - @Override 
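// (Context for the deleted method below: getOutputSchema encoded the sink's schema rule.
// Under SET_SNAPSHOT, an input schema carrying ProgressiveUtils' insert/retract flag
// attribute has that flag column stripped from the output; insert-only input, like
// SET_DELTA mode, passes the schema through unchanged. The SpecialPhysicalOpFactory
// seen later in this series carries essentially the same rule forward in Scala, so the
// behavior survives the removal of the Java descriptor.)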
- public Schema getOutputSchema(Schema[] schemas) { - Preconditions.checkArgument(schemas.length == 1); - Schema inputSchema = schemas[0]; - - // SET_SNAPSHOT: - if (this.outputMode.equals(OutputPort.OutputMode$.MODULE$.fromValue(0))) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr().getName())) { - // input is insert/retract delta: the flag column is removed in output - return Schema.builder().add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr().getName()).build(); - } else { - // input is insert-only delta: output schema is the same as input schema - return inputSchema; - } - } else { - // SET_DELTA: output schema is always the same as input schema - return inputSchema; - } - } - - @JsonIgnore - public OutputPort.OutputMode getOutputMode() { - return outputMode; - } - - @JsonIgnore - public void setOutputMode(OutputPort.OutputMode outputMode) { - this.outputMode = outputMode; - } - - @JsonIgnore - public Option getUpstreamId() { - return upstreamId; - } - - @JsonIgnore - public void setUpstreamId(OperatorIdentity upstreamId) { - this.upstreamId = Option.apply(upstreamId); - } - - @JsonIgnore - public Option getUpstreamPort() { - return upstreamPort; - } - - @JsonIgnore - public void setUpstreamPort(Integer upstreamPort) { - this.upstreamPort = Option.apply(upstreamPort); - } - - -} From 94612951188c6264b1346843635a1a8be8340b67 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:41:53 -0800 Subject: [PATCH 06/20] remove sink desc --- .../edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala | 3 +-- .../scala/edu/uci/ics/amber/operator/sink/SinkOpDesc.scala | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) delete mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/SinkOpDesc.scala diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala index c680177e28c..cb77ae266aa 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/SchemaPropagationSpec.scala @@ -4,7 +4,6 @@ import edu.uci.ics.amber.core.tuple.{AttributeType, Schema} import edu.uci.ics.amber.core.workflow.{PhysicalOp, WorkflowContext} import edu.uci.ics.amber.operator.LogicalOp import edu.uci.ics.amber.operator.metadata.OperatorInfo -import edu.uci.ics.amber.operator.sink.SinkOpDesc import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} @@ -24,7 +23,7 @@ class SchemaPropagationSpec extends AnyFlatSpec with BeforeAndAfter { OperatorInfo("", "", "", List(InputPort()), List(OutputPort())) } - private class TempTestSinkOpDesc extends SinkOpDesc { + private class TempTestSinkOpDesc extends LogicalOp { override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/SinkOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/SinkOpDesc.scala deleted file mode 100644 index da190bcb7e4..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/SinkOpDesc.scala +++ /dev/null @@ -1,5 +0,0 @@ -package edu.uci.ics.amber.operator.sink - 
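With SinkOpDesc removed, sinks stop being user-facing logical operators entirely; they exist only as physical operators that the engine creates on demand. A minimal sketch of the replacement path, using the factory call visible later in this series (workflowId, executionId, storageKey, and outputPort are stand-ins for whatever is in scope at the call site):

  val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
    workflowId,      // WorkflowIdentity of the running workflow
    executionId,     // ExecutionIdentity for this run
    storageKey.id,   // String key under which OpResultStorage tracks the results
    outputPort.mode  // OutputMode, which drives the sink's schema propagation
  )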
-import edu.uci.ics.amber.operator.LogicalOp - -abstract class SinkOpDesc extends LogicalOp From 2a24e40435a242f2f0e5bf7210f2c8b8a550806c Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:46:32 -0800 Subject: [PATCH 07/20] fix format --- .../scheduling/ScheduleGenerator.scala | 26 +++-- .../web/service/ExecutionResultService.scala | 20 +++- .../workflow/SinkInjectionTransformer.scala | 1 + .../texera/workflow/WorkflowCompiler.scala | 104 ++++++++++-------- .../operator/SpecialPhysicalOpFactory.scala | 67 ++++++----- .../ics/amber/operator/TestOperators.scala | 6 +- 6 files changed, 137 insertions(+), 87 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 06c637d9135..af23816aa40 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -2,9 +2,17 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{ + PhysicalOp, + PhysicalPlan, + SchemaPropagationFunc, + WorkflowContext +} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex -import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} +import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ + DefaultResourceAllocator, + ExecutionClusterInfo +} import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} @@ -163,7 +171,12 @@ abstract class ScheduleGenerator( .addLink(sourceToWriterLink) // expect exactly one input port and one output port - val schema = newPhysicalPlan.getOperator(matWriterPhysicalOp.id).outputPorts(OutputPort().id)._3.toOption.get + val schema = newPhysicalPlan + .getOperator(matWriterPhysicalOp.id) + .outputPorts(OutputPort().id) + ._3 + .toOption + .get ResultStorage .getOpResultStorage(workflowContext.workflowId) .create( @@ -172,7 +185,6 @@ abstract class ScheduleGenerator( schema = Some(schema) ) - // create cache reader and link val matReaderPhysicalOp: PhysicalOp = createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink) @@ -219,9 +231,9 @@ abstract class ScheduleGenerator( } - private def createMatWriter( - physicalLink: PhysicalLink): PhysicalOp = { - val outputMode =physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode + private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = { + val outputMode = + physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}" SpecialPhysicalOpFactory.newSinkPhysicalOp( workflowContext.workflowId, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index bec361b69a2..12ff2c72ceb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -5,18 +5,32 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.StorageConfig -import edu.uci.ics.amber.core.storage.result.{MongoDocument, OperatorResultMetadata, ResultStorage, WorkflowResultStore} +import edu.uci.ics.amber.core.storage.result.{ + MongoDocument, + OperatorResultMetadata, + ResultStorage, + WorkflowResultStore +} import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan} import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError} -import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED, KILLED, RUNNING} +import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ + COMPLETED, + FAILED, + KILLED, + RUNNING +} import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime} import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.OutputPort.OutputMode import edu.uci.ics.texera.web.SubscriptionManager -import edu.uci.ics.texera.web.model.websocket.event.{PaginatedResultEvent, TexeraWebSocketEvent, WebResultUpdateEvent} +import edu.uci.ics.texera.web.model.websocket.event.{ + PaginatedResultEvent, + TexeraWebSocketEvent, + WebResultUpdateEvent +} import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala index e69de29bb2d..8b137891791 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/SinkInjectionTransformer.scala @@ -0,0 +1 @@ + diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 72b0670c424..6e0e24bdb56 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -4,7 +4,12 @@ import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{ + PhysicalOp, + PhysicalPlan, + SchemaPropagationFunc, + WorkflowContext +} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory @@ -71,52 +76,59 @@ class WorkflowCompiler( }) // 
assign the view results - subPlan.topologicalIterator().map(subPlan.getOperator).flatMap { physicalOp => - physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort)) - }.filter({ - case (physicalOp, (_, (outputPort, _, _))) => toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal - }).foreach({ - case (physicalOp, (_, (outputPort, _, schema))) => - val storage = ResultStorage.getOpResultStorage(context.workflowId) - val storageKey = OperatorIdentity("sink_" + physicalOp.id.logicalOpId.id) - - // due to the size limit of single document in mongoDB (16MB) - // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. - val storageType = { - if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY - else OpResultStorage.defaultStorageMode - } - if (!storage.contains(storageKey)) { - // get the schema for result storage in certain mode - val sinkStorageSchema: Option[Schema] = - if (storageType == OpResultStorage.MONGODB) { - // use the output schema on the first output port as the schema for storage - Some(schema.right.get) - } else { - None - } - storage.create( - s"${context.executionId}_", - storageKey, - storageType, - sinkStorageSchema + subPlan + .topologicalIterator() + .map(subPlan.getOperator) + .flatMap { physicalOp => + physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort)) + } + .filter({ + case (physicalOp, (_, (outputPort, _, _))) => + toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal + }) + .foreach({ + case (physicalOp, (_, (outputPort, _, schema))) => + val storage = ResultStorage.getOpResultStorage(context.workflowId) + val storageKey = OperatorIdentity("sink_" + physicalOp.id.logicalOpId.id) + + // due to the size limit of single document in mongoDB (16MB) + // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. 
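The storage-type decision that follows reads naturally as a small pure function: a port emitting one large HTML snapshot stays in memory, since a single MongoDB document tops out at 16MB, and every other port follows the configured default. A sketch of the same logic, where pickStorageType is a hypothetical helper name (the patch keeps the logic inlined):

  def pickStorageType(mode: OutputMode) =
    if (mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY // oversized visualization snapshots must avoid Mongo's 16MB document cap
    else OpResultStorage.defaultStorageMode             // memory or MongoDB, per StorageConfig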
+ val storageType = { + if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY + else OpResultStorage.defaultStorageMode + } + if (!storage.contains(storageKey)) { + // get the schema for result storage in certain mode + val sinkStorageSchema: Option[Schema] = + if (storageType == OpResultStorage.MONGODB) { + // use the output schema on the first output port as the schema for storage + Some(schema.right.get) + } else { + None + } + storage.create( + s"${context.executionId}_", + storageKey, + storageType, + sinkStorageSchema + ) + // add the sink collection name to the JSON array of sinks + val storageNode = objectMapper.createObjectNode() + storageNode.put("storageType", storageType) + storageNode.put("storageKey", s"${context.executionId}_$storageKey") + sinksPointers.add(storageNode) + } + + val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( + context.workflowId, + context.executionId, + storageKey.id, + outputPort.mode ) - // add the sink collection name to the JSON array of sinks - val storageNode = objectMapper.createObjectNode() - storageNode.put("storageType", storageType) - storageNode.put("storageKey", s"${context.executionId}_$storageKey") - sinksPointers.add(storageNode) - } - - val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( - context.workflowId, - context.executionId, - storageKey.id, - outputPort.mode - ) - val sinkLink = PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) - physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) - }) + val sinkLink = + PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) + physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) + }) } match { case Success(_) => diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 4d88c475717..541c1c97127 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -11,45 +11,52 @@ import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} object SpecialPhysicalOpFactory { - def newSinkPhysicalOp(workflowIdentity: WorkflowIdentity, executionIdentity: ExecutionIdentity, storageKey: String, outputMode: OutputMode): PhysicalOp = PhysicalOp.localPhysicalOp( - workflowIdentity, - executionIdentity, - OperatorIdentity(storageKey), - OpExecInitInfo( - (idx, workers) => + def newSinkPhysicalOp( + workflowIdentity: WorkflowIdentity, + executionIdentity: ExecutionIdentity, + storageKey: String, + outputMode: OutputMode + ): PhysicalOp = + PhysicalOp + .localPhysicalOp( + workflowIdentity, + executionIdentity, + OperatorIdentity(storageKey), + OpExecInitInfo((idx, workers) => new ProgressiveSinkOpExec( outputMode, storageKey, workflowIdentity ) + ) ) - ) - .withInputPorts(List(InputPort(PortIdentity()))) - .withOutputPorts(List(OutputPort(PortIdentity()))) - .withPropagateSchema( - SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { - // Get the first schema from inputSchemas - val inputSchema = inputSchemas.values.head + .withInputPorts(List(InputPort(PortIdentity()))) + .withOutputPorts(List(OutputPort(PortIdentity()))) + .withPropagateSchema( + SchemaPropagationFunc((inputSchemas: 
Map[PortIdentity, Schema]) => { + // Get the first schema from inputSchemas + val inputSchema = inputSchemas.values.head - // Define outputSchema based on outputMode - val outputSchema = if (outputMode == SET_SNAPSHOT) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { - // input is insert/retract delta: remove the flag column in the output - Schema.builder() - .add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr.getName) - .build() + // Define outputSchema based on outputMode + val outputSchema = if (outputMode == SET_SNAPSHOT) { + if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + // input is insert/retract delta: remove the flag column in the output + Schema + .builder() + .add(inputSchema) + .remove(ProgressiveUtils.insertRetractFlagAttr.getName) + .build() + } else { + // input is insert-only delta: output schema is the same as input schema + inputSchema + } } else { - // input is insert-only delta: output schema is the same as input schema + // SET_DELTA: output schema is the same as input schema inputSchema } - } else { - // SET_DELTA: output schema is the same as input schema - inputSchema - } - // Create a Scala immutable Map - Map(PortIdentity() -> outputSchema) - }) - ) + // Create a Scala immutable Map + Map(PortIdentity() -> outputSchema) + }) + ) } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala index 5d49ec9768f..bf03e272577 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/TestOperators.scala @@ -1,7 +1,11 @@ package edu.uci.ics.amber.operator import edu.uci.ics.amber.core.storage.FileResolver -import edu.uci.ics.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction, AggregationOperation} +import edu.uci.ics.amber.operator.aggregate.{ + AggregateOpDesc, + AggregationFunction, + AggregationOperation +} import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc From 8f05a4b7a0dd6503f236c78756cfe0782d621b6b Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:49:32 -0800 Subject: [PATCH 08/20] update mode --- .../operator/SpecialPhysicalOpFactory.scala | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 541c1c97127..6b630108941 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -7,7 +7,7 @@ import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.OutputPort.OutputMode -import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT +import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_DELTA, SET_SNAPSHOT, SINGLE_SNAPSHOT} import 
edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} object SpecialPhysicalOpFactory { @@ -38,21 +38,23 @@ object SpecialPhysicalOpFactory { val inputSchema = inputSchemas.values.head // Define outputSchema based on outputMode - val outputSchema = if (outputMode == SET_SNAPSHOT) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { - // input is insert/retract delta: remove the flag column in the output - Schema - .builder() - .add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr.getName) - .build() - } else { - // input is insert-only delta: output schema is the same as input schema + val outputSchema = outputMode match { + case SET_SNAPSHOT | SINGLE_SNAPSHOT => + if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + // with insert/retract delta: remove the flag column + Schema + .builder() + .add(inputSchema) + .remove(ProgressiveUtils.insertRetractFlagAttr.getName) + .build() + } else { + // with insert-only delta: output schema is the same as input schema + inputSchema + } + + case SET_DELTA => + // output schema is the same as input schema inputSchema - } - } else { - // SET_DELTA: output schema is the same as input schema - inputSchema } // Create a Scala immutable Map From 65c626e51e90c6256b95bfcff70f1d98a597f20a Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:51:55 -0800 Subject: [PATCH 09/20] update mode --- .../scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala index c77fa5536ab..8a23c681ef7 100644 --- a/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala +++ b/core/workflow-compiling-service/src/main/scala/edu/uci/ics/amber/compiler/WorkflowCompiler.scala @@ -6,8 +6,8 @@ import edu.uci.ics.amber.compiler.WorkflowCompiler.{ collectInputSchemaFromPhysicalPlan, convertErrorListToWorkflowFatalErrorMap } + import edu.uci.ics.amber.compiler.model.{LogicalPlan, LogicalPlanPojo} -import edu.uci.ics.amber.compiler.util.SinkInjectionTransformer import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.virtualidentity.OperatorIdentity From 09441d2b990fcc2498302757db026cdac7254737 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 13:45:37 -0800 Subject: [PATCH 10/20] fix comments --- .../ics/texera/web/service/ExecutionResultService.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 12ff2c72ceb..f0d162a99ad 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -241,7 +241,7 @@ class ExecutionResultService( if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") { val opStorage = ResultStorage .getOpResultStorage(workflowIdentity) - .get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId) + .get(opId) opStorage match { 
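// When StorageConfig.resultStorageMode is "mongodb", the storage handle is a
// MongoDocument[Tuple], so per-column statistics such as the categorical counts
// read below come straight from the collection rather than from a re-scan of the
// tuples; the match dispatches on that concrete document type.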
case mongoDocument: MongoDocument[Tuple] => val tableCatStats = mongoDocument.getCategoricalStats @@ -267,8 +267,9 @@ class ExecutionResultService( }) ) -// // first clear all the results -// sinkOperators.clear() + // first clear all the results + ResultStorage.getOpResultStorage(workflowIdentity).clear() + workflowStateStore.resultStore.updateState { _ => WorkflowResultStore() // empty result store } From 416fdbd0df938c4781af160a8991b31f7de95ce6 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:05:59 -0800 Subject: [PATCH 11/20] Update ExecutionResultService.scala --- .../uci/ics/texera/web/service/ExecutionResultService.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index f0d162a99ad..4f2933d0b1f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -267,9 +267,7 @@ class ExecutionResultService( }) ) - // first clear all the results - ResultStorage.getOpResultStorage(workflowIdentity).clear() - + // clear all the result metadata workflowStateStore.resultStore.updateState { _ => WorkflowResultStore() // empty result store } From a74ea4d102f6fd188a23c1bc53bc4693bb808783 Mon Sep 17 00:00:00 2001 From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:38:27 -0800 Subject: [PATCH 12/20] fix the opId issue --- .../web/service/ExecutionResultService.scala | 3 +-- .../texera/workflow/WorkflowCompiler.scala | 22 +++++++++---------- .../operator/SpecialPhysicalOpFactory.scala | 13 +++++++---- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 4f2933d0b1f..b0363956f31 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -302,7 +302,6 @@ class ExecutionResultService( ResultStorage .getOpResultStorage(workflowIdentity) .getAllKeys - .filter(storageKey => storageKey.id.startsWith("sink_")) .map(storageKey => { val count = ResultStorage .getOpResultStorage(workflowIdentity) @@ -310,7 +309,7 @@ class ExecutionResultService( .getCount .toInt - val opId = OperatorIdentity(storageKey.id.substring(5)) + val opId = storageKey // use the first output port's mode val mode = physicalPlan diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 6e0e24bdb56..88ad5b7fbb0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,22 +1,15 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{ - PhysicalOp, - PhysicalPlan, - SchemaPropagationFunc, - WorkflowContext -} 
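This hunk's import trim is fallout from the patch's real change, which appears further down in the same diff: the sink's result-storage key becomes the upstream operator's own logical ID rather than a synthesized "sink_" prefix, and the sink is attached through an internal port. Roughly, as a sketch of the two changed lines (upstreamOp stands in for the physical operator in scope):

  val storageKey = upstreamOp.id.logicalOpId // was OperatorIdentity("sink_" + upstreamOp.id.logicalOpId.id)
  val sinkLink   = PhysicalLink(upstreamOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity(internal = true))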
+import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.engine.common.Utils.objectMapper import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory -import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_SNAPSHOT, SINGLE_SNAPSHOT} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} +import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SINGLE_SNAPSHOT +import edu.uci.ics.amber.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService @@ -89,7 +82,7 @@ class WorkflowCompiler( .foreach({ case (physicalOp, (_, (outputPort, _, schema))) => val storage = ResultStorage.getOpResultStorage(context.workflowId) - val storageKey = OperatorIdentity("sink_" + physicalOp.id.logicalOpId.id) + val storageKey = physicalOp.id.logicalOpId // due to the size limit of single document in mongoDB (16MB) // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. @@ -126,7 +119,12 @@ class WorkflowCompiler( outputPort.mode ) val sinkLink = - PhysicalLink(physicalOp.id, outputPort.id, sinkPhysicalOp.id, PortIdentity()) + PhysicalLink( + physicalOp.id, + outputPort.id, + sinkPhysicalOp.id, + PortIdentity(internal = true) + ) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) }) } match { diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 6b630108941..a3ba3faae41 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -5,7 +5,12 @@ import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} +import edu.uci.ics.amber.virtualidentity.{ + ExecutionIdentity, + OperatorIdentity, + PhysicalOpIdentity, + WorkflowIdentity +} import edu.uci.ics.amber.workflow.OutputPort.OutputMode import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_DELTA, SET_SNAPSHOT, SINGLE_SNAPSHOT} import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} @@ -19,9 +24,9 @@ object SpecialPhysicalOpFactory { ): PhysicalOp = PhysicalOp .localPhysicalOp( + PhysicalOpIdentity(OperatorIdentity(storageKey), "sink"), workflowIdentity, executionIdentity, - OperatorIdentity(storageKey), OpExecInitInfo((idx, workers) => new ProgressiveSinkOpExec( outputMode, @@ -30,8 +35,8 @@ object SpecialPhysicalOpFactory { ) ) ) - .withInputPorts(List(InputPort(PortIdentity()))) - .withOutputPorts(List(OutputPort(PortIdentity()))) + .withInputPorts(List(InputPort(PortIdentity(internal = true)))) + .withOutputPorts(List(OutputPort(PortIdentity(internal = true)))) .withPropagateSchema( SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { // Get the first schema from inputSchemas From 
3dbfc33b8292bf6058a69becebb5b147a3c2cf25 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:36:40 -0800 Subject: [PATCH 13/20] Revert "fix comments" This reverts commit 09441d2b990fcc2498302757db026cdac7254737. --- .../edu/uci/ics/texera/web/service/ExecutionResultService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index b0363956f31..338bb12760d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -241,7 +241,7 @@ class ExecutionResultService( if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") { val opStorage = ResultStorage .getOpResultStorage(workflowIdentity) - .get(opId) + .get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId) opStorage match { case mongoDocument: MongoDocument[Tuple] => val tableCatStats = mongoDocument.getCategoricalStats From dffc7026f8f348cd9aa5dd302b6ff4744e172b8e Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:55:56 -0800 Subject: [PATCH 14/20] fix storage key issue --- .../uci/ics/texera/web/service/ExecutionResultService.scala | 1 + .../scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala | 2 +- .../edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 338bb12760d..a0b09ff3fd1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -302,6 +302,7 @@ class ExecutionResultService( ResultStorage .getOpResultStorage(workflowIdentity) .getAllKeys + .filter(!_.id.startsWith("materialized_")) .map(storageKey => { val count = ResultStorage .getOpResultStorage(workflowIdentity) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 88ad5b7fbb0..739a988c4b6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -123,7 +123,7 @@ class WorkflowCompiler( physicalOp.id, outputPort.id, sinkPhysicalOp.id, - PortIdentity(internal = true) + PortIdentity() ) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) }) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index a3ba3faae41..d17a99744d7 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -35,8 +35,8 @@ object SpecialPhysicalOpFactory { ) ) ) - .withInputPorts(List(InputPort(PortIdentity(internal = true)))) - .withOutputPorts(List(OutputPort(PortIdentity(internal = 
true)))) + .withInputPorts(List(InputPort(PortIdentity()))) + .withOutputPorts(List(OutputPort(PortIdentity()))) .withPropagateSchema( SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { // Get the first schema from inputSchemas From ad5901955f693532dd0d8c383b3f08e656693058 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:31:07 -0800 Subject: [PATCH 15/20] fix --- .../engine/architecture/scheduling/ScheduleGenerator.scala | 2 +- .../edu/uci/ics/texera/workflow/WorkflowCompiler.scala | 2 +- .../uci/ics/amber/operator/SpecialPhysicalOpFactory.scala | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index af23816aa40..121be2289b1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -173,7 +173,7 @@ abstract class ScheduleGenerator( // expect exactly one input port and one output port val schema = newPhysicalPlan .getOperator(matWriterPhysicalOp.id) - .outputPorts(OutputPort().id) + .outputPorts(matWriterPhysicalOp.outputPorts.keys.head) ._3 .toOption .get diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 739a988c4b6..88ad5b7fbb0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -123,7 +123,7 @@ class WorkflowCompiler( physicalOp.id, outputPort.id, sinkPhysicalOp.id, - PortIdentity() + PortIdentity(internal = true) ) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) }) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index d17a99744d7..0d01d3d6dbc 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -35,8 +35,8 @@ object SpecialPhysicalOpFactory { ) ) ) - .withInputPorts(List(InputPort(PortIdentity()))) - .withOutputPorts(List(OutputPort(PortIdentity()))) + .withInputPorts(List(InputPort(PortIdentity(internal = true)))) + .withOutputPorts(List(OutputPort(PortIdentity(internal = true)))) .withPropagateSchema( SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => { // Get the first schema from inputSchemas @@ -63,7 +63,7 @@ object SpecialPhysicalOpFactory { } // Create a Scala immutable Map - Map(PortIdentity() -> outputSchema) + Map(PortIdentity(internal = true) -> outputSchema) }) ) } From 4e6bc2ebe194a7de9b59bdd5760d572c378155c4 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Dec 2024 07:50:53 -0800 Subject: [PATCH 16/20] fix test --- .../CostBasedScheduleGeneratorSpec.scala | 24 +-- ...ExpansionGreedyScheduleGeneratorSpec.scala | 57 +------ .../engine/e2e/BatchSizePropagationSpec.scala | 60 ++----- .../amber/engine/e2e/DataProcessingSpec.scala | 159 ++++-------------- 
.../uci/ics/amber/engine/e2e/PauseSpec.scala | 29 +--- .../faulttolerance/CheckpointSpec.scala | 9 +- 6 files changed, 68 insertions(+), 270 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala index be180b7cba3..d67524555e7 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala @@ -11,17 +11,15 @@ import org.scalatest.flatspec.AnyFlatSpec class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { - "CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in { + "CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join workflow" in { val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, keywordOpDesc, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -41,12 +39,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() @@ -106,17 +98,15 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { } - "CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in { + "CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->->filter->join workflow" in { val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, keywordOpDesc, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -136,12 +126,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala index ea8c3c96a51..c28c3265a20 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala @@ -13,24 +13,17 @@ import 
org.scalatest.flatspec.AnyFlatSpec class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { - "RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword->sink workflow" in { + "RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword workflow" in { val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, keywordOpDesc, sink), + List(headerlessCsvOpDesc, keywordOpDesc), List( LogicalLink( headerlessCsvOpDesc.operatorIdentifier, PortIdentity(0), keywordOpDesc.operatorIdentifier, PortIdentity(0) - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(0), - sink.operatorIdentifier, - PortIdentity(0) ) ), new WorkflowContext() @@ -61,17 +54,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory } } - "RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join->sink workflow" in { + "RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join workflow" in { val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc() val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, headerlessCsvOpDesc2, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -85,12 +76,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() @@ -140,17 +125,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory } - "RegionPlanGenerator" should "correctly find regions in csv->->filter->join->sink workflow" in { + "RegionPlanGenerator" should "correctly find regions in csv->->filter->join workflow" in { val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, keywordOpDesc, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -170,12 +153,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() @@ -206,19 +183,17 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory } } // - "RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin->sink workflow" in { + "RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin workflow" in { val buildCsv = TestOperators.headerlessSmallCsvScanOpDesc() val probeCsv = TestOperators.smallCsvScanOpDesc() val hashJoin1 = TestOperators.joinOpDesc("column-1", "Region") val hashJoin2 = TestOperators.joinOpDesc("column-2", "Country") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( buildCsv, probeCsv, hashJoin1, - 
hashJoin2, - sink + hashJoin2 ), List( LogicalLink( @@ -244,12 +219,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory PortIdentity(), hashJoin2.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - hashJoin2.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() @@ -284,14 +253,12 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory val split = new SplitOpDesc() val training = new PythonUDFOpDescV2() val inference = new DualInputPortsPythonUDFOpDescV2() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( csv, split, training, - inference, - sink + inference ), List( LogicalLink( @@ -317,12 +284,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory PortIdentity(1), inference.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - inference.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala index 1b9a90d978c..a62d37ca284 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala @@ -102,7 +102,7 @@ class BatchSizePropagationSpec } } - "Engine" should "propagate the correct batch size for headerlessCsv->sink workflow" in { + "Engine" should "propagate the correct batch size for headerlessCsv workflow" in { val expectedBatchSize = 1 val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) @@ -110,18 +110,10 @@ class BatchSizePropagationSpec val context = new WorkflowContext(workflowSettings = customWorkflowSettings) val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, sink), - List( - LogicalLink( - headerlessCsvOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(headerlessCsvOpDesc), + List(), context ) @@ -131,7 +123,7 @@ class BatchSizePropagationSpec verifyBatchSizeInPartitioning(workflowScheduler, 1) } - "Engine" should "propagate the correct batch size for headerlessCsv->keyword->sink workflow" in { + "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in { val expectedBatchSize = 500 val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) @@ -140,22 +132,15 @@ class BatchSizePropagationSpec val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, keywordOpDesc, sink), + List(headerlessCsvOpDesc, keywordOpDesc), List( LogicalLink( headerlessCsvOpDesc.operatorIdentifier, PortIdentity(), keywordOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), context @@ -167,7 +152,7 @@ class BatchSizePropagationSpec verifyBatchSizeInPartitioning(workflowScheduler, 500) } - "Engine" should "propagate the correct batch size for csv->keyword->count->sink workflow" in { + "Engine" should "propagate the 
correct batch size for csv->keyword->count workflow" in { val expectedBatchSize = 100 val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) @@ -178,10 +163,9 @@ class BatchSizePropagationSpec val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") val countOpDesc = TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]()) - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, countOpDesc, sink), + List(csvOpDesc, keywordOpDesc, countOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, @@ -194,12 +178,6 @@ class BatchSizePropagationSpec PortIdentity(), countOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - countOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), context @@ -211,7 +189,7 @@ class BatchSizePropagationSpec verifyBatchSizeInPartitioning(workflowScheduler, 100) } - "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy->sink workflow" in { + "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in { val expectedBatchSize = 300 val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) @@ -226,10 +204,8 @@ class BatchSizePropagationSpec AggregationFunction.AVERAGE, List[String]("Country") ) - val sink = TestOperators.sinkOpDesc() - val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc, sink), + List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, @@ -242,12 +218,6 @@ class BatchSizePropagationSpec PortIdentity(), averageAndGroupByOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - averageAndGroupByOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), context @@ -259,7 +229,7 @@ class BatchSizePropagationSpec verifyBatchSizeInPartitioning(workflowScheduler, 300) } - "Engine" should "propagate the correct batch size for csv->(csv->)->join->sink workflow" in { + "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in { val expectedBatchSize = 1 val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize) @@ -269,14 +239,12 @@ class BatchSizePropagationSpec val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc() val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, headerlessCsvOpDesc2, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -290,12 +258,6 @@ class BatchSizePropagationSpec PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), context diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 11a63cbf2c6..f6c04cb826b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -123,67 +123,43 @@ class DataProcessingSpec ("localhost", config.getPort.toString, database, table, username, password) } - "Engine" should "execute 
headerlessCsv->sink workflow normally" in { + "Engine" should "execute headerlessCsv workflow normally" in { val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, sink), - List( - LogicalLink( - headerlessCsvOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(headerlessCsvOpDesc), + List(), workflowContext ) - val results = executeWorkflow(workflow)(sink.operatorIdentifier) + val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier) assert(results.size == 100) } - "Engine" should "execute headerlessMultiLineDataCsv-->sink workflow normally" in { + "Engine" should "execute headerlessMultiLineDataCsv workflow normally" in { val headerlessCsvOpDesc = TestOperators.headerlessSmallMultiLineDataCsvScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, sink), - List( - LogicalLink( - headerlessCsvOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(headerlessCsvOpDesc), + List(), workflowContext ) - val results = executeWorkflow(workflow)(sink.operatorIdentifier) + val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier) assert(results.size == 100) } - "Engine" should "execute jsonl->sink workflow normally" in { + "Engine" should "execute jsonl workflow normally" in { val jsonlOp = TestOperators.smallJSONLScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(jsonlOp, sink), - List( - LogicalLink( - jsonlOp.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(jsonlOp), + List(), workflowContext ) - val results = executeWorkflow(workflow)(sink.operatorIdentifier) + val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier) assert(results.size == 100) for (result <- results) { - val schema = result.asInstanceOf[Tuple].getSchema + val schema = result.getSchema assert(schema.getAttribute("id").getType == AttributeType.LONG) assert(schema.getAttribute("first_name").getType == AttributeType.STRING) assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN) @@ -194,27 +170,19 @@ class DataProcessingSpec } - "Engine" should "execute mediumFlattenJsonl->sink workflow normally" in { + "Engine" should "execute mediumFlattenJsonl workflow normally" in { val jsonlOp = TestOperators.mediumFlattenJSONLScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(jsonlOp, sink), - List( - LogicalLink( - jsonlOp.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(jsonlOp), + List(), workflowContext ) - val results = executeWorkflow(workflow)(sink.operatorIdentifier) + val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier) assert(results.size == 1000) for (result <- results) { - val schema = result.asInstanceOf[Tuple].getSchema + val schema = result.getSchema assert(schema.getAttribute("id").getType == AttributeType.LONG) assert(schema.getAttribute("first_name").getType == AttributeType.STRING) assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN) @@ -225,24 +193,17 @@ class DataProcessingSpec } } - "Engine" should "execute headerlessCsv->keyword->sink workflow normally" in { + "Engine" should "execute headerlessCsv->keyword workflow normally" in { val headerlessCsvOpDesc = 
TestOperators.headerlessSmallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(headerlessCsvOpDesc, keywordOpDesc, sink), + List(headerlessCsvOpDesc, keywordOpDesc), List( LogicalLink( headerlessCsvOpDesc.operatorIdentifier, PortIdentity(), keywordOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), workflowContext @@ -250,42 +211,27 @@ class DataProcessingSpec executeWorkflow(workflow) } - "Engine" should "execute csv->sink workflow normally" in { + "Engine" should "execute csv workflow normally" in { val csvOpDesc = TestOperators.smallCsvScanOpDesc() - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, sink), - List( - LogicalLink( - csvOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(csvOpDesc), + List(), workflowContext ) executeWorkflow(workflow) } - "Engine" should "execute csv->keyword->sink workflow normally" in { + "Engine" should "execute csv->keyword workflow normally" in { val csvOpDesc = TestOperators.smallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, sink), + List(csvOpDesc, keywordOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, PortIdentity(), keywordOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), workflowContext @@ -298,9 +244,8 @@ class DataProcessingSpec val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") val countOpDesc = TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]()) - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, countOpDesc, sink), + List(csvOpDesc, keywordOpDesc, countOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, @@ -313,12 +258,6 @@ class DataProcessingSpec PortIdentity(), countOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - countOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), workflowContext @@ -326,7 +265,7 @@ class DataProcessingSpec executeWorkflow(workflow) } - "Engine" should "execute csv->keyword->averageAndGroupBy->sink workflow normally" in { + "Engine" should "execute csv->keyword->averageAndGroupBy workflow normally" in { val csvOpDesc = TestOperators.smallCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") val averageAndGroupByOpDesc = @@ -335,9 +274,8 @@ class DataProcessingSpec AggregationFunction.AVERAGE, List[String]("Country") ) - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc, sink), + List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, @@ -350,12 +288,6 @@ class DataProcessingSpec PortIdentity(), averageAndGroupByOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - averageAndGroupByOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), workflowContext @@ -363,17 +295,15 @@ class DataProcessingSpec executeWorkflow(workflow) } - "Engine" should "execute 
csv->(csv->)->join->sink workflow normally" in { + "Engine" should "execute csv->(csv->)->join workflow normally" in { val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc() val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc() val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( List( headerlessCsvOpDesc1, headerlessCsvOpDesc2, - joinOpDesc, - sink + joinOpDesc ), List( LogicalLink( @@ -387,12 +317,6 @@ class DataProcessingSpec PortIdentity(), joinOpDesc.operatorIdentifier, PortIdentity(1) - ), - LogicalLink( - joinOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), workflowContext @@ -401,20 +325,17 @@ class DataProcessingSpec } // TODO: use mock data to perform the test, remove dependency on the real AsterixDB - // "Engine" should "execute asterixdb->sink workflow normally" in { + // "Engine" should "execute asterixdb workflow normally" in { // // val asterixDBOp = TestOperators.asterixDBSourceOpDesc() - // val sink = TestOperators.sinkOpDesc() // val (id, workflow) = buildWorkflow( - // List(asterixDBOp, sink), - // List( - // OperatorLink(OperatorPort(asterixDBOp.operatorIdentifier, 0), OperatorPort(sink.operatorIdentifier, 0)) - // ) + // List(asterixDBOp), + // List() // ) // executeWorkflow(id, workflow) // } - "Engine" should "execute mysql->sink workflow normally" in { + "Engine" should "execute mysql workflow normally" in { val (host, port, database, table, username, password) = initializeInMemoryMySQLInstance() val inMemoryMsSQLSourceOpDesc = TestOperators.inMemoryMySQLSourceOpDesc( host, @@ -425,17 +346,9 @@ class DataProcessingSpec password ) - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(inMemoryMsSQLSourceOpDesc, sink), - List( - LogicalLink( - inMemoryMsSQLSourceOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ), + List(inMemoryMsSQLSourceOpDesc), + List(), workflowContext ) executeWorkflow(workflow) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index cc668fbc49c..014f3080b98 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -73,44 +73,29 @@ class PauseSpec Await.result(completion) } - "Engine" should "be able to pause csv->sink workflow" in { + "Engine" should "be able to pause csv workflow" in { val csvOpDesc = TestOperators.mediumCsvScanOpDesc() - val sink = TestOperators.sinkOpDesc() - logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}, sink-id ${sink.operatorIdentifier}") + logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}") shouldPause( - List(csvOpDesc, sink), - List( - LogicalLink( - csvOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() - ) - ) + List(csvOpDesc), + List() ) } - "Engine" should "be able to pause csv->keyword->sink workflow" in { + "Engine" should "be able to pause csv->keyword workflow" in { val csvOpDesc = TestOperators.mediumCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") - val sink = TestOperators.sinkOpDesc() logger.info( - s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id ${keywordOpDesc.operatorIdentifier}, sink-id ${sink.operatorIdentifier}" + s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id 
${keywordOpDesc.operatorIdentifier}" ) shouldPause( - List(csvOpDesc, keywordOpDesc, sink), + List(csvOpDesc, keywordOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, PortIdentity(), keywordOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ) ) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala index d697a4946e6..6c694b989c1 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala @@ -24,21 +24,14 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { val resultStorage = new OpResultStorage() val csvOpDesc = TestOperators.mediumCsvScanOpDesc() val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") - val sink = TestOperators.sinkOpDesc() val workflow = buildWorkflow( - List(csvOpDesc, keywordOpDesc, sink), + List(csvOpDesc, keywordOpDesc), List( LogicalLink( csvOpDesc.operatorIdentifier, PortIdentity(), keywordOpDesc.operatorIdentifier, PortIdentity() - ), - LogicalLink( - keywordOpDesc.operatorIdentifier, - PortIdentity(), - sink.operatorIdentifier, - PortIdentity() ) ), new WorkflowContext() From 64e4d44851db87cbe1f92104c9b6c8ca2dc945ac Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Dec 2024 07:52:43 -0800 Subject: [PATCH 17/20] remove unused --- .../scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala index 95681eb7711..ce8070a1aa7 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -126,10 +126,6 @@ case class PhysicalPlan( } - def getTerminalPhysicalOpIds(): Set[PhysicalOpIdentity] = { - operators.filter(op => dag.outDegreeOf(op.id) == 0).map(_.id) - } - @JsonIgnore def getOutputPartitionInfo( link: PhysicalLink, From 69d82904f709671480f3a3091ee4bff1afe3528d Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Dec 2024 07:54:10 -0800 Subject: [PATCH 18/20] add comment --- .../scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 88ad5b7fbb0..9144fa63d48 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -68,7 +68,7 @@ class WorkflowCompiler( } }) - // assign the view results + // assign the sinks to toAddSink operators' external output ports subPlan .topologicalIterator() .map(subPlan.getOperator) From b603b8da654793dbd74820eaaf3263c3f3dbc680 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:32:39 -0800 Subject: [PATCH 19/20] fix test --- 
.../ics/amber/engine/e2e/DataProcessingSpec.scala | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index f6c04cb826b..e56e0973e15 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -69,16 +69,8 @@ class DataProcessingSpec .registerCallback[ExecutionStateUpdate](evt => { if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds - .map(sinkOpId => - (sinkOpId, workflow.logicalPlan.getUpstreamOps(sinkOpId).head.operatorIdentifier) - ) - .filter { - case (_, upstreamOpId) => resultStorage.contains(upstreamOpId) - } - .map { - case (sinkOpId, upstreamOpId) => - (sinkOpId, resultStorage.get(upstreamOpId).get().toList) - } + .filter(terminalOpId => resultStorage.contains(terminalOpId)) + .map(terminalOpId => terminalOpId -> resultStorage.get(terminalOpId).get().toList) .toMap completion.setDone() } From 65ef08d1f659bd650f2c9f615578569256ae2fe8 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:04:03 -0800 Subject: [PATCH 20/20] revert some changes --- .../src/main/protobuf/edu/uci/ics/amber/workflow.proto | 1 - .../edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala | 2 ++ .../uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java | 2 +- .../ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala | 1 + .../uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java | 2 +- .../operator/udf/python/source/PythonUDFSourceOpDescV2.java | 2 +- .../edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java | 2 +- 7 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto index bc7a9e0e7e9..0ee4c68d36a 100644 --- a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto +++ b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto @@ -40,7 +40,6 @@ message OutputPort { string displayName = 2; bool blocking = 3; OutputMode mode = 4; - string storageUrl = 5; } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 0d01d3d6dbc..0024993166f 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -60,6 +60,8 @@ object SpecialPhysicalOpFactory { case SET_DELTA => // output schema is the same as input schema inputSchema + case _ => + throw new UnsupportedOperationException(s"Output mode $outputMode is not supported.") } // Create a Scala immutable Map diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java index 6c9c96703fa..ea35f40dee0 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/filter/SpecializedFilterOpDesc.java @@ 
-48,7 +48,7 @@ public OperatorInfo operatorInfo() { "Performs a filter operation", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), false, false, true, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala index a9408013d67..aaeababd685 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala @@ -28,6 +28,7 @@ class ProgressiveSinkOpExec( outputMode match { case OutputMode.SET_SNAPSHOT | OutputMode.SINGLE_SNAPSHOT => updateSetSnapshot(tuple) case OutputMode.SET_DELTA => writer.putOne(tuple) + case _ => throw new UnsupportedOperationException("Unsupported output mode") } } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java index 6324e279721..87d902f01f9 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/typecasting/TypeCastingOpDesc.java @@ -75,7 +75,7 @@ public OperatorInfo operatorInfo() { "Cast between types", OperatorGroupConstants.CLEANING_GROUP(), asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList()).toSeq()))).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), false, false, false, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java index f4766f57b1a..944f171c397 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.java @@ -99,7 +99,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in Python script", OperatorGroupConstants.PYTHON_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), false, false, true, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java index 972574bcc98..2b5785d9468 100644 --- 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.java @@ -108,7 +108,7 @@ public OperatorInfo operatorInfo() { "User-defined function operator in R script", OperatorGroupConstants.R_GROUP(), asScala(new ArrayList()).toList(), - asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0), ""))).toList(), + asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(), false, false, false,
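
Taken together, the patches above remove the explicit sink operator (TestOperators.sinkOpDesc()) from the end-to-end tests, key results by the terminal operator instead (PATCH 19/20), and revert the storageUrl field on OutputPort (PATCH 20/20), with sink wiring handled inside WorkflowCompiler (see the comment added in PATCH 18/20). Below is a minimal sketch of the resulting test shape; it is illustrative only, not part of any patch, and reuses only the helpers that appear verbatim in the hunks above (TestOperators, buildWorkflow, executeWorkflow, LogicalLink, PortIdentity, workflowContext), assuming it sits inside a spec such as DataProcessingSpec where those helpers are in scope.

    // Build a csv->keyword workflow with no trailing sink operator:
    // sink injection now happens during compilation, so only the
    // "real" operators and the links between them are listed.
    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")

    val workflow = buildWorkflow(
      List(csvOpDesc, keywordOpDesc),
      List(
        LogicalLink(
          csvOpDesc.operatorIdentifier,
          PortIdentity(),
          keywordOpDesc.operatorIdentifier,
          PortIdentity()
        )
      ),
      workflowContext
    )

    // Results are read from storage keyed by the terminal operator's
    // identifier, rather than by a dedicated sink operator's identifier
    // (mirroring the PATCH 19/20 change to DataProcessingSpec).
    val results = executeWorkflow(workflow)(keywordOpDesc.operatorIdentifier)
    assert(results.nonEmpty)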