Move per-batch tuple iteration out of InputManager/OutputManager into a new
TupleProcessingManager (InputTupleIterator + OutputTupleIterator), and track the
most recently picked channel in NetworkInputGateway.currentChannelId.

Review note: currentChannelId is only assigned inside tryPickChannel, so the
DPThread branch that consumes a message obtained through tryPickControlChannel
must take the id from the picked channel itself (Some(channel.channelId)).
Reading getCurrentChannelId there would yield a stale id, or None, in which case
the already-taken control message would never be processed.

diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala
index da7ece8b16b..2809fc449a3 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala
@@ -8,6 +8,8 @@ trait InputGateway {
 
   def tryPickChannel: Option[AmberFIFOChannel]
 
+  def getCurrentChannelId: Option[ChannelIdentity]
+
   def getAllChannels: Iterable[AmberFIFOChannel]
 
   def getAllDataChannels: Iterable[AmberFIFOChannel]
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala
index 2bb65a1954c..074fa8d56a5 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala
@@ -1,17 +1,13 @@
 package edu.uci.ics.amber.engine.architecture.messaginglayer
 
 import edu.uci.ics.amber.engine.common.AmberLogging
-import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
 import edu.uci.ics.amber.engine.common.workflow.PortIdentity
-import edu.uci.ics.texera.workflow.common.tuple.Tuple
 import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
 
 import scala.collection.mutable
 
 class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging {
-  private var inputBatch: Array[Tuple] = _
-  private var currentInputIdx: Int = -1
-  var currentChannelId: ChannelIdentity = _
   private val ports: mutable.HashMap[PortIdentity, WorkerPort] = mutable.HashMap()
 
   def getAllPorts: Set[PortIdentity] = {
@@ -36,25 +32,4 @@ class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging {
     this.ports(portId).channels.values.forall(completed => completed)
   }
 
-  def hasUnfinishedInput: Boolean = inputBatch != null && currentInputIdx + 1 < inputBatch.length
-
-  def getNextTuple: Tuple = {
-    currentInputIdx += 1
-    inputBatch(currentInputIdx)
-  }
-  def getCurrentTuple: Tuple = {
-    if (inputBatch == null) {
-      null
-    } else if (inputBatch.isEmpty) {
-      null // TODO: create input exhausted
-    } else {
-      inputBatch(currentInputIdx)
-    }
-  }
-
-  def initBatch(channelId: ChannelIdentity, batch: Array[Tuple]): Unit = {
-    currentChannelId = channelId
-    inputBatch = batch
-    currentInputIdx = -1
-  }
 }
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index fbf239a9400..28394295ec7 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -14,6 +14,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
 
   private val inputChannels =
     new mutable.HashMap[ChannelIdentity, AmberFIFOChannel]()
 
+  private var currentChannelId: Option[ChannelIdentity] = None
+
   @transient lazy private val enforcers = mutable.ListBuffer[OrderEnforcer]()
 
   def tryPickControlChannel: Option[AmberFIFOChannel] = {
@@ -33,6 +35,7 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
   def tryPickChannel: Option[AmberFIFOChannel] = {
     val control = tryPickControlChannel
     val ret = if (control.isDefined) {
+      this.currentChannelId = control.map(_.channelId)
       control
     } else {
       inputChannels
@@ -41,12 +44,18 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
             !cid.isControl && channel.isEnabled && channel.hasMessage && enforcers
               .forall(enforcer => enforcer.isCompleted || enforcer.canProceed(cid))
         })
-        .map(_._2)
+        .map {
+          case (channelId, channel) =>
+            this.currentChannelId = Some(channelId)
+            channel
+        }
     }
     enforcers.filter(enforcer => enforcer.isCompleted).foreach(enforcer => enforcers -= enforcer)
     ret
   }
 
+  override def getCurrentChannelId: Option[ChannelIdentity] = this.currentChannelId
+
   def getAllDataChannels: Iterable[AmberFIFOChannel] =
     inputChannels.filter(!_._1.isControl).values
 
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
index 03788013efb..d9ae148d477 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -1,16 +1,14 @@
 package edu.uci.ics.amber.engine.architecture.messaginglayer
 
 import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{
-  DPOutputIterator,
   getBatchSize,
   toPartitioner
 }
 import edu.uci.ics.amber.engine.architecture.sendsemantics.partitioners._
 import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._
-import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort}
 import edu.uci.ics.amber.engine.common.AmberLogging
 import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
-import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike}
+import edu.uci.ics.amber.engine.common.tuple.amber.SchemaEnforceable
 import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
 import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity}
 import edu.uci.ics.texera.workflow.common.tuple.schema.Schema
@@ -49,32 +47,6 @@ object OutputManager {
     }
   }
 
-  class DPOutputIterator extends Iterator[(TupleLike, Option[PortIdentity])] {
-    val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])]
-    @transient var outputIter: Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty
-
-    def setTupleOutput(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = {
-      if (outputIter != null) {
-        this.outputIter = outputIter
-      } else {
-        this.outputIter = Iterator.empty
-      }
-    }
-
-    override def hasNext: Boolean = outputIter.hasNext || queue.nonEmpty
-
-    override def next(): (TupleLike, Option[PortIdentity]) = {
-      if (outputIter.hasNext) {
-        outputIter.next()
-      } else {
-        queue.remove(0)
-      }
-    }
-
-    def appendSpecialTupleToEnd(tuple: TupleLike): Unit = {
-      queue.append((tuple, None))
-    }
-  }
 }
 
 /** This class is a container of all the transfer partitioners.
@@ -87,7 +59,6 @@ class OutputManager(
     outputGateway: NetworkOutputGateway
 ) extends AmberLogging {
 
-  val outputIterator: DPOutputIterator = new DPOutputIterator()
   private val partitioners: mutable.Map[PhysicalLink, Partitioner] =
     mutable.HashMap[PhysicalLink, Partitioner]()
 
@@ -183,14 +154,6 @@ class OutputManager(
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
 
-  def hasUnfinishedOutput: Boolean = outputIterator.hasNext
-
-  def finalizeOutput(): Unit = {
-    this.ports.keys
-      .foreach(outputPortId =>
-        outputIterator.appendSpecialTupleToEnd(FinalizePort(outputPortId, input = false))
-      )
-    outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
-  }
+  def getAllPortIds: Set[PortIdentity] = this.ports.keySet.toSet
 
 }
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala
index ca2f451dad1..d8f124ba5e9 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala
@@ -137,19 +137,19 @@ class DPThread(
       //
       // Main loop step 2: do input selection
       //
-      var channelId: ChannelIdentity = null
-      var msgOpt: Option[WorkflowFIFOMessage] = None
+      var channelId: Option[ChannelIdentity] = None
+      var msg: Option[WorkflowFIFOMessage] = None
       if (
-        dp.inputManager.hasUnfinishedInput || dp.outputManager.hasUnfinishedOutput || dp.pauseManager.isPaused
+        dp.tupleProcessingManager.inputIterator.hasNext || dp.tupleProcessingManager.outputIterator.hasNext || dp.pauseManager.isPaused
       ) {
         dp.inputGateway.tryPickControlChannel match {
           case Some(channel) =>
-            channelId = channel.channelId
-            msgOpt = Some(channel.take)
+            channelId = Some(channel.channelId)
+            msg = Some(channel.take)
           case None =>
             // continue processing
             if (!dp.pauseManager.isPaused && !backpressureStatus) {
-              channelId = dp.inputManager.currentChannelId
+              channelId = dp.inputGateway.getCurrentChannelId
             } else {
               waitingForInput = true
             }
@@ -162,8 +162,8 @@ class DPThread(
           dp.inputGateway.tryPickChannel
         } match {
           case Some(channel) =>
-            channelId = channel.channelId
-            msgOpt = Some(channel.take)
+            channelId = dp.inputGateway.getCurrentChannelId
+            msg = Some(channel.take)
           case None => waitingForInput = true
         }
       }
@@ -171,11 +171,11 @@ class DPThread(
       //
       // Main loop step 3: process selected message payload
       //
-      if (channelId != null) {
+      if (channelId.isDefined) {
         // for logging, skip large data frames.
-        val msgToLog = msgOpt.filter(_.payload.isInstanceOf[ControlPayload])
-        logManager.withFaultTolerant(channelId, msgToLog) {
-          msgOpt match {
+        val msgToLog = msg.filter(_.payload.isInstanceOf[ControlPayload])
+        logManager.withFaultTolerant(channelId.get, msgToLog) {
+          msg match {
             case None =>
               dp.continueDataProcessing()
             case Some(msg) =>
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala
index fe066a4ef5c..51cbc4be2c6 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala
@@ -63,6 +63,7 @@ class DataProcessor(
   val stateManager: WorkerStateManager = new WorkerStateManager()
   val inputManager: InputManager = new InputManager(actorId)
   val outputManager: OutputManager = new OutputManager(actorId, outputGateway)
+  val tupleProcessingManager: TupleProcessingManager = new TupleProcessingManager(actorId)
   val channelMarkerManager: ChannelMarkerManager = new ChannelMarkerManager(actorId, inputGateway)
   val serializationManager: SerializationManager = new SerializationManager(actorId)
   def getQueuedCredit(channelId: ChannelIdentity): Long = {
@@ -80,12 +81,12 @@ class DataProcessor(
     * process currentInputTuple through executor logic.
     * this function is only called by the DP thread.
     */
-  private[this] def processInputTuple(tuple: Tuple): Unit = {
+  private[this] def processInputTuple(tuple: Tuple, portId: PortIdentity): Unit = {
     try {
-      outputManager.outputIterator.setTupleOutput(
+      tupleProcessingManager.outputIterator.setInternalIter(
         executor.processTupleMultiPort(
           tuple,
-          this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id
+          portId.id
         )
       )
       statisticsManager.increaseInputTupleCount()
@@ -101,13 +102,10 @@ class DataProcessor(
     * process end of an input port with Executor.onFinish().
     * this function is only called by the DP thread.
     */
-  private[this] def processInputExhausted(): Unit = {
+  private[this] def processOnFinish(portId: PortIdentity): Unit = {
     try {
-      outputManager.outputIterator.setTupleOutput(
-        executor.onFinishMultiPort(
-          this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id
-        )
-      )
+      val output = executor.onFinishMultiPort(portId.id)
+      tupleProcessingManager.outputIterator.setInternalIter(output)
     } catch safely {
       case e =>
         // forward input tuple to the user and pause DP thread
@@ -122,13 +120,13 @@ class DataProcessor(
     adaptiveBatchingMonitor.startAdaptiveBatching()
     var out: (TupleLike, Option[PortIdentity]) = null
     try {
-      out = outputManager.outputIterator.next()
+      out = tupleProcessingManager.outputIterator.next()
     } catch safely {
       case e =>
         // invalidate current output tuple
         out = null
         // also invalidate outputIterator
-        outputManager.outputIterator.setTupleOutput(Iterator.empty)
+        tupleProcessingManager.outputIterator.setInternalIter(Iterator.empty)
         // forward input tuple to the user and pause DP thread
         handleExecutorException(e)
     }
@@ -163,10 +161,11 @@ class DataProcessor(
 
   def continueDataProcessing(): Unit = {
     val dataProcessingStartTime = System.nanoTime()
-    if (outputManager.hasUnfinishedOutput) {
+    if (tupleProcessingManager.outputIterator.hasNext) {
       outputOneTuple()
     } else {
-      processInputTuple(inputManager.getNextTuple)
+      val (tuple, portId) = tupleProcessingManager.inputIterator.next()
+      processInputTuple(tuple, portId)
     }
     statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime)
   }
@@ -188,8 +187,13 @@ class DataProcessor(
             )
           }
         )
-        inputManager.initBatch(channelId, tuples)
-        processInputTuple(inputManager.getNextTuple)
+
+        tupleProcessingManager.inputIterator.setBatch(
+          inputGateway.getChannel(channelId).getPortId,
+          tuples
+        )
+        val (tuple, portId) = tupleProcessingManager.inputIterator.next()
+        processInputTuple(tuple, portId)
       case EndOfUpstream() =>
         val channel = this.inputGateway.getChannel(channelId)
         val portId = channel.getPortId
@@ -197,13 +201,15 @@ class DataProcessor(
         this.inputManager.getPort(portId).channels(channelId) = true
 
         if (inputManager.isPortCompleted(portId)) {
-          inputManager.initBatch(channelId, Array.empty)
-          processInputExhausted()
-          outputManager.outputIterator.appendSpecialTupleToEnd(FinalizePort(portId, input = true))
+          tupleProcessingManager.inputIterator.setBatch(portId, Array.empty)
+          processOnFinish(portId)
+          tupleProcessingManager.outputIterator.appendSpecialTupleToEnd(
+            FinalizePort(portId, input = true)
+          )
         }
         if (inputManager.getAllPorts.forall(portId => inputManager.isPortCompleted(portId))) {
           // assuming all the output ports finalize after all input ports are finalized.
-          outputManager.finalizeOutput()
+          tupleProcessingManager.finalizeOutput(outputManager.getAllPortIds)
         }
     }
     statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala
new file mode 100644
index 00000000000..1e847e141bc
--- /dev/null
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala
@@ -0,0 +1,88 @@
+package edu.uci.ics.amber.engine.architecture.worker
+
+import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort}
+import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.{
+  OutputTupleIterator,
+  InputTupleIterator
+}
+import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike
+import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
+import edu.uci.ics.amber.engine.common.workflow.PortIdentity
+import edu.uci.ics.texera.workflow.common.tuple.Tuple
+
+import scala.collection.{AbstractIterator, mutable}
+
+object TupleProcessingManager {
+  class InputTupleIterator extends AbstractIterator[(Tuple, PortIdentity)] {
+    private var inputBatch: Array[Tuple] = Array.empty
+    private var currentPortId: Option[PortIdentity] = None
+
+    // Set the batch of tuples for processing. Resets the iterator to the start of the new batch.
+    def setBatch(portId: PortIdentity, batch: Array[Tuple]): Unit = {
+      currentPortId = Some(portId)
+      inputBatch = batch
+      currentIndex = 0 // Reset index to the start for the new batch
+    }
+
+    private var currentIndex: Int = 0
+
+    // Retrieves the current tuple without advancing the index.
+    def getCurrentTuple: Option[Tuple] = {
+      if (inputBatch.nonEmpty && currentIndex > 0 && currentIndex <= inputBatch.length) {
+        Some(inputBatch(currentIndex - 1))
+      } else {
+        None
+      }
+    }
+
+    // Check if there are more tuples in the batch to process.
+    override def hasNext: Boolean = currentIndex < inputBatch.length
+
+    // Advances to the next tuple in the batch and returns it.
+    override def next(): (Tuple, PortIdentity) = {
+      if (!hasNext) throw new NoSuchElementException("No more tuples in the batch")
+      val tuple = inputBatch(currentIndex)
+      currentIndex += 1
+      (tuple, currentPortId.getOrElse(throw new IllegalStateException("Port ID is not set")))
+    }
+  }
+  class OutputTupleIterator extends AbstractIterator[(TupleLike, Option[PortIdentity])] {
+    val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])]
+    @transient private var internalIter: Iterator[(TupleLike, Option[PortIdentity])] =
+      Iterator.empty
+
+    def setInternalIter(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = {
+      this.internalIter = Option(outputIter).getOrElse(Iterator.empty)
+    }
+
+    def getInternalIter: Iterator[(TupleLike, Option[PortIdentity])] = internalIter
+
+    override def hasNext: Boolean = internalIter.hasNext || queue.nonEmpty
+
+    override def next(): (TupleLike, Option[PortIdentity]) = {
+      if (internalIter.hasNext) {
+        internalIter.next()
+      } else {
+        queue.remove(0)
+      }
+    }
+
+    def appendSpecialTupleToEnd(tuple: TupleLike): Unit = {
+      queue.append((tuple, None))
+    }
+  }
+}
+class TupleProcessingManager(val actorId: ActorVirtualIdentity) {
+
+  val inputIterator: InputTupleIterator = new InputTupleIterator()
+
+  val outputIterator: OutputTupleIterator = new OutputTupleIterator()
+
+  def finalizeOutput(portIds: Set[PortIdentity]): Unit = {
+    portIds
+      .foreach(outputPortId =>
+        outputIterator.appendSpecialTupleToEnd(FinalizePort(outputPortId, input = false))
+      )
+    outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
+  }
+}
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala
index 6d9fb570148..086ddf4cf58 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala
@@ -153,7 +153,7 @@ class WorkflowWorker(
     val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt)
     dp.executor = executor
     logger.info("re-initialize executor done.")
-    dp.outputManager.outputIterator.setTupleOutput(iter)
+    dp.tupleProcessingManager.outputIterator.setInternalIter(iter)
     logger.info("set tuple output done.")
     queuedMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg)))
     inflightMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg)))
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala
index 36b741d1703..364c48bb06c 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala
@@ -43,8 +43,8 @@ trait PrepareCheckpointHandler {
     // 2. serialize operator state
     dp.executor match {
       case support: CheckpointSupport =>
-        dp.outputManager.outputIterator.setTupleOutput(
-          support.serializeState(dp.outputManager.outputIterator.outputIter, chkpt)
+        dp.tupleProcessingManager.outputIterator.setInternalIter(
+          support.serializeState(dp.tupleProcessingManager.outputIterator.getInternalIter, chkpt)
         )
         logger.info("Serialized operator state")
       case _ =>
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala
index 94699484c94..229a2344140 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala
@@ -13,6 +13,6 @@ trait QueryCurrentInputTupleHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
   registerHandler { (msg: QueryCurrentInputTuple, sender) =>
-    dp.inputManager.getCurrentTuple
+    dp.tupleProcessingManager.inputIterator.getCurrentTuple.orNull
   }
 }