Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove sink desc #3170

Merged
merged 20 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{
PhysicalOp,
PhysicalPlan,
Expand All @@ -14,9 +13,9 @@ import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity}
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator
Expand Down Expand Up @@ -159,9 +158,7 @@ abstract class ScheduleGenerator(
.removeLink(physicalLink)

// create cache writer and link
val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get
val matWriterPhysicalOp: PhysicalOp =
createMatWriter(physicalLink, Array(matWriterInputSchema))
val matWriterPhysicalOp: PhysicalOp = createMatWriter(physicalLink)
val sourceToWriterLink =
PhysicalLink(
fromOp.id,
Expand All @@ -173,6 +170,21 @@ abstract class ScheduleGenerator(
.addOperator(matWriterPhysicalOp)
.addLink(sourceToWriterLink)

// expect exactly one input port and one output port
val schema = newPhysicalPlan
shengquan-ni marked this conversation as resolved.
Show resolved Hide resolved
.getOperator(matWriterPhysicalOp.id)
.outputPorts(matWriterPhysicalOp.outputPorts.keys.head)
._3
.toOption
.get
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriterPhysicalOp.id.logicalOpId,
mode = OpResultStorage.defaultStorageMode,
schema = Some(schema)
)

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp =
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
Expand Down Expand Up @@ -219,31 +231,16 @@ abstract class ScheduleGenerator(

}

private def createMatWriter(
physicalLink: PhysicalLink,
inputSchema: Array[Schema]
): PhysicalOp = {
val matWriter = new ProgressiveSinkOpDesc()
matWriter.setContext(workflowContext)
matWriter.setOperatorId(s"materialized_${getMatIdFromPhysicalLink(physicalLink)}")
// expect exactly one input port and one output port
val schema = matWriter.getOutputSchema(inputSchema)
ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriter.operatorIdentifier,
mode = OpResultStorage.defaultStorageMode,
schema = Some(schema)
)
matWriter.setUpstreamId(
matWriter.operatorIdentifier
)

matWriter.getPhysicalOp(
private def createMatWriter(physicalLink: PhysicalLink): PhysicalOp = {
val outputMode =
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
val storageKey = s"materialized_${getMatIdFromPhysicalLink(physicalLink)}"
SpecialPhysicalOpFactory.newSinkPhysicalOp(
workflowContext.workflowId,
workflowContext.executionId
workflowContext.executionId,
storageKey,
outputMode
)

}

private def getMatIdFromPhysicalLink(physicalLink: PhysicalLink) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import edu.uci.ics.amber.core.storage.result.{
WorkflowResultStore
}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
COMPLETED,
Expand All @@ -22,8 +23,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.web.model.websocket.event.{
Expand All @@ -33,7 +33,6 @@ import edu.uci.ics.texera.web.model.websocket.event.{
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import edu.uci.ics.texera.workflow.LogicalPlan

import java.util.UUID
import scala.collection.mutable
Expand Down Expand Up @@ -63,12 +62,23 @@ object ExecutionResultService {
* Produces the WebResultUpdate to send to frontend from a result update from the engine.
*/
private def convertWebResultUpdate(
sink: ProgressiveSinkOpDesc,
workflowIdentity: WorkflowIdentity,
physicalOps: List[PhysicalOp],
oldTupleCount: Int,
newTupleCount: Int
): WebResultUpdate = {
val outputMode = physicalOps
.flatMap(op => op.outputPorts)
.filter({
case (portId, (port, links, schema)) => !portId.internal
})
.map({
case (portId, (port, links, schema)) => port.mode
})
.head
shengquan-ni marked this conversation as resolved.
Show resolved Hide resolved

val webOutputMode: WebOutputMode = {
sink.getOutputMode match {
outputMode match {
// currently, only table outputs are using these modes
case OutputMode.SET_DELTA => SetDeltaMode()
case OutputMode.SET_SNAPSHOT => PaginationMode()
Expand All @@ -79,7 +89,7 @@ object ExecutionResultService {
}

val storage =
ResultStorage.getOpResultStorage(sink.getContext.workflowId).get(sink.getUpstreamId.get)
ResultStorage.getOpResultStorage(workflowIdentity).get(physicalOps.head.id.logicalOpId)
val webUpdate = webOutputMode match {
case PaginationMode() =>
val numTuples = storage.getCount
Expand All @@ -98,7 +108,7 @@ object ExecutionResultService {

case _ =>
throw new RuntimeException(
"update mode combination not supported: " + (webOutputMode, sink.getOutputMode)
"update mode combination not supported: " + (webOutputMode, outputMode)
)
}
webUpdate
Expand Down Expand Up @@ -150,18 +160,16 @@ object ExecutionResultService {
* - send result update event to the frontend
*/
class ExecutionResultService(
workflowIdentity: WorkflowIdentity,
val workflowStateStore: WorkflowStateStore
) extends SubscriptionManager
with LazyLogging {

var sinkOperators: mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc] =
mutable.HashMap[OperatorIdentity, ProgressiveSinkOpDesc]()
private val resultPullingFrequency = AmberConfig.executionResultPollingInSecs
private var resultUpdateCancellable: Cancellable = _

def attachToExecution(
stateStore: ExecutionStateStore,
logicalPlan: LogicalPlan,
physicalPlan: PhysicalPlan,
client: AmberClient
): Unit = {

Expand All @@ -181,7 +189,7 @@ class ExecutionResultService(
2.seconds,
resultPullingFrequency.seconds
) {
onResultUpdate()
onResultUpdate(physicalPlan)
}
}
} else {
Expand All @@ -197,7 +205,7 @@ class ExecutionResultService(
logger.info("Workflow execution terminated. Stop update results.")
if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
// immediately perform final update
onResultUpdate()
onResultUpdate(physicalPlan)
}
}
})
Expand Down Expand Up @@ -225,18 +233,15 @@ class ExecutionResultService(
case (opId, info) =>
val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata())
buf(opId.id) = ExecutionResultService.convertWebResultUpdate(
sinkOperators(opId),
workflowIdentity,
physicalPlan.getPhysicalOpsOfLogicalOp(opId),
oldInfo.tupleCount,
info.tupleCount
)
if (
StorageConfig.resultStorageMode.toLowerCase == "mongodb"
&& !opId.id.startsWith("sink")
) {
val sinkOp = sinkOperators(opId)
if (StorageConfig.resultStorageMode.toLowerCase == "mongodb") {
val opStorage = ResultStorage
.getOpResultStorage(sinkOp.getContext.workflowId)
.get(sinkOp.getUpstreamId.get)
.getOpResultStorage(workflowIdentity)
.get(physicalPlan.getPhysicalOpsOfLogicalOp(opId).head.id.logicalOpId)
shengquan-ni marked this conversation as resolved.
Show resolved Hide resolved
opStorage match {
case mongoDocument: MongoDocument[Tuple] =>
val tableCatStats = mongoDocument.getCategoricalStats
Expand All @@ -262,22 +267,11 @@ class ExecutionResultService(
})
)

// first clear all the results
sinkOperators.clear()
// clear all the result metadata
workflowStateStore.resultStore.updateState { _ =>
WorkflowResultStore() // empty result store
}

// For operators connected to a sink and sinks,
// create result service so that the results can be displayed.
logicalPlan.getTerminalOperatorIds.map(sink => {
logicalPlan.getOperator(sink) match {
case sinkOp: ProgressiveSinkOpDesc =>
sinkOperators += ((sinkOp.getUpstreamId.get, sinkOp))
sinkOperators += ((sink, sinkOp))
case other => // skip other non-texera-managed sinks, if any
}
})
}

def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
Expand All @@ -286,16 +280,12 @@ class ExecutionResultService(
val opId = OperatorIdentity(request.operatorID)
val paginationIterable = {

if (sinkOperators.contains(opId)) {
val sinkOp = sinkOperators(opId)
ResultStorage
.getOpResultStorage(sinkOp.getContext.workflowId)
.get(sinkOp.getUpstreamId.get)
.getRange(from, from + request.pageSize)
.to(Iterable)
} else {
Iterable.empty
}
ResultStorage
.getOpResultStorage(workflowIdentity)
.get(opId)
.getRange(from, from + request.pageSize)
.to(Iterable)

}
val mappedResults = paginationIterable
.map(tuple => tuple.asKeyValuePairJson())
Expand All @@ -306,23 +296,42 @@ class ExecutionResultService(
PaginatedResultEvent.apply(request, mappedResults, attributes)
}

private def onResultUpdate(): Unit = {
private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = {
workflowStateStore.resultStore.updateState { _ =>
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = sinkOperators.map {

case (id, sink) =>
val count = ResultStorage
.getOpResultStorage(sink.getContext.workflowId)
.get(sink.getUpstreamId.get)
.getCount
.toInt
val mode = sink.getOutputMode
val changeDetector =
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
} else ""
(id, OperatorResultMetadata(count, changeDetector))
}.toMap
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = {
ResultStorage
.getOpResultStorage(workflowIdentity)
.getAllKeys
.filter(!_.id.startsWith("materialized_"))
.map(storageKey => {
val count = ResultStorage
.getOpResultStorage(workflowIdentity)
.get(storageKey)
.getCount
.toInt

val opId = storageKey

// use the first output port's mode
val mode = physicalPlan
.getPhysicalOpsOfLogicalOp(opId)
.flatMap(physicalOp => physicalOp.outputPorts)
.filter({
case (portId, (port, links, schema)) =>
!portId.internal
})
.map({
case (portId, (port, links, schema)) => port.mode
})
.head
val changeDetector =
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
} else ""
(opId, OperatorResultMetadata(count, changeDetector))
})
.toMap
}
WorkflowResultStore(newInfo)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class WorkflowExecutionService(
executionConsoleService = new ExecutionConsoleService(client, executionStateStore, wsInput)

logger.info("Starting the workflow execution.")
resultService.attachToExecution(executionStateStore, workflow.logicalPlan, client)
resultService.attachToExecution(executionStateStore, workflow.physicalPlan, client)
executionStateStore.metadataStore.updateState(metadataStore =>
updateWorkflowState(READY, metadataStore)
.withFatalErrors(Seq.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class WorkflowService(
val stateStore = new WorkflowStateStore()
var executionService: BehaviorSubject[WorkflowExecutionService] = BehaviorSubject.create()

val resultService: ExecutionResultService = new ExecutionResultService(stateStore)
val resultService: ExecutionResultService = new ExecutionResultService(workflowId, stateStore)
val exportService: ResultExportService = new ResultExportService(workflowId)
val lifeCycleManager: WorkflowLifecycleManager = new WorkflowLifecycleManager(
s"workflowId=$workflowId",
Expand Down
Loading
Loading