From fed6cb8f020dd3496499fcccd628c55a4185ea4e Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 26 Mar 2024 09:57:28 -0700 Subject: [PATCH 1/5] refactor output iter --- .../messaginglayer/OutputManager.scala | 42 ++------------- .../engine/architecture/worker/DPThread.scala | 2 +- .../architecture/worker/DataProcessor.scala | 15 +++--- .../worker/TupleProcessingManager.scala | 51 +++++++++++++++++++ .../architecture/worker/WorkflowWorker.scala | 2 +- .../PrepareCheckpointHandler.scala | 4 +- 6 files changed, 68 insertions(+), 48 deletions(-) create mode 100644 core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 03788013efb..690057ddc36 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -1,16 +1,14 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, getBatchSize, toPartitioner } import edu.uci.ics.amber.engine.architecture.sendsemantics.partitioners._ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._ -import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} import edu.uci.ics.amber.engine.common.AmberLogging import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand -import edu.uci.ics.amber.engine.common.tuple.amber.{SchemaEnforceable, TupleLike} +import edu.uci.ics.amber.engine.common.tuple.amber.SchemaEnforceable import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity} import edu.uci.ics.texera.workflow.common.tuple.schema.Schema @@ -49,32 +47,7 @@ object OutputManager { } } - class DPOutputIterator extends Iterator[(TupleLike, Option[PortIdentity])] { - val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])] - @transient var outputIter: Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty - def setTupleOutput(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = { - if (outputIter != null) { - this.outputIter = outputIter - } else { - this.outputIter = Iterator.empty - } - } - - override def hasNext: Boolean = outputIter.hasNext || queue.nonEmpty - - override def next(): (TupleLike, Option[PortIdentity]) = { - if (outputIter.hasNext) { - outputIter.next() - } else { - queue.remove(0) - } - } - - def appendSpecialTupleToEnd(tuple: TupleLike): Unit = { - queue.append((tuple, None)) - } - } } /** This class is a container of all the transfer partitioners. @@ -87,7 +60,7 @@ class OutputManager( outputGateway: NetworkOutputGateway ) extends AmberLogging { - val outputIterator: DPOutputIterator = new DPOutputIterator() + private val partitioners: mutable.Map[PhysicalLink, Partitioner] = mutable.HashMap[PhysicalLink, Partitioner]() @@ -183,14 +156,9 @@ class OutputManager( def getPort(portId: PortIdentity): WorkerPort = ports(portId) - def hasUnfinishedOutput: Boolean = outputIterator.hasNext - def finalizeOutput(): Unit = { - this.ports.keys - .foreach(outputPortId => - outputIterator.appendSpecialTupleToEnd(FinalizePort(outputPortId, input = false)) - ) - outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) - } + + def getAllPortIds: Set[PortIdentity] = this.ports.keySet.toSet + } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala index ca2f451dad1..d8c1a88165f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala @@ -140,7 +140,7 @@ class DPThread( var channelId: ChannelIdentity = null var msgOpt: Option[WorkflowFIFOMessage] = None if ( - dp.inputManager.hasUnfinishedInput || dp.outputManager.hasUnfinishedOutput || dp.pauseManager.isPaused + dp.inputManager.hasUnfinishedInput || dp.tupleProcessingManager.hasUnfinishedOutput || dp.pauseManager.isPaused ) { dp.inputGateway.tryPickControlChannel match { case Some(channel) => diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index fe066a4ef5c..ace7091b0ac 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -63,6 +63,7 @@ class DataProcessor( val stateManager: WorkerStateManager = new WorkerStateManager() val inputManager: InputManager = new InputManager(actorId) val outputManager: OutputManager = new OutputManager(actorId, outputGateway) + val tupleProcessingManager: TupleProcessingManager = new TupleProcessingManager(actorId) val channelMarkerManager: ChannelMarkerManager = new ChannelMarkerManager(actorId, inputGateway) val serializationManager: SerializationManager = new SerializationManager(actorId) def getQueuedCredit(channelId: ChannelIdentity): Long = { @@ -82,7 +83,7 @@ class DataProcessor( */ private[this] def processInputTuple(tuple: Tuple): Unit = { try { - outputManager.outputIterator.setTupleOutput( + tupleProcessingManager.outputIterator.setTupleOutput( executor.processTupleMultiPort( tuple, this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id @@ -103,7 +104,7 @@ class DataProcessor( */ private[this] def processInputExhausted(): Unit = { try { - outputManager.outputIterator.setTupleOutput( + tupleProcessingManager.outputIterator.setTupleOutput( executor.onFinishMultiPort( this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id ) @@ -122,13 +123,13 @@ class DataProcessor( adaptiveBatchingMonitor.startAdaptiveBatching() var out: (TupleLike, Option[PortIdentity]) = null try { - out = outputManager.outputIterator.next() + out = tupleProcessingManager.outputIterator.next() } catch safely { case e => // invalidate current output tuple out = null // also invalidate outputIterator - outputManager.outputIterator.setTupleOutput(Iterator.empty) + tupleProcessingManager.outputIterator.setTupleOutput(Iterator.empty) // forward input tuple to the user and pause DP thread handleExecutorException(e) } @@ -163,7 +164,7 @@ class DataProcessor( def continueDataProcessing(): Unit = { val dataProcessingStartTime = System.nanoTime() - if (outputManager.hasUnfinishedOutput) { + if (tupleProcessingManager.hasUnfinishedOutput) { outputOneTuple() } else { processInputTuple(inputManager.getNextTuple) @@ -199,11 +200,11 @@ class DataProcessor( if (inputManager.isPortCompleted(portId)) { inputManager.initBatch(channelId, Array.empty) processInputExhausted() - outputManager.outputIterator.appendSpecialTupleToEnd(FinalizePort(portId, input = true)) + tupleProcessingManager.outputIterator.appendSpecialTupleToEnd(FinalizePort(portId, input = true)) } if (inputManager.getAllPorts.forall(portId => inputManager.isPortCompleted(portId))) { // assuming all the output ports finalize after all input ports are finalized. - outputManager.finalizeOutput() + tupleProcessingManager.finalizeOutput(outputManager.getAllPortIds) } } statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala new file mode 100644 index 00000000000..cac4e0c267d --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala @@ -0,0 +1,51 @@ +package edu.uci.ics.amber.engine.architecture.worker + +import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} +import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.DPOutputIterator +import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike +import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity +import edu.uci.ics.amber.engine.common.workflow.PortIdentity + +import scala.collection.mutable + +object TupleProcessingManager{ + class DPOutputIterator extends Iterator[(TupleLike, Option[PortIdentity])] { + val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])] + @transient var outputIter: Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty + + def setTupleOutput(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = { + if (outputIter != null) { + this.outputIter = outputIter + } else { + this.outputIter = Iterator.empty + } + } + + override def hasNext: Boolean = outputIter.hasNext || queue.nonEmpty + + override def next(): (TupleLike, Option[PortIdentity]) = { + if (outputIter.hasNext) { + outputIter.next() + } else { + queue.remove(0) + } + } + + def appendSpecialTupleToEnd(tuple: TupleLike): Unit = { + queue.append((tuple, None)) + } + } +} +class TupleProcessingManager( val actorId: ActorVirtualIdentity) { + val outputIterator: DPOutputIterator = new DPOutputIterator() + + def hasUnfinishedOutput: Boolean = outputIterator.hasNext + + def finalizeOutput(portIds: Set[PortIdentity]): Unit = { + portIds + .foreach(outputPortId => + outputIterator.appendSpecialTupleToEnd(FinalizePort(outputPortId, input = false)) + ) + outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala index 6d9fb570148..bd541ca42b1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala @@ -153,7 +153,7 @@ class WorkflowWorker( val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt) dp.executor = executor logger.info("re-initialize executor done.") - dp.outputManager.outputIterator.setTupleOutput(iter) + dp.tupleProcessingManager.outputIterator.setTupleOutput(iter) logger.info("set tuple output done.") queuedMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg))) inflightMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg))) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala index 36b741d1703..1f5abac6706 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala @@ -43,8 +43,8 @@ trait PrepareCheckpointHandler { // 2. serialize operator state dp.executor match { case support: CheckpointSupport => - dp.outputManager.outputIterator.setTupleOutput( - support.serializeState(dp.outputManager.outputIterator.outputIter, chkpt) + dp.tupleProcessingManager.outputIterator.setTupleOutput( + support.serializeState(dp.tupleProcessingManager.outputIterator.outputIter, chkpt) ) logger.info("Serialized operator state") case _ => From b832a2ae0d50b2a891c98f1879cf18d9c16e3622 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 26 Mar 2024 09:59:51 -0700 Subject: [PATCH 2/5] refactor input --- .../messaginglayer/InputManager.scala | 28 ++----------------- .../engine/architecture/worker/DPThread.scala | 4 +-- .../architecture/worker/DataProcessor.scala | 12 ++++---- .../worker/TupleProcessingManager.scala | 28 ++++++++++++++++++- .../QueryCurrentInputTupleHandler.scala | 2 +- 5 files changed, 39 insertions(+), 35 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala index 2bb65a1954c..893a1682720 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala @@ -1,17 +1,15 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer import edu.uci.ics.amber.engine.common.AmberLogging -import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} +import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.engine.common.workflow.PortIdentity -import edu.uci.ics.texera.workflow.common.tuple.Tuple import edu.uci.ics.texera.workflow.common.tuple.schema.Schema import scala.collection.mutable class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging { - private var inputBatch: Array[Tuple] = _ - private var currentInputIdx: Int = -1 - var currentChannelId: ChannelIdentity = _ + + private val ports: mutable.HashMap[PortIdentity, WorkerPort] = mutable.HashMap() def getAllPorts: Set[PortIdentity] = { @@ -36,25 +34,5 @@ class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging { this.ports(portId).channels.values.forall(completed => completed) } - def hasUnfinishedInput: Boolean = inputBatch != null && currentInputIdx + 1 < inputBatch.length - def getNextTuple: Tuple = { - currentInputIdx += 1 - inputBatch(currentInputIdx) - } - def getCurrentTuple: Tuple = { - if (inputBatch == null) { - null - } else if (inputBatch.isEmpty) { - null // TODO: create input exhausted - } else { - inputBatch(currentInputIdx) - } - } - - def initBatch(channelId: ChannelIdentity, batch: Array[Tuple]): Unit = { - currentChannelId = channelId - inputBatch = batch - currentInputIdx = -1 - } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala index d8c1a88165f..4b485b93d05 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala @@ -140,7 +140,7 @@ class DPThread( var channelId: ChannelIdentity = null var msgOpt: Option[WorkflowFIFOMessage] = None if ( - dp.inputManager.hasUnfinishedInput || dp.tupleProcessingManager.hasUnfinishedOutput || dp.pauseManager.isPaused + dp.tupleProcessingManager.hasUnfinishedInput || dp.tupleProcessingManager.hasUnfinishedOutput || dp.pauseManager.isPaused ) { dp.inputGateway.tryPickControlChannel match { case Some(channel) => @@ -149,7 +149,7 @@ class DPThread( case None => // continue processing if (!dp.pauseManager.isPaused && !backpressureStatus) { - channelId = dp.inputManager.currentChannelId + channelId = dp.tupleProcessingManager.currentChannelId } else { waitingForInput = true } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index ace7091b0ac..780907cadc5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -86,7 +86,7 @@ class DataProcessor( tupleProcessingManager.outputIterator.setTupleOutput( executor.processTupleMultiPort( tuple, - this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id + this.inputGateway.getChannel(tupleProcessingManager.currentChannelId).getPortId.id ) ) statisticsManager.increaseInputTupleCount() @@ -106,7 +106,7 @@ class DataProcessor( try { tupleProcessingManager.outputIterator.setTupleOutput( executor.onFinishMultiPort( - this.inputGateway.getChannel(inputManager.currentChannelId).getPortId.id + this.inputGateway.getChannel(tupleProcessingManager.currentChannelId).getPortId.id ) ) } catch safely { @@ -167,7 +167,7 @@ class DataProcessor( if (tupleProcessingManager.hasUnfinishedOutput) { outputOneTuple() } else { - processInputTuple(inputManager.getNextTuple) + processInputTuple(tupleProcessingManager.getNextTuple) } statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime) } @@ -189,8 +189,8 @@ class DataProcessor( ) } ) - inputManager.initBatch(channelId, tuples) - processInputTuple(inputManager.getNextTuple) + tupleProcessingManager.initBatch(channelId, tuples) + processInputTuple(tupleProcessingManager.getNextTuple) case EndOfUpstream() => val channel = this.inputGateway.getChannel(channelId) val portId = channel.getPortId @@ -198,7 +198,7 @@ class DataProcessor( this.inputManager.getPort(portId).channels(channelId) = true if (inputManager.isPortCompleted(portId)) { - inputManager.initBatch(channelId, Array.empty) + tupleProcessingManager.initBatch(channelId, Array.empty) processInputExhausted() tupleProcessingManager.outputIterator.appendSpecialTupleToEnd(FinalizePort(portId, input = true)) } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala index cac4e0c267d..e57d6ffbefe 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala @@ -3,8 +3,9 @@ package edu.uci.ics.amber.engine.architecture.worker import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.DPOutputIterator import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike -import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity +import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import edu.uci.ics.amber.engine.common.workflow.PortIdentity +import edu.uci.ics.texera.workflow.common.tuple.Tuple import scala.collection.mutable @@ -37,7 +38,32 @@ object TupleProcessingManager{ } } class TupleProcessingManager( val actorId: ActorVirtualIdentity) { + private var inputBatch: Array[Tuple] = _ + private var currentInputIdx: Int = -1 + var currentChannelId: ChannelIdentity = _ val outputIterator: DPOutputIterator = new DPOutputIterator() + def hasUnfinishedInput: Boolean = inputBatch != null && currentInputIdx + 1 < inputBatch.length + + def getNextTuple: Tuple = { + currentInputIdx += 1 + inputBatch(currentInputIdx) + } + def getCurrentTuple: Tuple = { + if (inputBatch == null) { + null + } else if (inputBatch.isEmpty) { + null // TODO: create input exhausted + } else { + inputBatch(currentInputIdx) + } + } + + def initBatch(channelId: ChannelIdentity, batch: Array[Tuple]): Unit = { + currentChannelId = channelId + inputBatch = batch + currentInputIdx = -1 + } + def hasUnfinishedOutput: Boolean = outputIterator.hasNext diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala index 94699484c94..d0197176e9e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala @@ -13,6 +13,6 @@ trait QueryCurrentInputTupleHandler { this: DataProcessorRPCHandlerInitializer => registerHandler { (msg: QueryCurrentInputTuple, sender) => - dp.inputManager.getCurrentTuple + dp.tupleProcessingManager.getCurrentTuple } } From b50bd1de14441169103c377cd894f0a5b01669f9 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:13:26 -0700 Subject: [PATCH 3/5] move currentChannelId to InputGateway --- .../messaginglayer/InputGateway.scala | 2 + .../messaginglayer/InputManager.scala | 3 - .../messaginglayer/NetworkInputGateway.scala | 11 +++- .../messaginglayer/OutputManager.scala | 5 -- .../engine/architecture/worker/DPThread.scala | 22 +++---- .../architecture/worker/DataProcessor.scala | 30 +++++----- .../worker/TupleProcessingManager.scala | 60 ++++++++++++------- .../QueryCurrentInputTupleHandler.scala | 2 +- 8 files changed, 79 insertions(+), 56 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala index da7ece8b16b..2809fc449a3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputGateway.scala @@ -8,6 +8,8 @@ trait InputGateway { def tryPickChannel: Option[AmberFIFOChannel] + def getCurrentChannelId: Option[ChannelIdentity] + def getAllChannels: Iterable[AmberFIFOChannel] def getAllDataChannels: Iterable[AmberFIFOChannel] diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala index 893a1682720..074fa8d56a5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/InputManager.scala @@ -9,8 +9,6 @@ import scala.collection.mutable class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging { - - private val ports: mutable.HashMap[PortIdentity, WorkerPort] = mutable.HashMap() def getAllPorts: Set[PortIdentity] = { this.ports.keys.toSet @@ -34,5 +32,4 @@ class InputManager(val actorId: ActorVirtualIdentity) extends AmberLogging { this.ports(portId).channels.values.forall(completed => completed) } - } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index fbf239a9400..28394295ec7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -14,6 +14,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) private val inputChannels = new mutable.HashMap[ChannelIdentity, AmberFIFOChannel]() + private var currentChannelId: Option[ChannelIdentity] = None + @transient lazy private val enforcers = mutable.ListBuffer[OrderEnforcer]() def tryPickControlChannel: Option[AmberFIFOChannel] = { @@ -33,6 +35,7 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) def tryPickChannel: Option[AmberFIFOChannel] = { val control = tryPickControlChannel val ret = if (control.isDefined) { + this.currentChannelId = control.map(_.channelId) control } else { inputChannels @@ -41,12 +44,18 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) !cid.isControl && channel.isEnabled && channel.hasMessage && enforcers .forall(enforcer => enforcer.isCompleted || enforcer.canProceed(cid)) }) - .map(_._2) + .map { + case (channelId, channel) => + this.currentChannelId = Some(channelId) + channel + } } enforcers.filter(enforcer => enforcer.isCompleted).foreach(enforcer => enforcers -= enforcer) ret } + override def getCurrentChannelId: Option[ChannelIdentity] = this.currentChannelId + def getAllDataChannels: Iterable[AmberFIFOChannel] = inputChannels.filter(!_._1.isControl).values diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index 690057ddc36..d9ae148d477 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -47,7 +47,6 @@ object OutputManager { } } - } /** This class is a container of all the transfer partitioners. @@ -60,7 +59,6 @@ class OutputManager( outputGateway: NetworkOutputGateway ) extends AmberLogging { - private val partitioners: mutable.Map[PhysicalLink, Partitioner] = mutable.HashMap[PhysicalLink, Partitioner]() @@ -156,9 +154,6 @@ class OutputManager( def getPort(portId: PortIdentity): WorkerPort = ports(portId) - - def getAllPortIds: Set[PortIdentity] = this.ports.keySet.toSet - } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala index 4b485b93d05..b299bdf6df9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala @@ -137,19 +137,19 @@ class DPThread( // // Main loop step 2: do input selection // - var channelId: ChannelIdentity = null - var msgOpt: Option[WorkflowFIFOMessage] = None + var channelId: Option[ChannelIdentity] = None + var msg: Option[WorkflowFIFOMessage] = None if ( dp.tupleProcessingManager.hasUnfinishedInput || dp.tupleProcessingManager.hasUnfinishedOutput || dp.pauseManager.isPaused ) { dp.inputGateway.tryPickControlChannel match { case Some(channel) => - channelId = channel.channelId - msgOpt = Some(channel.take) + channelId = dp.inputGateway.getCurrentChannelId + msg = Some(channel.take) case None => // continue processing if (!dp.pauseManager.isPaused && !backpressureStatus) { - channelId = dp.tupleProcessingManager.currentChannelId + channelId = dp.inputGateway.getCurrentChannelId } else { waitingForInput = true } @@ -162,8 +162,8 @@ class DPThread( dp.inputGateway.tryPickChannel } match { case Some(channel) => - channelId = channel.channelId - msgOpt = Some(channel.take) + channelId = dp.inputGateway.getCurrentChannelId + msg = Some(channel.take) case None => waitingForInput = true } } @@ -171,11 +171,11 @@ class DPThread( // // Main loop step 3: process selected message payload // - if (channelId != null) { + if (channelId.isDefined) { // for logging, skip large data frames. - val msgToLog = msgOpt.filter(_.payload.isInstanceOf[ControlPayload]) - logManager.withFaultTolerant(channelId, msgToLog) { - msgOpt match { + val msgToLog = msg.filter(_.payload.isInstanceOf[ControlPayload]) + logManager.withFaultTolerant(channelId.get, msgToLog) { + msg match { case None => dp.continueDataProcessing() case Some(msg) => diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index 780907cadc5..20f0d6f0181 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -81,12 +81,12 @@ class DataProcessor( * process currentInputTuple through executor logic. * this function is only called by the DP thread. */ - private[this] def processInputTuple(tuple: Tuple): Unit = { + private[this] def processInputTuple(tuple: Tuple, portId: PortIdentity): Unit = { try { tupleProcessingManager.outputIterator.setTupleOutput( executor.processTupleMultiPort( tuple, - this.inputGateway.getChannel(tupleProcessingManager.currentChannelId).getPortId.id + portId.id ) ) statisticsManager.increaseInputTupleCount() @@ -102,13 +102,10 @@ class DataProcessor( * process end of an input port with Executor.onFinish(). * this function is only called by the DP thread. */ - private[this] def processInputExhausted(): Unit = { + private[this] def processOnFinish(portId: PortIdentity): Unit = { try { - tupleProcessingManager.outputIterator.setTupleOutput( - executor.onFinishMultiPort( - this.inputGateway.getChannel(tupleProcessingManager.currentChannelId).getPortId.id - ) - ) + val output = executor.onFinishMultiPort(portId.id) + tupleProcessingManager.outputIterator.setTupleOutput(output) } catch safely { case e => // forward input tuple to the user and pause DP thread @@ -167,7 +164,8 @@ class DataProcessor( if (tupleProcessingManager.hasUnfinishedOutput) { outputOneTuple() } else { - processInputTuple(tupleProcessingManager.getNextTuple) + val (tuple, portId) = tupleProcessingManager.next + processInputTuple(tuple, portId) } statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime) } @@ -189,8 +187,10 @@ class DataProcessor( ) } ) - tupleProcessingManager.initBatch(channelId, tuples) - processInputTuple(tupleProcessingManager.getNextTuple) + + tupleProcessingManager.setBatch(inputGateway.getChannel(channelId).getPortId, tuples) + val (tuple, portId) = tupleProcessingManager.next + processInputTuple(tuple, portId) case EndOfUpstream() => val channel = this.inputGateway.getChannel(channelId) val portId = channel.getPortId @@ -198,9 +198,11 @@ class DataProcessor( this.inputManager.getPort(portId).channels(channelId) = true if (inputManager.isPortCompleted(portId)) { - tupleProcessingManager.initBatch(channelId, Array.empty) - processInputExhausted() - tupleProcessingManager.outputIterator.appendSpecialTupleToEnd(FinalizePort(portId, input = true)) + tupleProcessingManager.setBatch(portId, Array.empty) + processOnFinish(portId) + tupleProcessingManager.outputIterator.appendSpecialTupleToEnd( + FinalizePort(portId, input = true) + ) } if (inputManager.getAllPorts.forall(portId => inputManager.isPortCompleted(portId))) { // assuming all the output ports finalize after all input ports are finalized. diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala index e57d6ffbefe..a89ec013901 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala @@ -3,13 +3,13 @@ package edu.uci.ics.amber.engine.architecture.worker import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.DPOutputIterator import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike -import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} +import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.workflow.common.tuple.Tuple import scala.collection.mutable -object TupleProcessingManager{ +object TupleProcessingManager { class DPOutputIterator extends Iterator[(TupleLike, Option[PortIdentity])] { val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])] @transient var outputIter: Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty @@ -37,34 +37,52 @@ object TupleProcessingManager{ } } } -class TupleProcessingManager( val actorId: ActorVirtualIdentity) { - private var inputBatch: Array[Tuple] = _ +class TupleProcessingManager(val actorId: ActorVirtualIdentity) { + // Holds the current batch of tuples for processing. + private var inputBatch: Option[Array[Tuple]] = None + + // Tracks the index of the currently processed tuple in the input batch. private var currentInputIdx: Int = -1 - var currentChannelId: ChannelIdentity = _ - val outputIterator: DPOutputIterator = new DPOutputIterator() - def hasUnfinishedInput: Boolean = inputBatch != null && currentInputIdx + 1 < inputBatch.length - def getNextTuple: Tuple = { - currentInputIdx += 1 - inputBatch(currentInputIdx) - } - def getCurrentTuple: Tuple = { - if (inputBatch == null) { - null - } else if (inputBatch.isEmpty) { - null // TODO: create input exhausted + // Identifier for the current port being processed. + private var currentPortId: Option[PortIdentity] = None + + // Checks if there are more tuples in the batch to process. + def hasUnfinishedInput: Boolean = + inputBatch match { + case Some(batch) => currentInputIdx < batch.length - 1 + case None => false + } + + // Advances to the next tuple in the batch and returns it. + def next: (Tuple, PortIdentity) = { + if (hasUnfinishedInput) { + currentInputIdx += 1 + (inputBatch.get(currentInputIdx), currentPortId.get) } else { - inputBatch(currentInputIdx) + throw new NoSuchElementException() } } - def initBatch(channelId: ChannelIdentity, batch: Array[Tuple]): Unit = { - currentChannelId = channelId - inputBatch = batch - currentInputIdx = -1 + // Retrieves the current tuple without advancing the index. + def getCurrentTuple: Option[Tuple] = { + inputBatch match { + case Some(batch) + if batch.nonEmpty && currentInputIdx >= 0 && currentInputIdx < batch.length => + Some(batch(currentInputIdx)) + case _ => + None // Input batch is not initialized, empty, or index is out of bounds. + } } + // Set the batch of tuples for processing and resets the index. + def setBatch(portId: PortIdentity, batch: Array[Tuple]): Unit = { + currentPortId = Some(portId) + inputBatch = Some(batch) + currentInputIdx = -1 + } + val outputIterator: DPOutputIterator = new DPOutputIterator() def hasUnfinishedOutput: Boolean = outputIterator.hasNext def finalizeOutput(portIds: Set[PortIdentity]): Unit = { diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala index d0197176e9e..68e4fe904d9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala @@ -13,6 +13,6 @@ trait QueryCurrentInputTupleHandler { this: DataProcessorRPCHandlerInitializer => registerHandler { (msg: QueryCurrentInputTuple, sender) => - dp.tupleProcessingManager.getCurrentTuple + dp.tupleProcessingManager.getCurrentTuple.orNull } } From 8af893b9c492c57a70aa55333e8755827b3df6d0 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:43:23 -0700 Subject: [PATCH 4/5] refactor --- .../engine/architecture/worker/DPThread.scala | 2 +- .../architecture/worker/DataProcessor.scala | 16 +-- .../worker/TupleProcessingManager.scala | 102 ++++++++---------- .../architecture/worker/WorkflowWorker.scala | 2 +- .../PrepareCheckpointHandler.scala | 4 +- .../QueryCurrentInputTupleHandler.scala | 2 +- 6 files changed, 59 insertions(+), 69 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala index b299bdf6df9..d8f124ba5e9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DPThread.scala @@ -140,7 +140,7 @@ class DPThread( var channelId: Option[ChannelIdentity] = None var msg: Option[WorkflowFIFOMessage] = None if ( - dp.tupleProcessingManager.hasUnfinishedInput || dp.tupleProcessingManager.hasUnfinishedOutput || dp.pauseManager.isPaused + dp.tupleProcessingManager.inputIterator.hasNext || dp.tupleProcessingManager.outputIterator.hasNext || dp.pauseManager.isPaused ) { dp.inputGateway.tryPickControlChannel match { case Some(channel) => diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index 20f0d6f0181..798a6560641 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -83,7 +83,7 @@ class DataProcessor( */ private[this] def processInputTuple(tuple: Tuple, portId: PortIdentity): Unit = { try { - tupleProcessingManager.outputIterator.setTupleOutput( + tupleProcessingManager.outputIterator.setInternalIter( executor.processTupleMultiPort( tuple, portId.id @@ -105,7 +105,7 @@ class DataProcessor( private[this] def processOnFinish(portId: PortIdentity): Unit = { try { val output = executor.onFinishMultiPort(portId.id) - tupleProcessingManager.outputIterator.setTupleOutput(output) + tupleProcessingManager.outputIterator.setInternalIter(output) } catch safely { case e => // forward input tuple to the user and pause DP thread @@ -126,7 +126,7 @@ class DataProcessor( // invalidate current output tuple out = null // also invalidate outputIterator - tupleProcessingManager.outputIterator.setTupleOutput(Iterator.empty) + tupleProcessingManager.outputIterator.setInternalIter(Iterator.empty) // forward input tuple to the user and pause DP thread handleExecutorException(e) } @@ -161,10 +161,10 @@ class DataProcessor( def continueDataProcessing(): Unit = { val dataProcessingStartTime = System.nanoTime() - if (tupleProcessingManager.hasUnfinishedOutput) { + if (tupleProcessingManager.outputIterator.hasNext) { outputOneTuple() } else { - val (tuple, portId) = tupleProcessingManager.next + val (tuple, portId) = tupleProcessingManager.inputIterator.next() processInputTuple(tuple, portId) } statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime) @@ -188,8 +188,8 @@ class DataProcessor( } ) - tupleProcessingManager.setBatch(inputGateway.getChannel(channelId).getPortId, tuples) - val (tuple, portId) = tupleProcessingManager.next + tupleProcessingManager.inputIterator.setBatch(inputGateway.getChannel(channelId).getPortId, tuples) + val (tuple, portId) = tupleProcessingManager.inputIterator.next() processInputTuple(tuple, portId) case EndOfUpstream() => val channel = this.inputGateway.getChannel(channelId) @@ -198,7 +198,7 @@ class DataProcessor( this.inputManager.getPort(portId).channels(channelId) = true if (inputManager.isPortCompleted(portId)) { - tupleProcessingManager.setBatch(portId, Array.empty) + tupleProcessingManager.inputIterator.setBatch(portId, Array.empty) processOnFinish(portId) tupleProcessingManager.outputIterator.appendSpecialTupleToEnd( FinalizePort(portId, input = true) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala index a89ec013901..f5c28055444 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala @@ -1,89 +1,79 @@ package edu.uci.ics.amber.engine.architecture.worker import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} -import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.DPOutputIterator +import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.{OutputTupleIterator, InputTupleIterator} import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.workflow.common.tuple.Tuple -import scala.collection.mutable +import scala.collection.{AbstractIterator, mutable} object TupleProcessingManager { - class DPOutputIterator extends Iterator[(TupleLike, Option[PortIdentity])] { - val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])] - @transient var outputIter: Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty + class InputTupleIterator extends AbstractIterator[(Tuple, PortIdentity)] { + private var inputBatch: Array[Tuple] = Array.empty + private var currentPortId: Option[PortIdentity] = None - def setTupleOutput(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = { - if (outputIter != null) { - this.outputIter = outputIter - } else { - this.outputIter = Iterator.empty - } + // Set the batch of tuples for processing. Resets the iterator to the start of the new batch. + def setBatch(portId: PortIdentity, batch: Array[Tuple]): Unit = { + currentPortId = Some(portId) + inputBatch = batch + currentIndex = 0 // Reset index to the start for the new batch } - override def hasNext: Boolean = outputIter.hasNext || queue.nonEmpty + private var currentIndex: Int = 0 - override def next(): (TupleLike, Option[PortIdentity]) = { - if (outputIter.hasNext) { - outputIter.next() + // Retrieves the current tuple without advancing the index. + def getCurrentTuple: Option[Tuple] = { + if (inputBatch.nonEmpty && currentIndex > 0 && currentIndex <= inputBatch.length) { + Some(inputBatch(currentIndex - 1)) } else { - queue.remove(0) + None } } - def appendSpecialTupleToEnd(tuple: TupleLike): Unit = { - queue.append((tuple, None)) + // Check if there are more tuples in the batch to process. + override def hasNext: Boolean = currentIndex < inputBatch.length + + // Advances to the next tuple in the batch and returns it. + override def next(): (Tuple, PortIdentity) = { + if (!hasNext) throw new NoSuchElementException("No more tuples in the batch") + val tuple = inputBatch(currentIndex) + currentIndex += 1 + (tuple, currentPortId.getOrElse(throw new IllegalStateException("Port ID is not set"))) } } -} -class TupleProcessingManager(val actorId: ActorVirtualIdentity) { - // Holds the current batch of tuples for processing. - private var inputBatch: Option[Array[Tuple]] = None + class OutputTupleIterator extends AbstractIterator[(TupleLike, Option[PortIdentity])] { + val queue = new mutable.ListBuffer[(TupleLike, Option[PortIdentity])] + @transient private var internalIter: Iterator[(TupleLike, Option[PortIdentity])] = + Iterator.empty - // Tracks the index of the currently processed tuple in the input batch. - private var currentInputIdx: Int = -1 + def setInternalIter(outputIter: Iterator[(TupleLike, Option[PortIdentity])]): Unit = { + this.internalIter = Option(outputIter).getOrElse(Iterator.empty) + } - // Identifier for the current port being processed. - private var currentPortId: Option[PortIdentity] = None + def getInternalIter: Iterator[(TupleLike, Option[PortIdentity])] = internalIter - // Checks if there are more tuples in the batch to process. - def hasUnfinishedInput: Boolean = - inputBatch match { - case Some(batch) => currentInputIdx < batch.length - 1 - case None => false - } + override def hasNext: Boolean = internalIter.hasNext || queue.nonEmpty - // Advances to the next tuple in the batch and returns it. - def next: (Tuple, PortIdentity) = { - if (hasUnfinishedInput) { - currentInputIdx += 1 - (inputBatch.get(currentInputIdx), currentPortId.get) - } else { - throw new NoSuchElementException() + override def next(): (TupleLike, Option[PortIdentity]) = { + if (internalIter.hasNext) { + internalIter.next() + } else { + queue.remove(0) + } } - } - // Retrieves the current tuple without advancing the index. - def getCurrentTuple: Option[Tuple] = { - inputBatch match { - case Some(batch) - if batch.nonEmpty && currentInputIdx >= 0 && currentInputIdx < batch.length => - Some(batch(currentInputIdx)) - case _ => - None // Input batch is not initialized, empty, or index is out of bounds. + def appendSpecialTupleToEnd(tuple: TupleLike): Unit = { + queue.append((tuple, None)) } } +} +class TupleProcessingManager(val actorId: ActorVirtualIdentity) { - // Set the batch of tuples for processing and resets the index. - def setBatch(portId: PortIdentity, batch: Array[Tuple]): Unit = { - currentPortId = Some(portId) - inputBatch = Some(batch) - currentInputIdx = -1 - } + val inputIterator: InputTupleIterator = new InputTupleIterator() - val outputIterator: DPOutputIterator = new DPOutputIterator() - def hasUnfinishedOutput: Boolean = outputIterator.hasNext + val outputIterator: OutputTupleIterator = new OutputTupleIterator() def finalizeOutput(portIds: Set[PortIdentity]): Unit = { portIds diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala index bd541ca42b1..086ddf4cf58 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/WorkflowWorker.scala @@ -153,7 +153,7 @@ class WorkflowWorker( val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt) dp.executor = executor logger.info("re-initialize executor done.") - dp.tupleProcessingManager.outputIterator.setTupleOutput(iter) + dp.tupleProcessingManager.outputIterator.setInternalIter(iter) logger.info("set tuple output done.") queuedMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg))) inflightMessages.foreach(msg => inputQueue.put(FIFOMessageElement(msg))) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala index 1f5abac6706..364c48bb06c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/PrepareCheckpointHandler.scala @@ -43,8 +43,8 @@ trait PrepareCheckpointHandler { // 2. serialize operator state dp.executor match { case support: CheckpointSupport => - dp.tupleProcessingManager.outputIterator.setTupleOutput( - support.serializeState(dp.tupleProcessingManager.outputIterator.outputIter, chkpt) + dp.tupleProcessingManager.outputIterator.setInternalIter( + support.serializeState(dp.tupleProcessingManager.outputIterator.getInternalIter, chkpt) ) logger.info("Serialized operator state") case _ => diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala index 68e4fe904d9..229a2344140 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/QueryCurrentInputTupleHandler.scala @@ -13,6 +13,6 @@ trait QueryCurrentInputTupleHandler { this: DataProcessorRPCHandlerInitializer => registerHandler { (msg: QueryCurrentInputTuple, sender) => - dp.tupleProcessingManager.getCurrentTuple.orNull + dp.tupleProcessingManager.inputIterator.getCurrentTuple.orNull } } From 98a26807905c66da5396728393342e7b6424a9ed Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:43:43 -0700 Subject: [PATCH 5/5] fix format --- .../ics/amber/engine/architecture/worker/DataProcessor.scala | 5 ++++- .../engine/architecture/worker/TupleProcessingManager.scala | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index 798a6560641..51cbc4be2c6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -188,7 +188,10 @@ class DataProcessor( } ) - tupleProcessingManager.inputIterator.setBatch(inputGateway.getChannel(channelId).getPortId, tuples) + tupleProcessingManager.inputIterator.setBatch( + inputGateway.getChannel(channelId).getPortId, + tuples + ) val (tuple, portId) = tupleProcessingManager.inputIterator.next() processInputTuple(tuple, portId) case EndOfUpstream() => diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala index f5c28055444..1e847e141bc 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/TupleProcessingManager.scala @@ -1,7 +1,10 @@ package edu.uci.ics.amber.engine.architecture.worker import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{FinalizeExecutor, FinalizePort} -import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.{OutputTupleIterator, InputTupleIterator} +import edu.uci.ics.amber.engine.architecture.worker.TupleProcessingManager.{ + OutputTupleIterator, + InputTupleIterator +} import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.engine.common.workflow.PortIdentity