Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Storage on Output Ports of Operators #2672

Draft
wants to merge 9 commits into base branch: xiaozhen-port-storage-master
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message OutputPort {
PortIdentity id = 1 [(scalapb.field).no_box = true];
string displayName = 2;
bool blocking = 3;
string storageLocation = 4;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class ControllerProcessor(
() => this.workflowScheduler.getNextRegions,
workflowExecution,
controllerConfig,
asyncRPCClient
asyncRPCClient,
opResultStorage
)

private val initializer = new ControllerAsyncRPCHandlerInitializer(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{
import edu.uci.ics.amber.engine.common.VirtualIdentityUtils
import edu.uci.ics.amber.engine.common.virtualidentity._
import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity}
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.storage.OpResultStorage
import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
import edu.uci.ics.texera.workflow.common.workflow._
import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph}
Expand Down Expand Up @@ -440,6 +443,49 @@ case class PhysicalOp(
}
}

/**
  * Assigns storage to the output ports of this physical operator that request it.
  *
  * For every logical output port marked `hasStorage`, locates the corresponding physical
  * output port (matched by port index), creates a port storage via [[OpResultStorage]],
  * propagates the port schema to the storage when already known, and records the storage
  * key in the port's `storageLocation`.
  *
  * @param logicalOp               the logical operator whose port descriptions drive the assignment
  * @param context                 workflow context; its executionId prefixes the storage name
  * @param opResultStorageOptional storage registry; when None, this op is returned unchanged
  * @return a copy of this PhysicalOp with storage locations set on the relevant output ports
  */
def assignOutputPortStorages(
    logicalOp: LogicalOp,
    context: WorkflowContext,
    opResultStorageOptional: Option[OpResultStorage]
): PhysicalOp = {
  opResultStorageOptional match {
    case Some(opResultStorage: OpResultStorage) =>
      logicalOp.outputPorts.zipWithIndex.foldLeft(this) {
        case (currentOp, (portDescription, portIndex)) =>
          // Find the physical output port matching this logical port index, if any.
          val matchingPortId = currentOp.outputPorts.keys.find(portId => portId.id == portIndex)
          matchingPortId match {
            case Some(correspondingPortId) if portDescription.hasStorage =>
              // BUG FIX: read from `currentOp` (the fold accumulator), not the original
              // `this.outputPorts` — otherwise storage assignments made in earlier
              // iterations are silently discarded when multiple ports have storage.
              val existingContent = currentOp.outputPorts(correspondingPortId)
              val storageKey = s"${currentOp.id}_outPort_${correspondingPortId.id}"
              val storageType = OpResultStorage.defaultStorageMode
              val createdStorageReader = opResultStorage.createPortStorage(
                s"${context.executionId}_",
                storageKey,
                storageType
              )
              // Propagate the port schema to the storage when it is already determined.
              existingContent._3 match {
                case Left(_)       => // schema not yet known; leave the storage schema unset
                case Right(schema) => createdStorageReader.setSchema(schema)
              }
              currentOp.copy(outputPorts =
                // BUG FIX: update currentOp.outputPorts, not this.outputPorts (see above).
                currentOp.outputPorts.updated(
                  correspondingPortId,
                  existingContent.copy(_1 = existingContent._1.copy(storageLocation = storageKey))
                )
              )
            case _ => currentOp
          }
      }
    case _ => this
  }

}

/**
* returns all output links. Optionally, if a specific portId is provided, returns the links connected to that portId.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike
import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity}
import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter

import scala.collection.mutable

Expand Down Expand Up @@ -135,6 +136,21 @@ class OutputManager(
networkOutputBuffers((link, partitioner.allReceivers(bucketIndex))).addTuple(tuple)
}
}

// Save to storage: persist the tuple to every selected output port that has a
// storage writer attached (all ports when no specific port id was given).
val targetPorts = outputPortId match {
  case Some(portId) => ports.filter { case (id, _) => id == portId }
  case None         => ports
}
targetPorts.foreach {
  case (portId, workerPort) =>
    workerPort.storage.foreach { storageWriter =>
      // Enforce the port's schema before persisting the tuple.
      storageWriter.putOne(tupleLike.enforceSchema(getPort(portId).schema))
    }
}
}

/**
Expand Down Expand Up @@ -172,13 +188,16 @@ class OutputManager(
})
}

/**
  * Registers an output port on this worker with its schema and an optional storage writer.
  * Each port can only be added and initialized once; subsequent calls for the same port
  * id are ignored.
  *
  * @param portId  identity of the output port
  * @param schema  tuple schema enforced on this port
  * @param storage optional writer used to persist tuples emitted on this port
  */
def addPort(portId: PortIdentity, schema: Schema, storage: Option[SinkStorageWriter]): Unit = {
  // each port can only be added and initialized once.
  if (this.ports.contains(portId)) {
    return
  }
  this.ports(portId) = WorkerPort(schema, storage = storage)
  // Open the storage writer eagerly so it is ready before the first tuple arrives.
  this.ports(portId).storage.foreach(_.open())
}

def getPort(portId: PortIdentity): WorkerPort = ports(portId)
Expand All @@ -193,6 +212,15 @@ class OutputManager(
outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
}

/** Closes every storage writer attached to this worker's output ports. */
def closeOutputStorages(): Unit = {
  ports.values.foreach(workerPort => workerPort.storage.foreach(_.close()))
}

def getSingleOutputPortIdentity: PortIdentity = {
assert(ports.size == 1)
ports.head._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer

import edu.uci.ics.amber.engine.common.virtualidentity.ChannelIdentity
import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter

import scala.collection.mutable

// A port registered on a worker: the tuple schema it enforces, the channels attached
// to it, and an optional storage writer for persisting tuples emitted on the port.
// (Cleans up diff residue: the span contained both the old and new `channels` lines.)
case class WorkerPort(
    schema: Schema,
    // TODO: change it to manage the actual AmberFIFOChannel instead of Boolean
    channels: mutable.HashMap[ChannelIdentity, Boolean] = mutable.HashMap(),
    // None when this port has no result storage assigned.
    storage: Option[SinkStorageWriter] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.engine.common.workflow.PhysicalLink
import edu.uci.ics.texera.web.workflowruntimestate.WorkflowAggregatedState
import edu.uci.ics.texera.workflow.common.storage.OpResultStorage

import scala.collection.Seq
class RegionExecutionCoordinator(
region: Region,
workflowExecution: WorkflowExecution,
asyncRPCClient: AsyncRPCClient,
controllerConfig: ControllerConfig
controllerConfig: ControllerConfig,
opResultStorage: OpResultStorage
) {
def execute(actorService: AkkaActorService): Future[Unit] = {

Expand Down Expand Up @@ -144,23 +146,36 @@ class RegionExecutionCoordinator(
// Build, for every port of this operator whose schema is known, the
// (schema, optional storage writer) pair to assign on each worker.
val inputPortMapping = physicalOp.inputPorts
  .flatMap {
    case (inputPortId, (_, _, Right(schema))) =>
      // Input ports never carry storage.
      Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> (schema, None))
    case _ => None
  }
val outputPortMapping = physicalOp.outputPorts
  .flatMap {
    case (outputPortId, (outputPort, _, Right(schema))) =>
      // An empty storageLocation means the port has no storage assigned.
      // BUG FIX: the original match had `case _ => None` AFTER the irrefutable
      // `case location` pattern — unreachable code (compile warning).
      val storageWriter = outputPort.storageLocation match {
        case ""       => None
        case location => Some(opResultStorage.getPortStorage(location).getStorageWriter)
      }
      Some(
        GlobalPortIdentity(physicalOp.id, outputPortId, input = false) -> (schema, storageWriter)
      )
    case _ => None
  }
inputPortMapping ++ outputPortMapping
}
.flatMap {
case (globalPortId, schema) =>
case (globalPortId, (schema, storage)) =>
resourceConfig.operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId).map {
workerId =>
asyncRPCClient
.send(AssignPort(globalPortId.portId, globalPortId.input, schema), workerId)
.send(
AssignPort(globalPortId.portId, globalPortId.input, schema, storage),
workerId
)
}
}
.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
import edu.uci.ics.amber.engine.architecture.controller.execution.WorkflowExecution
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.amber.engine.common.workflow.PhysicalLink
import edu.uci.ics.texera.workflow.common.storage.OpResultStorage

import scala.collection.mutable

class WorkflowExecutionCoordinator(
getNextRegions: () => Set[Region],
workflowExecution: WorkflowExecution,
controllerConfig: ControllerConfig,
asyncRPCClient: AsyncRPCClient
asyncRPCClient: AsyncRPCClient,
opResultStorage: OpResultStorage
) extends LazyLogging {

private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer()
Expand All @@ -41,7 +43,8 @@ class WorkflowExecutionCoordinator(
region,
workflowExecution,
asyncRPCClient,
controllerConfig
controllerConfig,
opResultStorage
)
regionExecutionCoordinators(region.id)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class DataProcessor(
outputManager.emitEndOfUpstream()
// Send Completed signal to worker actor.
executor.close()
outputManager.closeOutputStorages()
adaptiveBatchingMonitor.stopAdaptiveBatching()
stateManager.transitTo(COMPLETED)
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ControlCommandConvertUtils {
ResumeWorkerV2()
case OpenExecutor() =>
OpenExecutorV2()
case AssignPort(portId, input, schema) =>
case AssignPort(portId, input, schema, _) =>
AssignPortV2(portId, input, schema.toRawSchema)
case AddPartitioning(tag: PhysicalLink, partitioning: Partitioning) =>
AddPartitioningV2(tag, partitioning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHa
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.workflow.PortIdentity
import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
import edu.uci.ics.texera.workflow.operators.sink.storage.SinkStorageWriter

object AssignPortHandler {

  /**
    * Control command assigning a port to a worker.
    * (Cleans up diff residue: the span contained both the old and new `schema` lines.)
    *
    * @param portId  identity of the port being assigned
    * @param input   true for an input port, false for an output port
    * @param schema  tuple schema enforced on the port
    * @param storage storage writer for output ports that persist results; None otherwise
    */
  final case class AssignPort(
      portId: PortIdentity,
      input: Boolean,
      schema: Schema,
      storage: Option[SinkStorageWriter]
  ) extends ControlCommand[Unit]
}

Expand All @@ -22,7 +24,7 @@ trait AssignPortHandler {
if (msg.input) {
dp.inputManager.addPort(msg.portId, msg.schema)
} else {
dp.outputManager.addPort(msg.portId, msg.schema)
dp.outputManager.addPort(msg.portId, msg.schema, msg.storage)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ case class PortDescription(
allowMultiInputs: Boolean,
isDynamicPort: Boolean,
partitionRequirement: PartitionInfo,
dependencies: List[Int] = List.empty
dependencies: List[Int] = List.empty,
hasStorage: Boolean
)

// Jackson-deserialized port lists of an operator descriptor.
// (Cleans up diff residue: the span contained both the old `= null` and new
// `= List.empty` defaults for outputPorts.)
trait PortDescriptor {
  @JsonProperty(required = false)
  var inputPorts: List[PortDescription] = null

  @JsonProperty(required = false)
  // NOTE(review): outputPorts now defaults to List.empty while inputPorts still
  // defaults to null — confirm callers' null checks; consider List.empty for both.
  var outputPorts: List[PortDescription] = List.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,47 @@ class OpResultStorage extends Serializable with LazyLogging {
val cache: ConcurrentHashMap[OperatorIdentity, SinkStorageReader] =
new ConcurrentHashMap[OperatorIdentity, SinkStorageReader]()

val portStorage: ConcurrentHashMap[String, SinkStorageReader] =
new ConcurrentHashMap[String, SinkStorageReader]()

/**
  * Retrieves the storage registered for an output port.
  *
  * NOTE(review): returns null when `key` is absent (java.util.Map.get semantics) —
  * callers must ensure the storage was created first; consider returning Option.
  *
  * @param key the storage key the port storage was registered under
  */
def getPortStorage(key: String): SinkStorageReader = {
portStorage.get(key)
}

/**
  * Creates a storage object for an operator output port and registers it under `key`.
  *
  * @param executionId prefix for the underlying MongoDB collection name (unused for memory mode)
  * @param key         key the storage is registered under in `portStorage`
  * @param mode        "memory" for in-memory storage, anything else attempts MongoDB
  * @return the newly created storage reader (also retrievable later via getPortStorage)
  */
def createPortStorage(
    executionId: String = "",
    key: String,
    mode: String
): SinkStorageReader = {
  import scala.util.control.NonFatal
  val storage: SinkStorageReader =
    if (mode == "memory") {
      new MemoryStorage
    } else {
      // Try MongoDB first; fall back to in-memory storage if it is unavailable.
      try {
        new MongoDBSinkStorage(executionId + key)
      } catch {
        // BUG FIX: catch NonFatal instead of Throwable so fatal errors such as
        // OutOfMemoryError or InterruptedException are not silently swallowed.
        case NonFatal(t) =>
          logger.warn("Failed to create mongo storage", t)
          logger.info(s"Fall back to memory storage for $key")
          // fall back to memory
          new MemoryStorage
      }
    }
  portStorage.put(key, storage)
  storage
}

/**
  * Returns true iff a port storage has been registered under `key`.
  */
def containsPortStorage(key: String): Boolean = {
  // BUG FIX: ConcurrentHashMap.contains(x) is a legacy alias for containsValue(x);
  // this is a KEY lookup, so containsKey must be used.
  portStorage.containsKey(key)
}

/**
  * Removes the port storage registered under `key` (if any) and clears its contents.
  */
def removePortStorage(key: String): Unit = {
  // BUG FIX: the original guard used ConcurrentHashMap.contains(key), which is a
  // legacy alias for containsValue and therefore never matched by key; the clear()
  // was effectively skipped. remove-then-clear is also atomic (no check-then-act race).
  Option(portStorage.remove(key)).foreach(_.clear())
}

/**
* Retrieve the result of an operator from OpResultStorage
* @param key The key used for storage and retrieval.
Expand Down Expand Up @@ -81,6 +122,8 @@ class OpResultStorage extends Serializable with LazyLogging {
def close(): Unit = {
cache.forEach((_, sinkStorageReader) => sinkStorageReader.clear())
cache.clear()
portStorage.forEach((_, sinkStorageReader) => sinkStorageReader.clear())
portStorage.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{
}
import edu.uci.ics.amber.engine.common.workflow.PhysicalLink
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.storage.OpResultStorage
import org.jgrapht.alg.connectivity.BiconnectivityInspector
import org.jgrapht.alg.shortestpath.AllDirectedPaths
import org.jgrapht.graph.DirectedAcyclicGraph
Expand All @@ -20,7 +21,11 @@ import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHa

object PhysicalPlan {

def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = {
def apply(
context: WorkflowContext,
logicalPlan: LogicalPlan,
opResultStorage: Option[OpResultStorage] = None
): PhysicalPlan = {

var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)

Expand Down Expand Up @@ -49,7 +54,11 @@ object PhysicalPlan {
val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id)

// Add the operator to the physical plan
physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema())
physicalPlan = physicalPlan.addOperator(
physicalOp
.propagateSchema()
.assignOutputPortStorages(logicalOp, context, opResultStorage)
)

// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
Expand Down
Loading
Loading