Skip to content

Commit

Permalink
Remove sink desc (#3170)
Browse files Browse the repository at this point in the history
This PR refactors the handling of sink operators in Texera by removing
the sink descriptor and introducing a streamlined approach to creating
physical sink operators during compilation and scheduling. Additionally,
it shifts the storage assignment logic from the logical layer to the
physical layer.

1. **Sink Descriptor Removal:** Removed the sink descriptor, physical
sink operators are no longer created through descriptors. In the future,
we will remove physical sink operators.
2. **Sink Operator Creation:**
- Introduced a temporary factory for creating physical sink operators
without relying on a descriptor.
- Physical sink operators are now considered part of the sub-plan of
their upstream logical operator. For example: If the HashJoin logical
operator requires a sink, its physical sub-plan includes the building
physicalOp, probing physicalOp, and the sink physicalOp.
3. **Storage Assignment Refactor:**
- Merged the storage assignment logic into the physical layer, removing
it from the logical layer.
- When a physical sink operator is created (either during compilation or
scheduling), its associated storage is also created at the same moment.
  • Loading branch information
Yicong-Huang authored Dec 20, 2024
1 parent 5524099 commit f90362e
Show file tree
Hide file tree
Showing 22 changed files with 329 additions and 839 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{
PhysicalOp,
PhysicalPlan,
Expand All @@ -14,9 +13,9 @@ import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity}
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator
Expand Down Expand Up @@ -159,9 +158,7 @@ abstract class ScheduleGenerator(
.removeLink(physicalLink)

// create cache writer and link
val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get
val matWriterPhysicalOp: PhysicalOp =
createMatWriter(physicalLink, Array(matWriterInputSchema))
val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink)
val sourceToWriterLink =
PhysicalLink(
fromOp.id,
Expand All @@ -173,6 +170,21 @@ abstract class ScheduleGenerator(
.addOperator(matWriterPhysicalOp)
.addLink(sourceToWriterLink)

// expect exactly one input port and one output port
val schema = newPhysicalPlan
.getOperator(matWriterPhysicalOp.id)
.outputPorts(matWriterPhysicalOp.outputPorts.keys.head)
._3
.toOption
.get
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriterPhysicalOp.id.logicalOpId,
mode = OpResultStorage.defaultStorageMode,
schema = Some(schema)
)

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp =
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
Expand Down Expand Up @@ -219,31 +231,16 @@ abstract class ScheduleGenerator(

}

private def createMatWriter(
physicalLink: PhysicalLink,
inputSchema: Array[Schema]
): PhysicalOp = {
val matWriter = new ProgressiveSinkOpDesc()
matWriter.setContext(workflowContext)
matWriter.setOperatorId(s"materialized_${getMatIdFromPhysicalLink(physicalLink)}")
// expect exactly one input port and one output port
val schema = matWriter.getOutputSchema(inputSchema)
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriter.operatorIdentifier,
mode = OpResultStorage.defaultStorageMode,
schema = Some(schema)
)
matWriter.setUpstreamId(
matWriter.operatorIdentifier
)

matWriter.getPhysicalOp(
private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = {
val outputMode =
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}"
SpecialPhysicalOpFactory.newSinkPhysicalOp(
workflowContext.workflowId,
workflowContext.executionId
workflowContext.executionId,
storageKey,
outputMode
)

}

private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import edu.uci.ics.amber.core.storage.result.{
WorkflowResultStore
}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
COMPLETED,
Expand All @@ -22,8 +23,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.web.model.websocket.event.{
Expand All @@ -33,7 +33,6 @@ import edu.uci.ics.texera.web.model.websocket.event.{
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import edu.uci.ics.texera.workflow.LogicalPlan

import java.util.UUID
import scala.collection.mutable
Expand Down Expand Up @@ -63,12 +62,23 @@ object ExecutionResultService {
* Produces the WebResultUpdate to send to frontend from a result update from the engine.
*/
private def convertWebResultUpdate(
sink: ProgressiveSinkOpDesc,
workflowIdentity: WorkflowIdentity,
physicalOps: List[PhysicalOp],
oldTupleCount: Int,
newTupleCount: Int
): WebResultUpdate = {
val outputMode = physicalOps
.flatMap(op => op.outputPorts)
.filter({
case (portId, (port, links, schema)) => !portId.internal
})
.map({
case (portId, (port, links, schema)) => port.mode
})
.head

val webOutputMode: WebOutputMode = {
sink.getOutputMode match {
outputMode match {
// currently, only table outputs are using these modes
case OutputMode.SET_DELTA => SetDeltaMode()
case OutputMode.SET_SNAPSHOT => PaginationMode()
Expand All @@ -79,7 +89,7 @@ object ExecutionResultService {
}

val storage =
ResultStorage.getOpResultStorage(sink.getContext.workflowId).get(sink.getUpstreamId.get)
ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId)
val webUpdate = webOutputMode match {
case PaginationMode() =>
val numTuples = storage.getCount
Expand All @@ -98,7 +108,7 @@ object ExecutionResultService {

case _ =>
throw new RuntimeException(
"update mode combination not supported: " + (webOutputMode, sink.getOutputMode)
"update mode combination not supported: " + (webOutputMode, outputMode)
)
}
webUpdate
Expand Down Expand Up @@ -150,18 +160,16 @@ object ExecutionResultService {
* - send result update event to the frontend
*/
class ExecutionResultService(
workflowIdentity: WorkflowIdentity,
val workflowStateStore: WorkflowStateStore
) extends SubscriptionManager
with LazyLogging {

var sinkOperators: mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc] =
mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc]()
private val resultPullingFrequency = AmberConfig.executionResultPollingInSecs
private var resultUpdateCancellable: Cancellable = _

def attachToExecution(
stateStore: ExecutionStateStore,
logicalPlan: LogicalPlan,
physicalPlan: PhysicalPlan,
client: AmberClient
): Unit = {

Expand All @@ -181,7 +189,7 @@ class ExecutionResultService(
2.seconds,
resultPullingFrequency.seconds
) {
onResultUpdate()
onResultUpdate(physicalPlan)
}
}
} else {
Expand All @@ -197,7 +205,7 @@ class ExecutionResultService(
logger.info("Workflow execution terminated. Stop update results.")
if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
// immediately perform final update
onResultUpdate()
onResultUpdate(physicalPlan)
}
}
})
Expand Down Expand Up @@ -225,18 +233,15 @@ class ExecutionResultService(
case (opId, info) =>
val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata())
buf(opId.id) = ExecutionResultService.convertWebResultUpdate(
sinkOperators(opId),
workflowIdentity,
physicalPlan.getPhysicalOpsOfLogicalOp(opId),
oldInfo.tupleCount,
info.tupleCount
)
if (
StorageConfig.resultStorageMode.toLowerCase == "mongodb"
&& !opId.id.startsWith("sink")
) {
val sinkOp = sinkOperators(opId)
if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") {
val opStorage = ResultStorage
.getOpResultStorage(sinkOp.getContext.workflowId)
.get(sinkOp.getUpstreamId.get)
.getOpResultStorage(workflowIdentity)
.get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId)
opStorage match {
case mongoDocument: MongoDocument[Tuple] =>
val tableCatStats = mongoDocument.getCategoricalStats
Expand All @@ -262,22 +267,11 @@ class ExecutionResultService(
})
)

// first clear all the results
sinkOperators.clear()
// clear all the result metadata
workflowStateStore.resultStore.updateState { _ =>
WorkflowResultStore() // empty result store
}

// For operators connected to a sink and sinks,
// create result service so that the results can be displayed.
logicalPlan.getTerminalOperatorIds.map(sink => {
logicalPlan.getOperator(sink) match {
case sinkOp: ProgressiveSinkOpDesc =>
sinkOperators += ((sinkOp.getUpstreamId.get, sinkOp))
sinkOperators += ((sink, sinkOp))
case other => // skip other non-texera-managed sinks, if any
}
})
}

def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
Expand All @@ -286,16 +280,12 @@ class ExecutionResultService(
val opId = OperatorIdentity(request.operatorID)
val paginationIterable = {

if (sinkOperators.contains(opId)) {
val sinkOp = sinkOperators(opId)
ResultStorage
.getOpResultStorage(sinkOp.getContext.workflowId)
.get(sinkOp.getUpstreamId.get)
.getRange(from, from + request.pageSize)
.to(Iterable)
} else {
Iterable.empty
}
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(opId)
.getRange(from, from + request.pageSize)
.to(Iterable)

}
val mappedResults = paginationIterable
.map(tuple => tuple.asKeyValuePairJson())
Expand All @@ -306,23 +296,42 @@ class ExecutionResultService(
PaginatedResultEvent.apply(request, mappedResults, attributes)
}

private def onResultUpdate(): Unit = {
private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = {
workflowStateStore.resultStore.updateState { _ =>
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = sinkOperators.map {

case (id, sink) =>
val count = ResultStorage
.getOpResultStorage(sink.getContext.workflowId)
.get(sink.getUpstreamId.get)
.getCount
.toInt
val mode = sink.getOutputMode
val changeDetector =
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
} else ""
(id, OperatorResultMetadata(count, changeDetector))
}.toMap
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = {
ResultStorage
.getOpResultStorage(workflowIdentity)
.getAllKeys
.filter(!_.id.startsWith("materialized_"))
.map(storageKey => {
val count = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
.getCount
.toInt

val opId = storageKey

// use the first output port's mode
val mode = physicalPlan
.getPhysicalOpsOfLogicalOp(opId)
.flatMap(physicalOp => physicalOp.outputPorts)
.filter({
case (portId, (port, links, schema)) =>
!portId.internal
})
.map({
case (portId, (port, links, schema)) => port.mode
})
.head
val changeDetector =
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
} else ""
(opId, OperatorResultMetadata(count, changeDetector))
})
.toMap
}
WorkflowResultStore(newInfo)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class WorkflowExecutionService(
executionConsoleService = new ExecutionConsoleService(client, executionStateStore, wsInput)

logger.info("Starting the workflow execution.")
resultService.attachToExecution(executionStateStore, workflow.logicalPlan, client)
resultService.attachToExecution(executionStateStore, workflow.physicalPlan, client)
executionStateStore.metadataStore.updateState(metadataStore =>
updateWorkflowState(READY, metadataStore)
.withFatalErrors(Seq.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class WorkflowService(
val stateStore = new WorkflowStateStore()
var executionService: BehaviorSubject[WorkflowExecutionService] = BehaviorSubject.create()

val resultService: ExecutionResultService = new ExecutionResultService(stateStore)
val resultService: ExecutionResultService = new ExecutionResultService(workflowId, stateStore)
val exportService: ResultExportService = new ResultExportService(workflowId)
val lifeCycleManager: WorkflowLifecycleManager = new WorkflowLifecycleManager(
s"workflowId=$workflowId",
Expand Down
Loading

0 comments on commit f90362e

Please sign in to comment.