Skip to content

Commit

Permalink
Support multiple output ports with storage (#3175)
Browse files Browse the repository at this point in the history
Previously, all results were tied to logical operators. This PR modifies
the engine to associate results with output ports, enabling better
granularity and support for operators with multiple outputs.

### Key Change: StorageKey with PortIdentity

The most significant update in this PR is a change to the storage key
format: the key now includes both the logical operator ID and the output
port ID. As a result, a logical operator with multiple output ports
(e.g., Split) gets a distinct storage for each of its output ports.

For now, the frontend retrieves results from the default output port
(port 0). In future updates, the frontend will be enhanced to support
retrieving results from additional output ports, providing more
flexibility in how results are accessed and displayed.
  • Loading branch information
Yicong-Huang authored Dec 30, 2024
1 parent 14572dd commit 5b622e3
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class OutputManager(
}

/**
  * Returns the identity of the only output port registered in `ports`.
  *
  * Fails an assertion, reporting the actual number of ports, when `ports`
  * does not contain exactly one entry.
  *
  * @return the first component of the sole entry in `ports`
  */
def getSingleOutputPortIdentity: PortIdentity = {
  // Keep a single assertion that carries a message: an additional message-less
  // assert on the same condition would fail first and hide the port count.
  assert(ports.size == 1, s"expect 1 output port, got ${ports.size}")
  ports.head._1
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.workflow.{
PhysicalOp,
PhysicalPlan,
SchemaPropagationFunc,
WorkflowContext
}
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import edu.uci.ics.amber.virtualidentity.PhysicalOpIdentity
import edu.uci.ics.amber.workflow.PhysicalLink
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator

Expand Down Expand Up @@ -119,9 +112,9 @@ abstract class ScheduleGenerator(
physicalPlan
.getLinksBetween(upstreamPhysicalOpId, physicalOpId)
.filter(link =>
!physicalPlan.getOperator(physicalOpId).isSinkOperator && (physicalPlan
!physicalPlan.getOperator(physicalOpId).isSinkOperator && physicalPlan
.getOperator(physicalOpId)
.isInputLinkDependee(link))
.isInputLinkDependee(link)
)
}
}
Expand Down Expand Up @@ -158,7 +151,19 @@ abstract class ScheduleGenerator(
.removeLink(physicalLink)

// create cache writer and link
val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink)
val storageKey = OpResultStorage.createStorageKey(
physicalLink.fromOpId.logicalOpId,
physicalLink.fromPortId,
isMaterialized = true
)
val fromPortOutputMode =
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey,
fromPortOutputMode
)
val sourceToWriterLink =
PhysicalLink(
fromOp.id,
Expand All @@ -170,7 +175,7 @@ abstract class ScheduleGenerator(
.addOperator(matWriterPhysicalOp)
.addLink(sourceToWriterLink)

// expect exactly one input port and one output port
// sink has exactly one input port and one output port
val schema = newPhysicalPlan
.getOperator(matWriterPhysicalOp.id)
.outputPorts(matWriterPhysicalOp.outputPorts.keys.head)
Expand All @@ -180,14 +185,17 @@ abstract class ScheduleGenerator(
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriterPhysicalOp.id.logicalOpId,
key = storageKey,
mode = OpResultStorage.defaultStorageMode,
schema = Some(schema)
schema = schema
)

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp =
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
storageKey
)
val readerToDestLink =
PhysicalLink(
matReaderPhysicalOp.id,
Expand All @@ -201,52 +209,4 @@ abstract class ScheduleGenerator(
.addOperator(matReaderPhysicalOp)
.addLink(readerToDestLink)
}

/**
  * Builds the source physical operator that reads back a materialized result.
  *
  * The operator is named `cacheSource_<materialization id of the link>`, has no
  * input ports and a single default output port, and its executor reads the
  * document stored under `matWriterLogicalOpId`.
  *
  * @param matWriterLogicalOpId key under which the result storage holds the
  *                             document and schema to read
  * @param physicalLink         the link being materialized; used only to
  *                             derive the reader's operator identity
  * @return the reader operator with its schema already propagated
  */
private def createMatReader(
    matWriterLogicalOpId: OperatorIdentity,
    physicalLink: PhysicalLink
): PhysicalOp = {
  val storage = ResultStorage.getOpResultStorage(workflowContext.workflowId)

  val readerOpId =
    OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}")

  // The storage lookup happens inside the lambda, i.e. when the executor is created.
  val execInitInfo =
    OpExecInitInfo((_, _) => new CacheSourceOpExec(storage.get(matWriterLogicalOpId)))

  // The output schema is looked up from storage when the function is invoked.
  val schemaPropagation = SchemaPropagationFunc(_ =>
    Map(OutputPort().id -> storage.getSchema(matWriterLogicalOpId).get)
  )

  PhysicalOp
    .sourcePhysicalOp(
      workflowContext.workflowId,
      workflowContext.executionId,
      readerOpId,
      execInitInfo
    )
    .withInputPorts(List.empty)
    .withOutputPorts(List(OutputPort()))
    .withPropagateSchema(schemaPropagation)
    .propagateSchema()
}

/**
  * Builds the sink physical operator that materializes the data flowing over
  * `physicalLink`.
  *
  * The sink uses the storage key `materialized_<materialization id of the link>`
  * and the output mode of the link's upstream output port.
  *
  * @param physicalLink the link whose upstream output is to be materialized
  * @return the sink operator produced by `SpecialPhysicalOpFactory.newSinkPhysicalOp`
  */
private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = {
  // Entry of the upstream operator's output-port map for the link's source port;
  // its first component is the port definition carrying the output mode.
  val upstreamPortEntry =
    physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)
  val materializationId = getMatIdFromPhysicalLink(physicalLink)

  SpecialPhysicalOpFactory.newSinkPhysicalOp(
    workflowContext.workflowId,
    workflowContext.executionId,
    s"materialized_$materializationId",
    upstreamPortEntry._1.mode
  )
}

/**
  * Derives a materialization id string from a physical link.
  *
  * The id is the underscore-joined sequence: upstream logical op id, upstream
  * layer name, upstream port id, downstream logical op id, downstream layer
  * name, downstream port id.
  */
private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) = {
  val upstream = physicalLink.fromOpId
  val upstreamPart =
    s"${upstream.logicalOpId}_${upstream.layerName}_${physicalLink.fromPortId.id}"

  val downstream = physicalLink.toOpId
  val downstreamPart =
    s"${downstream.logicalOpId}_${downstream.layerName}_${physicalLink.toPortId.id}"

  s"${upstreamPart}_$downstreamPart"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.result.{
MongoDocument,
OperatorResultMetadata,
ResultStorage,
WorkflowResultStore
}
import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB
import edu.uci.ics.amber.core.storage.result._
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
Expand All @@ -25,6 +21,7 @@ import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataSt
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.workflow.PortIdentity
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.web.model.websocket.event.{
PaginatedResultEvent,
Expand Down Expand Up @@ -89,7 +86,9 @@ object ExecutionResultService {
}

val storage =
ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId)
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity()))
val webUpdate = webOutputMode match {
case PaginationMode() =>
val numTuples = storage.getCount
Expand Down Expand Up @@ -238,10 +237,12 @@ class ExecutionResultService(
oldInfo.tupleCount,
info.tupleCount
)
if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") {
if (StorageConfig.resultStorageMode == MONGODB) {
// using the first port for now. TODO: support multiple ports
val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity())
val opStorage = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId)
.get(storageKey)
opStorage match {
case mongoDocument: MongoDocument[Tuple] =>
val tableCatStats = mongoDocument.getCategoricalStats
Expand Down Expand Up @@ -277,15 +278,16 @@ class ExecutionResultService(
def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
// calculate from index (pageIndex starts from 1 instead of 0)
val from = request.pageSize * (request.pageIndex - 1)
val opId = OperatorIdentity(request.operatorID)
val paginationIterable = {

// using the first port for now. TODO: support multiple ports
val storageKey =
OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity())
val paginationIterable = {
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(opId)
.get(storageKey)
.getRange(from, from + request.pageSize)
.to(Iterable)

}
val mappedResults = paginationIterable
.map(tuple => tuple.asKeyValuePairJson())
Expand All @@ -302,28 +304,23 @@ class ExecutionResultService(
ResultStorage
.getOpResultStorage(workflowIdentity)
.getAllKeys
.filter(!_.id.startsWith("materialized_"))
.filter(!_.startsWith("materialized_"))
.map(storageKey => {
val count = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
.getCount
.toInt

val opId = storageKey
val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey)

// use the first output port's mode
// Retrieve the mode of the specified output port
val mode = physicalPlan
.getPhysicalOpsOfLogicalOp(opId)
.flatMap(physicalOp => physicalOp.outputPorts)
.filter({
case (portId, (port, links, schema)) =>
!portId.internal
})
.map({
case (portId, (port, links, schema)) => port.mode
})
.flatMap(_.outputPorts.get(storagePortId))
.map(_._1.mode)
.head

val changeDetector =
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionResource
import org.jooq.types.UInteger
import edu.uci.ics.amber.util.ArrowUtils
import edu.uci.ics.amber.workflow.PortIdentity

import java.io.{PipedInputStream, PipedOutputStream}
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -71,8 +72,11 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
}

// By now the workflow should finish running
// Only supports external port 0 for now. TODO: support multiple ports
val operatorResult: VirtualDocument[Tuple] =
ResultStorage.getOpResultStorage(workflowIdentity).get(OperatorIdentity(request.operatorId))
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity()))
if (operatorResult == null) {
return ResultExportResponse("error", "The workflow contains no results")
}
Expand Down Expand Up @@ -190,7 +194,7 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
val columnIndex = request.columnIndex
val filename = request.filename

if (rowIndex >= results.size || columnIndex >= results.head.getFields.size) {
if (rowIndex >= results.size || columnIndex >= results.head.getFields.length) {
return ResultExportResponse("error", s"Invalid row or column index")
}

Expand Down
Loading

0 comments on commit 5b622e3

Please sign in to comment.