diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
index 18f7c4974b4..d83eb0c1a57 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -192,7 +192,7 @@ class OutputManager(
   }
 
   def getSingleOutputPortIdentity: PortIdentity = {
-    assert(ports.size == 1)
+    assert(ports.size == 1, "expect 1 output port, got " + ports.size)
     ports.head._1
   }
 
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index 121be2289b1..5728f197dbc 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -1,22 +1,15 @@
 package edu.uci.ics.amber.engine.architecture.scheduling
 
-import edu.uci.ics.amber.core.executor.OpExecInitInfo
 import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
-import edu.uci.ics.amber.core.workflow.{
-  PhysicalOp,
-  PhysicalPlan,
-  SchemaPropagationFunc,
-  WorkflowContext
-}
+import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
   DefaultResourceAllocator,
   ExecutionClusterInfo
 }
 import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
-import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
-import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
-import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
+import edu.uci.ics.amber.virtualidentity.PhysicalOpIdentity
+import edu.uci.ics.amber.workflow.PhysicalLink
 import org.jgrapht.graph.DirectedAcyclicGraph
 import org.jgrapht.traverse.TopologicalOrderIterator
 
@@ -119,9 +112,9 @@ abstract class ScheduleGenerator(
       physicalPlan
         .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
         .filter(link =>
-          !physicalPlan.getOperator(physicalOpId).isSinkOperator && (physicalPlan
+          !physicalPlan.getOperator(physicalOpId).isSinkOperator && physicalPlan
             .getOperator(physicalOpId)
-            .isInputLinkDependee(link))
+            .isInputLinkDependee(link)
         )
     }
   }
@@ -158,7 +151,19 @@ abstract class ScheduleGenerator(
       .removeLink(physicalLink)
 
     // create cache writer and link
-    val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink)
+    val storageKey = OpResultStorage.createStorageKey(
+      physicalLink.fromOpId.logicalOpId,
+      physicalLink.fromPortId,
+      isMaterialized = true
+    )
+    val fromPortOutputMode =
+      physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
+    val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
+      workflowContext.workflowId,
+      workflowContext.executionId,
+      storageKey,
+      fromPortOutputMode
+    )
     val sourceToWriterLink =
       PhysicalLink(
         fromOp.id,
@@ -170,7 +175,7 @@ abstract class ScheduleGenerator(
       .addOperator(matWriterPhysicalOp)
       .addLink(sourceToWriterLink)
 
-    // expect exactly one input port and one output port
+    // sink has exactly one input port and one output port
     val schema = newPhysicalPlan
       .getOperator(matWriterPhysicalOp.id)
       .outputPorts(matWriterPhysicalOp.outputPorts.keys.head)
@@ -180,14 +185,17 @@ abstract class ScheduleGenerator(
     ResultStorage
       .getOpResultStorage(workflowContext.workflowId)
       .create(
-        key = matWriterPhysicalOp.id.logicalOpId,
+        key = storageKey,
         mode = OpResultStorage.defaultStorageMode,
-        schema = Some(schema)
+        schema = schema
       )
 
     // create cache reader and link
-    val matReaderPhysicalOp: PhysicalOp =
-      createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
+    val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
+      workflowContext.workflowId,
+      workflowContext.executionId,
+      storageKey
+    )
     val readerToDestLink =
       PhysicalLink(
         matReaderPhysicalOp.id,
@@ -201,52 +209,4 @@ abstract class ScheduleGenerator(
       .addOperator(matReaderPhysicalOp)
       .addLink(readerToDestLink)
   }
-
-  private def createMatReader(
-      matWriterLogicalOpId: OperatorIdentity,
-      physicalLink: PhysicalLink
-  ): PhysicalOp = {
-    val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId)
-    PhysicalOp
-      .sourcePhysicalOp(
-        workflowContext.workflowId,
-        workflowContext.executionId,
-        OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"),
-        OpExecInitInfo((_, _) =>
-          new CacheSourceOpExec(
-            opResultStorage.get(matWriterLogicalOpId)
-          )
-        )
-      )
-      .withInputPorts(List.empty)
-      .withOutputPorts(List(OutputPort()))
-      .withPropagateSchema(
-        SchemaPropagationFunc(_ =>
-          Map(
-            OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get
-          )
-        )
-      )
-      .propagateSchema()
-
-  }
-
-  private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = {
-    val outputMode =
-      physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
-    val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}"
-    SpecialPhysicalOpFactory.newSinkPhysicalOp(
-      workflowContext.workflowId,
-      workflowContext.executionId,
-      storageKey,
-      outputMode
-    )
-  }
-
-  private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) =
-    s"${physicalLink.fromOpId.logicalOpId}_${physicalLink.fromOpId.layerName}_" +
-      s"${physicalLink.fromPortId.id}_" +
-      s"${physicalLink.toOpId.logicalOpId}_${physicalLink.toOpId.layerName}_" +
-      s"${physicalLink.toPortId.id}"
-
 }
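Note: a minimal sketch of the materialization flow above (illustrative only; `workflowId`, `executionId`, and `mode` stand in for the values ScheduleGenerator has in scope). The cache writer and cache reader now share a single storage key derived from the cut link's source operator and port:

    // The "materialized_" key ties the sink (writer) and the source (reader) together.
    val storageKey = OpResultStorage.createStorageKey(
      physicalLink.fromOpId.logicalOpId, // e.g. OperatorIdentity("op1")
      physicalLink.fromPortId,           // e.g. PortIdentity(0) => "materialized_op1_0_false"
      isMaterialized = true
    )
    val writerOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(workflowId, executionId, storageKey, mode)
    val readerOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(workflowId, executionId, storageKey)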
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
index a0b09ff3fd1..70a21bba475 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala
@@ -5,12 +5,8 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.storage.StorageConfig
-import edu.uci.ics.amber.core.storage.result.{
-  MongoDocument,
-  OperatorResultMetadata,
-  ResultStorage,
-  WorkflowResultStore
-}
+import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB
+import edu.uci.ics.amber.core.storage.result._
 import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
 import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
@@ -25,6 +21,7 @@ import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataState
 import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
 import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
 import edu.uci.ics.amber.workflow.OutputPort.OutputMode
+import edu.uci.ics.amber.workflow.PortIdentity
 import edu.uci.ics.texera.web.SubscriptionManager
 import edu.uci.ics.texera.web.model.websocket.event.{
   PaginatedResultEvent,
@@ -89,7 +86,9 @@ object ExecutionResultService {
     }
 
     val storage =
-      ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId)
+      ResultStorage
+        .getOpResultStorage(workflowIdentity)
+        .get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity()))
     val webUpdate = webOutputMode match {
       case PaginationMode() =>
         val numTuples = storage.getCount
@@ -238,10 +237,12 @@ class ExecutionResultService(
             oldInfo.tupleCount,
             info.tupleCount
           )
-          if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") {
+          if (StorageConfig.resultStorageMode == MONGODB) {
+            // using the first port for now. TODO: support multiple ports
+            val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity())
             val opStorage = ResultStorage
               .getOpResultStorage(workflowIdentity)
-              .get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId)
+              .get(storageKey)
             opStorage match {
               case mongoDocument: MongoDocument[Tuple] =>
                 val tableCatStats = mongoDocument.getCategoricalStats
@@ -277,15 +278,16 @@ class ExecutionResultService(
   def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
     // calculate from index (pageIndex starts from 1 instead of 0)
     val from = request.pageSize * (request.pageIndex - 1)
-    val opId = OperatorIdentity(request.operatorID)
-    val paginationIterable = {
+    // using the first port for now. TODO: support multiple ports
+    val storageKey =
+      OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity())
+    val paginationIterable =
       ResultStorage
         .getOpResultStorage(workflowIdentity)
-        .get(opId)
+        .get(storageKey)
        .getRange(from, from + request.pageSize)
        .to(Iterable)
-    }
 
     val mappedResults = paginationIterable
       .map(tuple => tuple.asKeyValuePairJson())
@@ -302,7 +304,7 @@ class ExecutionResultService(
       ResultStorage
         .getOpResultStorage(workflowIdentity)
         .getAllKeys
-        .filter(!_.id.startsWith("materialized_"))
+        .filter(!_.startsWith("materialized_"))
         .map(storageKey => {
           val count = ResultStorage
             .getOpResultStorage(workflowIdentity)
@@ -310,20 +312,15 @@ class ExecutionResultService(
             .getCount
             .toInt
 
-          val opId = storageKey
+          val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey)
 
-          // use the first output port's mode
+          // Retrieve the mode of the specified output port
           val mode = physicalPlan
             .getPhysicalOpsOfLogicalOp(opId)
-            .flatMap(physicalOp => physicalOp.outputPorts)
-            .filter({
-              case (portId, (port, links, schema)) =>
-                !portId.internal
-            })
-            .map({
-              case (portId, (port, links, schema)) => port.mode
-            })
+            .flatMap(_.outputPorts.get(storagePortId))
+            .map(_._1.mode)
             .head
+
           val changeDetector =
             if (mode == OutputMode.SET_SNAPSHOT) {
               UUID.randomUUID.toString
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
index c2a0693bd85..83e3d89f347 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala
@@ -24,6 +24,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{
 import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionResource
 import org.jooq.types.UInteger
 import edu.uci.ics.amber.util.ArrowUtils
+import edu.uci.ics.amber.workflow.PortIdentity
 
 import java.io.{PipedInputStream, PipedOutputStream}
 import java.nio.charset.StandardCharsets
@@ -71,8 +72,11 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
     }
 
     // By now the workflow should finish running
+    // Only supports external port 0 for now. TODO: support multiple ports
     val operatorResult: VirtualDocument[Tuple] =
-      ResultStorage.getOpResultStorage(workflowIdentity).get(OperatorIdentity(request.operatorId))
+      ResultStorage
+        .getOpResultStorage(workflowIdentity)
+        .get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity()))
     if (operatorResult == null) {
       return ResultExportResponse("error", "The workflow contains no results")
     }
@@ -190,7 +194,7 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
     val columnIndex = request.columnIndex
     val filename = request.filename
 
-    if (rowIndex >= results.size || columnIndex >= results.head.getFields.size) {
+    if (rowIndex >= results.size || columnIndex >= results.head.getFields.length) {
       return ResultExportResponse("error", s"Invalid row or column index")
     }
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
index 615cc937dbf..795939f1ea0 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
@@ -2,7 +2,6 @@ package edu.uci.ics.texera.workflow
 
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
-import edu.uci.ics.amber.core.tuple.Schema
 import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.controller.Workflow
 import edu.uci.ics.amber.engine.common.Utils.objectMapper
@@ -70,62 +69,59 @@ class WorkflowCompiler(
       // assign the sinks to toAddSink operators' external output ports
       subPlan
         .topologicalIterator()
+        .filter(opId => toAddSink.contains(opId.logicalOpId))
         .map(physicalPlan.getOperator)
-        .flatMap { physicalOp =>
-          physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort))
-        }
-        .filter({
-          case (physicalOp, (_, (outputPort, _, _))) =>
-            toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal
-        })
-        .foreach({
-          case (physicalOp, (_, (outputPort, _, schema))) =>
-            val storage = ResultStorage.getOpResultStorage(context.workflowId)
-            val storageKey = physicalOp.id.logicalOpId
-
-            // due to the size limit of single document in mongoDB (16MB)
-            // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage.
-            val storageType = {
-              if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
-              else OpResultStorage.defaultStorageMode
-            }
-            if (!storage.contains(storageKey)) {
-              // get the schema for result storage in certain mode
-              val sinkStorageSchema: Option[Schema] =
-                if (storageType == OpResultStorage.MONGODB) {
-                  // use the output schema on the first output port as the schema for storage
-                  Some(schema.right.get)
-                } else {
-                  None
+        .foreach { physicalOp =>
+          physicalOp.outputPorts
+            .filterNot(_._1.internal)
+            .foreach {
+              case (outputPortId, (outputPort, _, schema)) =>
+                val storage = ResultStorage.getOpResultStorage(context.workflowId)
+                val storageKey =
+                  OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)
+
+                // Determine the storage type, defaulting to memory for large HTML visualizations
+                val storageType =
+                  if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
+                  else OpResultStorage.defaultStorageMode
+
+                if (!storage.contains(storageKey)) {
+                  // Create storage if it doesn't exist
+                  val sinkStorageSchema =
+                    schema.getOrElse(throw new IllegalStateException("Schema is missing"))
+                  storage.create(
+                    s"${context.executionId}_",
+                    storageKey,
+                    storageType,
+                    sinkStorageSchema
+                  )
+
+                  // Add sink collection name to the JSON array of sinks
+                  sinksPointers.add(
+                    objectMapper
+                      .createObjectNode()
+                      .put("storageType", storageType)
+                      .put("storageKey", s"${context.executionId}_$storageKey")
+                  )
                 }
-              storage.create(
-                s"${context.executionId}_",
-                storageKey,
-                storageType,
-                sinkStorageSchema
-              )
-              // add the sink collection name to the JSON array of sinks
-              val storageNode = objectMapper.createObjectNode()
-              storageNode.put("storageType", storageType)
-              storageNode.put("storageKey", s"${context.executionId}_$storageKey")
-              sinksPointers.add(storageNode)
-            }
-            val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
-              context.workflowId,
-              context.executionId,
-              storageKey.id,
-              outputPort.mode
-            )
-            val sinkLink =
-              PhysicalLink(
-                physicalOp.id,
-                outputPort.id,
-                sinkPhysicalOp.id,
-                PortIdentity(internal = true)
-              )
-            physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
-        })
+                // Create and link the sink operator
+                val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
+                  context.workflowId,
+                  context.executionId,
+                  storageKey,
+                  outputPort.mode
+                )
+                val sinkLink = PhysicalLink(
+                  physicalOp.id,
+                  outputPort.id,
+                  sinkPhysicalOp.id,
+                  sinkPhysicalOp.outputPorts.head._1
+                )
+
+                physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
+            }
+        }
     } match {
       case Success(_) =>
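Note: a hedged sketch of the per-port keys the compiler loop above now creates (the operator name is hypothetical, and PortIdentity is assumed to default to internal = false as elsewhere in this patch). An operator with two external output ports gets one storage key, and hence one sink, per port:

    val opId = OperatorIdentity("visualizer") // hypothetical logical operator id
    val keys = Seq(PortIdentity(0), PortIdentity(1))
      .map(portId => OpResultStorage.createStorageKey(opId, portId))
    // keys == Seq("visualizer_0_false", "visualizer_1_false"), one sink per key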
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
index e56e0973e15..2f6f8ab67d5 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
@@ -69,8 +69,16 @@ class DataProcessingSpec
       .registerCallback[ExecutionStateUpdate](evt => {
         if (evt.state == COMPLETED) {
           results = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => resultStorage.contains(terminalOpId))
-            .map(terminalOpId => terminalOpId -> resultStorage.get(terminalOpId).get().toList)
+            .filter(terminalOpId =>
+              // expecting the first output port only.
+              resultStorage.contains(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
+            )
+            .map(terminalOpId =>
+              terminalOpId -> resultStorage
+                .get(OpResultStorage.createStorageKey(terminalOpId, PortIdentity()))
+                .get()
+                .toList
+            )
             .toMap
           completion.setDone()
         }
+ */ object OpResultStorage { val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase - val MEMORY = "memory" - val MONGODB = "mongodb" + val MEMORY: String = "memory" + val MONGODB: String = "mongodb" + + /** + * Creates a unique storage key by combining operator and port identities. + * + * @param operatorId The unique identifier of the operator. + * @param portIdentity The unique identifier of the port. + * @param isMaterialized Indicates whether the storage is materialized (e.g., persisted). + * @return A string representing the generated storage key, formatted as: + * "materialized___" if materialized, + * otherwise "__". + */ + def createStorageKey( + operatorId: OperatorIdentity, + portIdentity: PortIdentity, + isMaterialized: Boolean = false + ): String = { + val prefix = if (isMaterialized) "materialized_" else "" + s"$prefix${operatorId.id}_${portIdentity.id}_${portIdentity.internal}" + } + + /** + * Decodes a storage key back into its original components. + * + * @param key The storage key to decode. + * @return A tuple containing the operator identity and port identity. + * @throws IllegalArgumentException If the key format is invalid. + */ + def decodeStorageKey(key: String): (OperatorIdentity, PortIdentity) = { + val processedKey = if (key.startsWith("materialized_")) key.substring(13) else key + processedKey.split("_", 3) match { + case Array(opId, portId, internal) => + (OperatorIdentity(opId), PortIdentity(portId.toInt, internal.toBoolean)) + case _ => + throw new IllegalArgumentException(s"Invalid storage key: $key") + } + } } /** - * Public class of operator result storage. - * One execution links one instance of OpResultStorage, both have the same lifecycle. + * Handles the storage of operator results during workflow execution. + * Each `OpResultStorage` instance is tied to the lifecycle of a single execution. */ class OpResultStorage extends Serializable with LazyLogging { - // since some op need to get the schema from the OpResultStorage, the schema is stored as part of the OpResultStorage.cache - // TODO: once we make the storage self-contained, i.e. storing Schema in the storage as metadata, we can remove it - val cache: ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])] = - new ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])]() + /** + * In-memory cache for storing results and their associated schemas. + * TODO: Once the storage is self-contained (i.e., stores schemas as metadata), + * this can be removed. + */ + private val cache: ConcurrentHashMap[String, (VirtualDocument[Tuple], Schema)] = + new ConcurrentHashMap() /** - * Retrieve the result of an operator from OpResultStorage - * @param key The key used for storage and retrieval. - * Currently it is the uuid inside the cache source or cache sink operator. - * @return The storage object of this operator. + * Retrieves the result of an operator from the storage. + * + * @param key The storage key associated with the result. + * @return The result stored as a `VirtualDocument[Tuple]`. + * @throws NoSuchElementException If the key is not found in the cache. 
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
index ab69d6f94d2..42728231270 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
@@ -5,103 +5,153 @@ import edu.uci.ics.amber.core.storage.StorageConfig
 import edu.uci.ics.amber.core.storage.model.VirtualDocument
 import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
 import edu.uci.ics.amber.virtualidentity.OperatorIdentity
+import edu.uci.ics.amber.workflow.PortIdentity
 
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.ImplicitConversions.`iterator asScala`
+import scala.jdk.CollectionConverters.IteratorHasAsScala
 
+/**
+  * Companion object for `OpResultStorage`, providing utility functions
+  * for key generation, decoding, and storage modes.
+  */
 object OpResultStorage {
   val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
-  val MEMORY = "memory"
-  val MONGODB = "mongodb"
+  val MEMORY: String = "memory"
+  val MONGODB: String = "mongodb"
+
+  /**
+    * Creates a unique storage key by combining operator and port identities.
+    *
+    * @param operatorId The unique identifier of the operator.
+    * @param portIdentity The unique identifier of the port.
+    * @param isMaterialized Indicates whether the storage is materialized (e.g., persisted).
+    * @return A string representing the generated storage key, formatted as:
+    *         "materialized_<operatorId>_<portId>_<internal>" if materialized,
+    *         otherwise "<operatorId>_<portId>_<internal>".
+    */
+  def createStorageKey(
+      operatorId: OperatorIdentity,
+      portIdentity: PortIdentity,
+      isMaterialized: Boolean = false
+  ): String = {
+    val prefix = if (isMaterialized) "materialized_" else ""
+    s"$prefix${operatorId.id}_${portIdentity.id}_${portIdentity.internal}"
+  }
+
+  /**
+    * Decodes a storage key back into its original components.
+    *
+    * @param key The storage key to decode.
+    * @return A tuple containing the operator identity and port identity.
+    * @throws IllegalArgumentException If the key format is invalid.
+    */
+  def decodeStorageKey(key: String): (OperatorIdentity, PortIdentity) = {
+    val processedKey = if (key.startsWith("materialized_")) key.substring(13) else key
+    processedKey.split("_", 3) match {
+      case Array(opId, portId, internal) =>
+        (OperatorIdentity(opId), PortIdentity(portId.toInt, internal.toBoolean))
+      case _ =>
+        throw new IllegalArgumentException(s"Invalid storage key: $key")
+    }
+  }
 }
 
 /**
- * Public class of operator result storage.
- * One execution links one instance of OpResultStorage, both have the same lifecycle.
+  * Handles the storage of operator results during workflow execution.
+  * Each `OpResultStorage` instance is tied to the lifecycle of a single execution.
   */
 class OpResultStorage extends Serializable with LazyLogging {
 
-  // since some op need to get the schema from the OpResultStorage, the schema is stored as part of the OpResultStorage.cache
-  // TODO: once we make the storage self-contained, i.e. storing Schema in the storage as metadata, we can remove it
-  val cache: ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])] =
-    new ConcurrentHashMap[OperatorIdentity, (VirtualDocument[Tuple], Option[Schema])]()
+  /**
+    * In-memory cache for storing results and their associated schemas.
+    * TODO: Once the storage is self-contained (i.e., stores schemas as metadata),
+    * this can be removed.
+    */
+  private val cache: ConcurrentHashMap[String, (VirtualDocument[Tuple], Schema)] =
+    new ConcurrentHashMap()
 
   /**
-   * Retrieve the result of an operator from OpResultStorage
-   * @param key The key used for storage and retrieval.
-   *            Currently it is the uuid inside the cache source or cache sink operator.
-   * @return The storage object of this operator.
+    * Retrieves the result of an operator from the storage.
+    *
+    * @param key The storage key associated with the result.
+    * @return The result stored as a `VirtualDocument[Tuple]`.
+    * @throws NoSuchElementException If the key is not found in the cache.
    */
-  def get(key: OperatorIdentity): VirtualDocument[Tuple] = {
-    cache.get(key)._1
+  def get(key: String): VirtualDocument[Tuple] = {
+    Option(cache.get(key)) match {
+      case Some((document, _)) => document
+      case None                => throw new NoSuchElementException(s"Storage with key $key not found")
+    }
   }
 
   /**
-   * Retrieve the schema of the result associate with target operator
-   * @param key the uuid inside the cache source or cache sink operator.
-   * @return The result schema of this operator.
+    * Retrieves the schema associated with an operator's result.
+    *
+    * @param key The storage key associated with the schema.
+    * @return The schema of the result.
    */
-  def getSchema(key: OperatorIdentity): Option[Schema] = {
+  def getSchema(key: String): Schema = {
     cache.get(key)._2
   }
 
-  def setSchema(key: OperatorIdentity, schema: Schema): Unit = {
-    val storage = get(key)
-    cache.put(key, (storage, Some(schema)))
-  }
-
+  /**
+    * Creates a new storage object for an operator result.
+    *
+    * @param executionId An optional execution ID for unique identification.
+    * @param key The storage key for the result.
+    * @param mode The storage mode (e.g., "memory" or "mongodb").
+    * @param schema The schema of the result.
+    * @return A `VirtualDocument[Tuple]` instance for storing results.
+    */
   def create(
       executionId: String = "",
-      key: OperatorIdentity,
+      key: String,
       mode: String,
-      schema: Option[Schema] = None
+      schema: Schema
   ): VirtualDocument[Tuple] = {
     val storage: VirtualDocument[Tuple] =
-      if (mode == "memory") {
-        new MemoryDocument[Tuple](key.id)
+      if (mode == OpResultStorage.MEMORY) {
+        new MemoryDocument[Tuple](key)
       } else {
         try {
-          val fromDocument = schema.map(Tuple.fromDocument)
-          new MongoDocument[Tuple](executionId + key, Tuple.toDocument, fromDocument)
+          new MongoDocument[Tuple](
+            executionId + key,
+            Tuple.toDocument,
+            Tuple.fromDocument(schema)
+          )
         } catch {
           case t: Throwable =>
-            logger.warn("Failed to create mongo storage", t)
-            logger.info(s"Fall back to memory storage for $key")
-            // fall back to memory
-            new MemoryDocument[Tuple](key.id)
+            logger.warn("Failed to create MongoDB storage", t)
+            logger.info(s"Falling back to memory storage for $key")
+            new MemoryDocument[Tuple](key)
         }
       }
     cache.put(key, (storage, schema))
     storage
   }
 
-  def contains(key: OperatorIdentity): Boolean = {
-    cache.containsKey(key)
-  }
-
   /**
-   * Manually remove an entry from the cache.
-   * @param key The key used for storage and retrieval.
-   *            Currently it is the uuid inside the cache source or cache sink operator.
+    * Checks if a storage key exists in the cache.
+    *
+    * @param key The storage key to check.
+    * @return True if the key exists, false otherwise.
    */
-  def remove(key: OperatorIdentity): Unit = {
-    if (cache.contains(key)) {
-      cache.get(key)._1.clear()
-    }
-    cache.remove(key)
-  }
+  def contains(key: String): Boolean = cache.containsKey(key)
 
   /**
-   * Close this storage. Used for workflow cleanup.
+    * Clears all stored results. Typically used during workflow cleanup.
    */
   def clear(): Unit = {
     cache.forEach((_, document) => document._1.clear())
     cache.clear()
   }
 
-  def getAllKeys: Set[OperatorIdentity] = {
-    cache.keySet().iterator().toSet
+  /**
+    * Retrieves all storage keys currently in the cache.
+    *
+    * @return A set of all keys in the cache.
+    */
+  def getAllKeys: Set[String] = {
+    cache.keySet().iterator().asScala.toSet
   }
-
 }
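Note: a round-trip sketch for the key scheme above (identifiers illustrative). `createStorageKey` embeds the operator id, the port id, and the port's internal flag; `decodeStorageKey` recovers them:

    val key = OpResultStorage.createStorageKey(
      OperatorIdentity("op1"),
      PortIdentity(),
      isMaterialized = true
    ) // key == "materialized_op1_0_false"
    val (opId, portId) = OpResultStorage.decodeStorageKey(key)
    // opId == OperatorIdentity("op1"), portId == PortIdentity(0, internal = false)
    // Caveat: split("_", 3) parses left-to-right, so this assumes operator ids contain no underscores.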
+ */ + def getAllKeys: Set[String] = { + cache.keySet().iterator().asScala.toSet } - } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 0024993166f..64028a3fef8 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -1,10 +1,12 @@ package edu.uci.ics.amber.operator import edu.uci.ics.amber.core.executor.OpExecInitInfo +import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec +import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{ ExecutionIdentity, OperatorIdentity, @@ -21,10 +23,11 @@ object SpecialPhysicalOpFactory { executionIdentity: ExecutionIdentity, storageKey: String, outputMode: OutputMode - ): PhysicalOp = + ): PhysicalOp = { + val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey) PhysicalOp .localPhysicalOp( - PhysicalOpIdentity(OperatorIdentity(storageKey), "sink"), + PhysicalOpIdentity(opId, s"sink${portId.id}"), workflowIdentity, executionIdentity, OpExecInitInfo((idx, workers) => @@ -68,4 +71,31 @@ object SpecialPhysicalOpFactory { Map(PortIdentity(internal = true) -> outputSchema) }) ) + } + + def newSourcePhysicalOp( + workflowIdentity: WorkflowIdentity, + executionIdentity: ExecutionIdentity, + storageKey: String + ): PhysicalOp = { + + val (opId, portId) = OpResultStorage.decodeStorageKey(storageKey) + val opResultStorage = ResultStorage.getOpResultStorage(workflowIdentity) + val outputPort = OutputPort() + PhysicalOp + .sourcePhysicalOp( + PhysicalOpIdentity(opId, s"source${portId.id}"), + workflowIdentity, + executionIdentity, + OpExecInitInfo((_, _) => new CacheSourceOpExec(storageKey, workflowIdentity)) + ) + .withInputPorts(List.empty) + .withOutputPorts(List(outputPort)) + .withPropagateSchema( + SchemaPropagationFunc(_ => Map(outputPort.id -> opResultStorage.getSchema(storageKey))) + ) + .propagateSchema() + + } + } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala index aaeababd685..7290e064159 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala @@ -15,7 +15,7 @@ class ProgressiveSinkOpExec( workflowIdentity: WorkflowIdentity ) extends SinkOperatorExecutor { val writer: BufferedItemWriter[Tuple] = - ResultStorage.getOpResultStorage(workflowIdentity).get(OperatorIdentity(storageKey)).writer() + ResultStorage.getOpResultStorage(workflowIdentity).get(storageKey).writer() override def open(): Unit = { writer.open() diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala index 2fa7772d3dc..ba335d1d5c9 100644 --- 
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
index 2fa7772d3dc..ba335d1d5c9 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala
@@ -2,12 +2,14 @@ package edu.uci.ics.amber.operator.source.cache
 
 import com.typesafe.scalalogging.LazyLogging
 import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
-import edu.uci.ics.amber.core.storage.model.VirtualDocument
-import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
+import edu.uci.ics.amber.core.storage.result.ResultStorage
+import edu.uci.ics.amber.core.tuple.TupleLike
+import edu.uci.ics.amber.virtualidentity.WorkflowIdentity
 
-class CacheSourceOpExec(storage: VirtualDocument[Tuple])
+class CacheSourceOpExec(storageKey: String, workflowIdentity: WorkflowIdentity)
     extends SourceOperatorExecutor
     with LazyLogging {
+  private val storage = ResultStorage.getOpResultStorage(workflowIdentity).get(storageKey)
 
   override def produceTuple(): Iterator[TupleLike] = storage.get()