From 5b622e320725d75d5e18c29711339bb7bf2ce0e3 Mon Sep 17 00:00:00 2001
From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Sun, 29 Dec 2024 22:23:37 -0800
Subject: [PATCH] Support multiple output ports with storage (#3175)

Previously, all results were tied to logical operators. This PR modifies the
engine to associate results with output ports instead, enabling finer-grained
result handling and supporting operators with multiple outputs.

### Key Change: StorageKey with PortIdentity

The most significant update in this PR is the change to the storage key
format: it now includes both the logical operator ID and the port ID. This
ensures that a logical operator with multiple output ports (e.g., Split) gets
a distinct storage created for each output port. (A short illustrative sketch
of the key scheme appears after the diff.)

For now, the frontend retrieves results only from the default output port
(port 0). In future updates, the frontend will be enhanced to retrieve
results from additional output ports, providing more flexibility in how
results are accessed and displayed.
---
 .../messaginglayer/OutputManager.scala        |   2 +-
 .../scheduling/ScheduleGenerator.scala        |  92 +++--------
 .../web/service/ExecutionResultService.scala  |  45 +++--
 .../web/service/ResultExportService.scala     |   8 +-
 .../texera/workflow/WorkflowCompiler.scala    | 104 ++++++------
 .../amber/engine/e2e/DataProcessingSpec.scala |  12 +-
 .../core/storage/result/MongoDocument.scala   |  23 +--
 .../core/storage/result/OpResultStorage.scala | 154 ++++++++++++------
 .../operator/SpecialPhysicalOpFactory.scala   |  34 +++-
 .../sink/managed/ProgressiveSinkOpExec.scala  |   2 +-
 .../source/cache/CacheSourceOpExec.scala      |   8 +-
 11 files changed, 257 insertions(+), 227 deletions(-)

diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
index 18f7c4974b4..d83eb0c1a57 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -192,7 +192,7 @@ class OutputManager(
   }
 
   def getSingleOutputPortIdentity: PortIdentity = {
-    assert(ports.size == 1)
+    assert(ports.size == 1, "expect 1 output port, got " + ports.size)
     ports.head._1
   }
 
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index 121be2289b1..5728f197dbc 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -1,22 +1,15 @@
 package edu.uci.ics.amber.engine.architecture.scheduling
 
-import edu.uci.ics.amber.core.executor.OpExecInitInfo
 import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
-import edu.uci.ics.amber.core.workflow.{
-  PhysicalOp,
-  PhysicalPlan,
-  SchemaPropagationFunc,
-  WorkflowContext
-}
+import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
   DefaultResourceAllocator,
   ExecutionClusterInfo
 }
 import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
-import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
-import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
-import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
+import edu.uci.ics.amber.virtualidentity.PhysicalOpIdentity
+import edu.uci.ics.amber.workflow.PhysicalLink
 import org.jgrapht.graph.DirectedAcyclicGraph
 import org.jgrapht.traverse.TopologicalOrderIterator
 
@@ -119,9 +112,9 @@ abstract class ScheduleGenerator(
       physicalPlan
         .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
         .filter(link =>
-          !physicalPlan.getOperator(physicalOpId).isSinkOperator && (physicalPlan
+          !physicalPlan.getOperator(physicalOpId).isSinkOperator && physicalPlan
             .getOperator(physicalOpId)
-            .isInputLinkDependee(link))
+            .isInputLinkDependee(link)
         )
     }
   }
@@ -158,7 +151,19 @@ abstract class ScheduleGenerator(
       .removeLink(physicalLink)
 
     // create cache writer and link
-    val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink)
+    val storageKey = OpResultStorage.createStorageKey(
+      physicalLink.fromOpId.logicalOpId,
+      physicalLink.fromPortId,
+      isMaterialized = true
+    )
+    val fromPortOutputMode =
+      physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
+    val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
+      workflowContext.workflowId,
+      workflowContext.executionId,
+      storageKey,
+      fromPortOutputMode
+    )
     val sourceToWriterLink =
       PhysicalLink(
         fromOp.id,
@@ -170,7 +175,7 @@ abstract class ScheduleGenerator(
       .addOperator(matWriterPhysicalOp)
       .addLink(sourceToWriterLink)
 
-    // expect exactly one input port and one output port
+    // sink has exactly one input port and one output port
    val schema = newPhysicalPlan
       .getOperator(matWriterPhysicalOp.id)
       .outputPorts(matWriterPhysicalOp.outputPorts.keys.head)
       ._3
       .toOption
       .get
@@ -180,14 +185,17 @@ abstract class ScheduleGenerator(
     ResultStorage
       .getOpResultStorage(workflowContext.workflowId)
       .create(
-        key = matWriterPhysicalOp.id.logicalOpId,
+        key = storageKey,
         mode = OpResultStorage.defaultStorageMode,
-        schema = Some(schema)
+        schema = schema
       )
 
     // create cache reader and link
-    val matReaderPhysicalOp: PhysicalOp =
-      createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
+    val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
+      workflowContext.workflowId,
+      workflowContext.executionId,
+      storageKey
+    )
     val readerToDestLink =
       PhysicalLink(
         matReaderPhysicalOp.id,
@@ -201,52 +209,4 @@ abstract class ScheduleGenerator(
       .addOperator(matReaderPhysicalOp)
       .addLink(readerToDestLink)
   }
-
-  private def createMatReader(
-      matWriterLogicalOpId: OperatorIdentity,
-      physicalLink: PhysicalLink
-  ): PhysicalOp = {
-    val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId)
-    PhysicalOp
-      .sourcePhysicalOp(
-        workflowContext.workflowId,
-        workflowContext.executionId,
-        OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"),
-        OpExecInitInfo((_, _) =>
-          new CacheSourceOpExec(
-            opResultStorage.get(matWriterLogicalOpId)
-          )
-        )
-      )
-      .withInputPorts(List.empty)
-      .withOutputPorts(List(OutputPort()))
-      .withPropagateSchema(
-        SchemaPropagationFunc(_ =>
-          Map(
-            OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get
-          )
-        )
-      )
-      .propagateSchema()
-
-  }
-
-  private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = {
-    val outputMode =
-      physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
-    val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}"
-
-    SpecialPhysicalOpFactory.newSinkPhysicalOp(
-      workflowContext.workflowId,
-      workflowContext.executionId,
-      storageKey,
-      outputMode
-    )
-  }
-
-  private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) =
-    s"${physicalLink.fromOpId.logicalOpId}_${physicalLink.fromOpId.layerName}_" +
-      s"${physicalLink.fromPortId.id}_" +
-      s"${physicalLink.toOpId.logicalOpId}_${physicalLink.toOpId.layerName}_" +
-      s"${physicalLink.toPortId.id}"
-
 }
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
index a0b09ff3fd1..70a21bba475 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
@@ -5,12 +5,8 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.storage.StorageConfig
-import edu.uci.ics.amber.core.storage.result.{
-  MongoDocument,
-  OperatorResultMetadata,
-  ResultStorage,
-  WorkflowResultStore
-}
+import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB
+import edu.uci.ics.amber.core.storage.result._
 import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
 import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
@@ -25,6 +21,7 @@ import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataSt
 import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
 import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
 import edu.uci.ics.amber.workflow.OutputPort.OutputMode
+import edu.uci.ics.amber.workflow.PortIdentity
 import edu.uci.ics.texera.web.SubscriptionManager
 import edu.uci.ics.texera.web.model.websocket.event.{
   PaginatedResultEvent,
@@ -89,7 +86,9 @@ object ExecutionResultService {
     }
 
     val storage =
-      ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId)
+      ResultStorage
+        .getOpResultStorage(workflowIdentity)
+        .get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity()))
     val webUpdate = webOutputMode match {
       case PaginationMode() =>
         val numTuples = storage.getCount
@@ -238,10 +237,12 @@ class ExecutionResultService(
           oldInfo.tupleCount,
           info.tupleCount
         )
-      if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") {
+      if (StorageConfig.resultStorageMode == MONGODB) {
+        // using the first port for now. TODO: support multiple ports
+        val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity())
        val opStorage = ResultStorage
           .getOpResultStorage(workflowIdentity)
-          .get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId)
+          .get(storageKey)
         opStorage match {
           case mongoDocument: MongoDocument[Tuple] =>
             val tableCatStats = mongoDocument.getCategoricalStats
@@ -277,15 +278,16 @@ class ExecutionResultService(
   def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
     // calculate from index (pageIndex starts from 1 instead of 0)
     val from = request.pageSize * (request.pageIndex - 1)
-    val opId = OperatorIdentity(request.operatorID)
-    val paginationIterable = {
+    // using the first port for now. TODO: support multiple ports
+    val storageKey =
+      OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity())
+    val paginationIterable =
       ResultStorage
         .getOpResultStorage(workflowIdentity)
-        .get(opId)
+        .get(storageKey)
         .getRange(from, from + request.pageSize)
         .to(Iterable)
-    }
 
     val mappedResults = paginationIterable
       .map(tuple => tuple.asKeyValuePairJson())
@@ -302,7 +304,7 @@ class ExecutionResultService(
       ResultStorage
         .getOpResultStorage(workflowIdentity)
         .getAllKeys
-        .filter(!_.id.startsWith("materialized_"))
+        .filter(!_.startsWith("materialized_"))
         .map(storageKey => {
           val count = ResultStorage
             .getOpResultStorage(workflowIdentity)
             .get(storageKey)
             .getCount
             .toInt
 
-          val opId = storageKey
+          val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey)
 
-          // use the first output port's mode
+          // Retrieve the mode of the specified output port
           val mode = physicalPlan
             .getPhysicalOpsOfLogicalOp(opId)
-            .flatMap(physicalOp => physicalOp.outputPorts)
-            .filter({
-              case (portId, (port, links, schema)) =>
-                !portId.internal
-            })
-            .map({
-              case (portId, (port, links, schema)) => port.mode
-            })
+            .flatMap(_.outputPorts.get(storagePortId))
+            .map(_._1.mode)
             .head
+
           val changeDetector =
             if (mode == OutputMode.SET_SNAPSHOT) {
               UUID.randomUUID.toString
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
index c2a0693bd85..83e3d89f347 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
@@ -24,6 +24,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{
 import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionResource
 import org.jooq.types.UInteger
 import edu.uci.ics.amber.util.ArrowUtils
+import edu.uci.ics.amber.workflow.PortIdentity
 
 import java.io.{PipedInputStream, PipedOutputStream}
 import java.nio.charset.StandardCharsets
@@ -71,8 +72,11 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
     }
 
     // By now the workflow should finish running
+    // Only supports external port 0 for now. TODO: support multiple ports
     val operatorResult: VirtualDocument[Tuple] =
-      ResultStorage.getOpResultStorage(workflowIdentity).get(OperatorIdentity(request.operatorId))
+      ResultStorage
+        .getOpResultStorage(workflowIdentity)
+        .get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity()))
     if (operatorResult == null) {
       return ResultExportResponse("error", "The workflow contains no results")
     }
@@ -190,7 +194,7 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
     val columnIndex = request.columnIndex
     val filename = request.filename
 
-    if (rowIndex >= results.size || columnIndex >= results.head.getFields.size) {
+    if (rowIndex >= results.size || columnIndex >= results.head.getFields.length) {
       return ResultExportResponse("error", s"Invalid row or column index")
     }
 
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
index 615cc937dbf..795939f1ea0 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
@@ -2,7 +2,6 @@ package edu.uci.ics.texera.workflow
 
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
-import edu.uci.ics.amber.core.tuple.Schema
 import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.controller.Workflow
 import edu.uci.ics.amber.engine.common.Utils.objectMapper
@@ -70,62 +69,59 @@ class WorkflowCompiler(
       // assign the sinks to toAddSink operators' external output ports
       subPlan
         .topologicalIterator()
+        .filter(opId => toAddSink.contains(opId.logicalOpId))
         .map(physicalPlan.getOperator)
-        .flatMap { physicalOp =>
-          physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort))
-        }
-        .filter({
-          case (physicalOp, (_, (outputPort, _, _))) =>
-            toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal
-        })
-        .foreach({
-          case (physicalOp, (_, (outputPort, _, schema))) =>
-            val storage = ResultStorage.getOpResultStorage(context.workflowId)
-            val storageKey = physicalOp.id.logicalOpId
-
-            // due to the size limit of single document in mongoDB (16MB)
-            // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage.
-            val storageType = {
-              if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
-              else OpResultStorage.defaultStorageMode
-            }
-            if (!storage.contains(storageKey)) {
-              // get the schema for result storage in certain mode
-              val sinkStorageSchema: Option[Schema] =
-                if (storageType == OpResultStorage.MONGODB) {
-                  // use the output schema on the first output port as the schema for storage
-                  Some(schema.right.get)
-                } else {
-                  None
+        .foreach { physicalOp =>
+          physicalOp.outputPorts
+            .filterNot(_._1.internal)
+            .foreach {
+              case (outputPortId, (outputPort, _, schema)) =>
+                val storage = ResultStorage.getOpResultStorage(context.workflowId)
+                val storageKey =
+                  OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)
+
+                // Determine the storage type, defaulting to memory for large HTML visualizations
+                val storageType =
+                  if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
+                  else OpResultStorage.defaultStorageMode
+
+                if (!storage.contains(storageKey)) {
+                  // Create storage if it doesn't exist
+                  val sinkStorageSchema =
+                    schema.getOrElse(throw new IllegalStateException("Schema is missing"))
+                  storage.create(
+                    s"${context.executionId}_",
+                    storageKey,
+                    storageType,
+                    sinkStorageSchema
+                  )
+
+                  // Add sink collection name to the JSON array of sinks
+                  sinksPointers.add(
+                    objectMapper
+                      .createObjectNode()
+                      .put("storageType", storageType)
+                      .put("storageKey", s"${context.executionId}_$storageKey")
+                  )
                 }
-              storage.create(
-                s"${context.executionId}_",
-                storageKey,
-                storageType,
-                sinkStorageSchema
-              )
-              // add the sink collection name to the JSON array of sinks
-              val storageNode = objectMapper.createObjectNode()
-              storageNode.put("storageType", storageType)
-              storageNode.put("storageKey", s"${context.executionId}_$storageKey")
-              sinksPointers.add(storageNode)
-            }
-            val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
-              context.workflowId,
-              context.executionId,
-              storageKey.id,
-              outputPort.mode
-            )
-            val sinkLink =
-              PhysicalLink(
-                physicalOp.id,
-                outputPort.id,
-                sinkPhysicalOp.id,
-                PortIdentity(internal = true)
-              )
-            physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
-        })
+
+                // Create and link the sink operator
+                val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
+                  context.workflowId,
+                  context.executionId,
+                  storageKey,
+                  outputPort.mode
+                )
+                val sinkLink = PhysicalLink(
+                  physicalOp.id,
+                  outputPort.id,
+                  sinkPhysicalOp.id,
+                  sinkPhysicalOp.outputPorts.head._1
+                )
+
+                physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
+            }
+        }
     } match {
       case Success(_) =>
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
index e56e0973e15..2f6f8ab67d5 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
@@ -69,8 +69,16 @@ class DataProcessingSpec
       .registerCallback[ExecutionStateUpdate](evt => {
         if (evt.state == COMPLETED) {
           results = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => resultStorage.contains(terminalOpId))
-            .map(terminalOpId => terminalOpId -> resultStorage.get(terminalOpId).get().toList)
+            .filter(terminalOpId =>
+              // expecting the first output port only.
+              resultStorage.contains(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
+            )
+            .map(terminalOpId =>
+              terminalOpId -> resultStorage
+                .get(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
+                .get()
+                .toList
+            )
             .toMap
           completion.setDone()
         }
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
index 18baa1844fb..92fa1cdce11 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala
@@ -21,7 +21,7 @@ import java.util.Date
 class MongoDocument[T >: Null <: AnyRef](
     id: String,
     var toDocument: T => Document,
-    var fromDocument: Option[Document => T] = None
+    var fromDocument: Document => T
 ) extends VirtualDocument[T] {
 
   /**
@@ -74,9 +74,7 @@ class MongoDocument[T >: Null <: AnyRef](
         override def hasNext: Boolean = cursor.hasNext
 
         override def next(): T = {
-          val fromDocumentFunc =
-            fromDocument.getOrElse(throw new NotImplementedError("fromDocument is not set"))
-          fromDocumentFunc(cursor.next())
+          fromDocument(cursor.next())
         }
       }.iterator
   }
@@ -133,9 +131,7 @@ class MongoDocument[T >: Null <: AnyRef](
     if (!cursor.hasNext) {
       throw new RuntimeException(f"Index $i out of bounds")
     }
-    val fromDocumentFunc =
-      fromDocument.getOrElse(throw new NotImplementedError("fromDocument is not set"))
-    fromDocumentFunc(cursor.next())
+    fromDocument(cursor.next())
   }
 
   /**
@@ -146,19 +142,6 @@ class MongoDocument[T >: Null <: AnyRef](
     collectionMgr.getCount
   }
 
-  /**
-    * Set the deserializer, i.e. from Document to T. This method can only be called once.
-    *
-    * @param fromDocument : the deserializer, convert MongoDB's Document to T.
-    * @throws IllegalStateException if setSerde is called more than once.
-    */
-  def setDeserde(fromDocument: Document => T): Unit = {
-    if (this.fromDocument.isDefined) {
-      throw new IllegalStateException("setSerde can only be called once.")
-    }
-    this.fromDocument = Some(fromDocument)
-  }
-
   def getNumericColStats: Map[String, Map[String, Double]] =
     collectionMgr.calculateNumericStats()
 
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
index ab69d6f94d2..42728231270 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
@@ -5,103 +5,153 @@ import edu.uci.ics.amber.core.storage.StorageConfig
 import edu.uci.ics.amber.core.storage.model.VirtualDocument
 import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
 import edu.uci.ics.amber.virtualidentity.OperatorIdentity
+import edu.uci.ics.amber.workflow.PortIdentity
 
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.ImplicitConversions.`iterator asScala`
+import scala.jdk.CollectionConverters.IteratorHasAsScala
 
+/**
+  * Companion object for `OpResultStorage`, providing utility functions
+  * for key generation, decoding, and storage modes.
+  */
 object OpResultStorage {
   val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
-  val MEMORY = "memory"
-  val MONGODB = "mongodb"
+  val MEMORY: String = "memory"
+  val MONGODB: String = "mongodb"
+
+  /**
+    * Creates a unique storage key by combining operator and port identities.
+    *
+    * @param operatorId     The unique identifier of the operator.
+    * @param portIdentity   The unique identifier of the port.
+    * @param isMaterialized Indicates whether the storage is materialized (e.g., persisted).
+    * @return A string representing the generated storage key, formatted as:
+    *         "materialized_<operatorId>_<portId>_<internal>" if materialized,
+    *         otherwise "<operatorId>_<portId>_<internal>".
+    */
+  def createStorageKey(
+      operatorId: OperatorIdentity,
+      portIdentity: PortIdentity,
+      isMaterialized: Boolean = false
+  ): String = {
+    val prefix = if (isMaterialized) "materialized_" else ""
+    s"$prefix${operatorId.id}_${portIdentity.id}_${portIdentity.internal}"
+  }
+
+  /**
+    * Decodes a storage key back into its original components.
+    *
+    * @param key The storage key to decode.
+    * @return A tuple containing the operator identity and port identity.
+    * @throws IllegalArgumentException If the key format is invalid.
+    */
+  def decodeStorageKey(key: String): (OperatorIdentity, PortIdentity) = {
+    val processedKey = if (key.startsWith("materialized_")) key.substring(13) else key
+    processedKey.split("_", 3) match {
+      case Array(opId, portId, internal) =>
+        (OperatorIdentity(opId), PortIdentity(portId.toInt, internal.toBoolean))
+      case _ =>
+        throw new IllegalArgumentException(s"Invalid storage key: $key")
+    }
+  }
 }
 
 /**
-  * Public class of operator result storage.
-  * One execution links one instance of OpResultStorage, both have the same lifecycle.
+  * Handles the storage of operator results during workflow execution.
+  * Each `OpResultStorage` instance is tied to the lifecycle of a single execution.
   */
 class OpResultStorage extends Serializable with LazyLogging {
 
-  // since some op need to get the schema from the OpResultStorage, the schema is stored as part of the OpResultStorage.cache
-  // TODO: once we make the storage self-contained, i.e. storing Schema in the storage as metadata, we can remove it
-  val cache: ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])] =
-    new ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])]()
+  /**
+    * In-memory cache for storing results and their associated schemas.
+    * TODO: Once the storage is self-contained (i.e., stores schemas as metadata),
+    *       this can be removed.
+    */
+  private val cache: ConcurrentHashMap[String, (VirtualDocument[Tuple], Schema)] =
+    new ConcurrentHashMap()
 
   /**
-    * Retrieve the result of an operator from OpResultStorage
-    * @param key The key used for storage and retrieval.
-    *            Currently it is the uuid inside the cache source or cache sink operator.
-    * @return The storage object of this operator.
+    * Retrieves the result of an operator from the storage.
+    *
+    * @param key The storage key associated with the result.
+    * @return The result stored as a `VirtualDocument[Tuple]`.
+    * @throws NoSuchElementException If the key is not found in the cache.
    */
-  def get(key: OperatorIdentity): VirtualDocument[Tuple] = {
-    cache.get(key)._1
+  def get(key: String): VirtualDocument[Tuple] = {
+    Option(cache.get(key)) match {
+      case Some((document, _)) => document
+      case None                => throw new NoSuchElementException(s"Storage with key $key not found")
+    }
   }
 
   /**
-    * Retrieve the schema of the result associate with target operator
-    * @param key the uuid inside the cache source or cache sink operator.
-    * @return The result schema of this operator.
+    * Retrieves the schema associated with an operator's result.
+    *
+    * @param key The storage key associated with the schema.
+    * @return The schema of the result.
     */
-  def getSchema(key: OperatorIdentity): Option[Schema] = {
+  def getSchema(key: String): Schema = {
     cache.get(key)._2
   }
 
-  def setSchema(key: OperatorIdentity, schema: Schema): Unit = {
-    val storage = get(key)
-    cache.put(key, (storage, Some(schema)))
-  }
-
+  /**
+    * Creates a new storage object for an operator result.
+    *
+    * @param executionId An optional execution ID for unique identification.
+    * @param key         The storage key for the result.
+    * @param mode        The storage mode (e.g., "memory" or "mongodb").
+    * @param schema      The schema of the result.
+    * @return A `VirtualDocument[Tuple]` instance for storing results.
+    */
   def create(
       executionId: String = "",
-      key: OperatorIdentity,
+      key: String,
       mode: String,
-      schema: Option[Schema] = None
+      schema: Schema
   ): VirtualDocument[Tuple] = {
     val storage: VirtualDocument[Tuple] =
-      if (mode == "memory") {
-        new MemoryDocument[Tuple](key.id)
+      if (mode == OpResultStorage.MEMORY) {
+        new MemoryDocument[Tuple](key)
       } else {
         try {
-          val fromDocument = schema.map(Tuple.fromDocument)
-          new MongoDocument[Tuple](executionId + key, Tuple.toDocument, fromDocument)
+          new MongoDocument[Tuple](
+            executionId + key,
+            Tuple.toDocument,
+            Tuple.fromDocument(schema)
+          )
         } catch {
           case t: Throwable =>
-            logger.warn("Failed to create mongo storage", t)
-            logger.info(s"Fall back to memory storage for $key")
-            // fall back to memory
-            new MemoryDocument[Tuple](key.id)
+            logger.warn("Failed to create MongoDB storage", t)
+            logger.info(s"Falling back to memory storage for $key")
+            new MemoryDocument[Tuple](key)
         }
       }
     cache.put(key, (storage, schema))
     storage
   }
 
-  def contains(key: OperatorIdentity): Boolean = {
-    cache.containsKey(key)
-  }
-
   /**
-    * Manually remove an entry from the cache.
-    * @param key The key used for storage and retrieval.
-    *            Currently it is the uuid inside the cache source or cache sink operator.
+    * Checks if a storage key exists in the cache.
+    *
+    * @param key The storage key to check.
+    * @return True if the key exists, false otherwise.
     */
-  def remove(key: OperatorIdentity): Unit = {
-    if (cache.contains(key)) {
-      cache.get(key)._1.clear()
-    }
-    cache.remove(key)
-  }
+  def contains(key: String): Boolean = cache.containsKey(key)
 
   /**
-    * Close this storage. Used for workflow cleanup.
+    * Clears all stored results. Typically used during workflow cleanup.
     */
   def clear(): Unit = {
     cache.forEach((_, document) => document._1.clear())
     cache.clear()
   }
 
-  def getAllKeys: Set[OperatorIdentity] = {
-    cache.keySet().iterator().toSet
+  /**
+    * Retrieves all storage keys currently in the cache.
+    *
+    * @return A set of all keys in the cache.
+    */
+  def getAllKeys: Set[String] = {
+    cache.keySet().iterator().asScala.toSet
   }
-
 }
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
index 0024993166f..64028a3fef8 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala
@@ -1,10 +1,12 @@
 package edu.uci.ics.amber.operator
 
 import edu.uci.ics.amber.core.executor.OpExecInitInfo
+import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
 import edu.uci.ics.amber.core.tuple.Schema
 import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
 import edu.uci.ics.amber.operator.sink.ProgressiveUtils
 import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec
+import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
 import edu.uci.ics.amber.virtualidentity.{
   ExecutionIdentity,
   OperatorIdentity,
@@ -21,10 +23,11 @@ object SpecialPhysicalOpFactory {
       executionIdentity: ExecutionIdentity,
       storageKey: String,
       outputMode: OutputMode
-  ): PhysicalOp =
+  ): PhysicalOp = {
+    val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey)
     PhysicalOp
       .localPhysicalOp(
-        PhysicalOpIdentity(OperatorIdentity(storageKey), "sink"),
+        PhysicalOpIdentity(opId, s"sink${portId.id}"),
         workflowIdentity,
         executionIdentity,
         OpExecInitInfo((idx, workers) =>
@@ -68,4 +71,31 @@ object SpecialPhysicalOpFactory {
           Map(PortIdentity(internal = true) -> outputSchema)
         })
       )
+  }
+
+  def newSourcePhysicalOp(
+      workflowIdentity: WorkflowIdentity,
+      executionIdentity: ExecutionIdentity,
+      storageKey: String
+  ): PhysicalOp = {
+    val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey)
+    val opResultStorage = ResultStorage.getOpResultStorage(workflowIdentity)
+    val outputPort = OutputPort()
+    PhysicalOp
+      .sourcePhysicalOp(
+        PhysicalOpIdentity(opId, s"source${portId.id}"),
+        workflowIdentity,
+        executionIdentity,
+        OpExecInitInfo((_, _) => new CacheSourceOpExec(storageKey, workflowIdentity))
+      )
+      .withInputPorts(List.empty)
+      .withOutputPorts(List(outputPort))
+      .withPropagateSchema(
+        SchemaPropagationFunc(_ => Map(outputPort.id -> opResultStorage.getSchema(storageKey)))
+      )
+      .propagateSchema()
+
+  }
+
 }
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala
index aaeababd685..7290e064159 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala
@@ -15,7 +15,7 @@ class ProgressiveSinkOpExec(
     workflowIdentity: WorkflowIdentity
 ) extends SinkOperatorExecutor {
   val writer: BufferedItemWriter[Tuple] =
-    ResultStorage.getOpResultStorage(workflowIdentity).get(OperatorIdentity(storageKey)).writer()
+    ResultStorage.getOpResultStorage(workflowIdentity).get(storageKey).writer()
 
   override def open(): Unit = {
     writer.open()
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
index 2fa7772d3dc..ba335d1d5c9 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
@@ -2,12 +2,14 @@ package edu.uci.ics.amber.operator.source.cache
 
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
-import edu.uci.ics.amber.core.storage.model.VirtualDocument
-import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
+import edu.uci.ics.amber.core.storage.result.ResultStorage
+import edu.uci.ics.amber.core.tuple.TupleLike
+import edu.uci.ics.amber.virtualidentity.WorkflowIdentity
 
-class CacheSourceOpExec(storage: VirtualDocument[Tuple])
+class CacheSourceOpExec(storageKey: String, workflowIdentity: WorkflowIdentity)
     extends SourceOperatorExecutor
     with LazyLogging {
+  private val storage = ResultStorage.getOpResultStorage(workflowIdentity).get(storageKey)
 
   override def produceTuple(): Iterator[TupleLike] = storage.get()
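
### Sketch: the new storage-key scheme

The following self-contained sketch mirrors the `createStorageKey`/`decodeStorageKey` logic added to `OpResultStorage` in this patch. It is a minimal approximation, not the real implementation: `OperatorIdentity` and `PortIdentity` are simplified stand-ins for the classes in `edu.uci.ics.amber.virtualidentity` and `edu.uci.ics.amber.workflow`, and the operator names are invented for illustration.

```scala
// Simplified stand-ins for the real identity classes.
final case class OperatorIdentity(id: String)
final case class PortIdentity(id: Int = 0, internal: Boolean = false)

object StorageKeySketch {
  // Mirrors OpResultStorage.createStorageKey in the diff above.
  def createStorageKey(
      operatorId: OperatorIdentity,
      portIdentity: PortIdentity,
      isMaterialized: Boolean = false
  ): String = {
    val prefix = if (isMaterialized) "materialized_" else ""
    s"$prefix${operatorId.id}_${portIdentity.id}_${portIdentity.internal}"
  }

  // Mirrors OpResultStorage.decodeStorageKey. Note that splitting on "_"
  // from the left means the scheme relies on operator IDs themselves
  // containing no underscores.
  def decodeStorageKey(key: String): (OperatorIdentity, PortIdentity) = {
    // "materialized_".length == 13
    val processed = if (key.startsWith("materialized_")) key.substring(13) else key
    processed.split("_", 3) match {
      case Array(opId, portId, internal) =>
        (OperatorIdentity(opId), PortIdentity(portId.toInt, internal.toBoolean))
      case _ => throw new IllegalArgumentException(s"Invalid storage key: $key")
    }
  }

  def main(args: Array[String]): Unit = {
    // A Split operator with two output ports now gets two distinct keys,
    // so each port's results are stored separately:
    val split = OperatorIdentity("Split-1")
    val key0  = createStorageKey(split, PortIdentity(0)) // "Split-1_0_false"
    val key1  = createStorageKey(split, PortIdentity(1)) // "Split-1_1_false"
    assert(key0 != key1)

    // Decoding round-trips back to the originating operator and port:
    val (decodedOp, decodedPort) = decodeStorageKey(key0)
    assert(decodedOp == split && decodedPort == PortIdentity(0))
  }
}
```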
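Building on the definitions in the previous sketch (assumed to be in scope), the next sketch illustrates the behavior the result services rely on: user-facing lookups default to port 0 via `PortIdentity()`, while keys created for internal materializations are hidden from the web UI by their `materialized_` prefix, as in the `.filter(!_.startsWith("materialized_"))` call in `ExecutionResultService`. The mutable map is a toy stand-in for `OpResultStorage`, and the stored values are placeholder strings rather than real tuples.

```scala
object DefaultPortLookupSketch {
  import StorageKeySketch._

  // Toy stand-in for OpResultStorage: key -> stored "tuples".
  private val storage = scala.collection.mutable.Map[String, List[String]]()

  def main(args: Array[String]): Unit = {
    val split = OperatorIdentity("Split-1")
    val join  = OperatorIdentity("Join-3")

    storage(createStorageKey(split, PortIdentity(0))) = List("tuple-a", "tuple-b")
    storage(createStorageKey(split, PortIdentity(1))) = List("tuple-c")
    storage(createStorageKey(join, PortIdentity(0), isMaterialized = true)) = List("cached")

    // The frontend currently reads only the default port (port 0):
    val defaultPortResult = storage(createStorageKey(split, PortIdentity()))
    assert(defaultPortResult == List("tuple-a", "tuple-b"))

    // Materialized keys are excluded from web result metadata:
    val visibleKeys = storage.keySet.filterNot(_.startsWith("materialized_"))
    assert(visibleKeys.forall(k => decodeStorageKey(k)._1 == split))
  }
}
```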
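Finally, a sketch of the scheduler's writer/reader pairing: `ScheduleGenerator` derives both the materialization sink and the cache source from the same storage key, and `SpecialPhysicalOpFactory` decodes that key to name the physical layers per port (`s"sink${portId.id}"` and `s"source${portId.id}"` in the diff). The wiring below is illustrative only, again reusing the sketch helpers, with an invented operator ID.

```scala
object MaterializationKeySketch {
  import StorageKeySketch._

  def main(args: Array[String]): Unit = {
    val fromOp   = OperatorIdentity("Join-3")
    val fromPort = PortIdentity(0)

    // ScheduleGenerator.addMaterialization creates one materialized key...
    val matKey = createStorageKey(fromOp, fromPort, isMaterialized = true)
    assert(matKey == "materialized_Join-3_0_false")

    // ...and hands the SAME key to newSinkPhysicalOp (the writer) and
    // newSourcePhysicalOp (the reader), so the reader finds exactly what
    // the writer stored. Both decode it back to operator and port:
    val (opId, portId) = decodeStorageKey(matKey)
    assert(opId == fromOp && portId == fromPort)

    // The factory then names the physical layers per port:
    val sinkLayer   = s"sink${portId.id}"   // "sink0"
    val sourceLayer = s"source${portId.id}" // "source0"
    assert(sinkLayer == "sink0" && sourceLayer == "source0")
  }
}
```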