Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For Loop #2345

Draft
wants to merge 35 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
988c5f2
init
aglinxinyuan Feb 2, 2024
e4d4c1a
update
aglinxinyuan Feb 3, 2024
f833d98
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 5, 2024
247586f
update
aglinxinyuan Feb 5, 2024
54bef8c
update
aglinxinyuan Feb 5, 2024
6fdcf6b
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 13, 2024
c561279
update
aglinxinyuan Feb 13, 2024
7fb131b
update
aglinxinyuan Feb 13, 2024
b45bdf9
update
aglinxinyuan Feb 13, 2024
e4c76ee
update
aglinxinyuan Feb 13, 2024
a40e619
update
aglinxinyuan Feb 13, 2024
a0af488
fix format
aglinxinyuan Feb 14, 2024
b137edb
fix format
aglinxinyuan Feb 14, 2024
869f049
update
aglinxinyuan Feb 14, 2024
d88b2da
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 16, 2024
4bb5e1e
fix conflict
aglinxinyuan Feb 16, 2024
46147ea
fix conflict
aglinxinyuan Feb 16, 2024
47a859d
fix conflict
aglinxinyuan Feb 16, 2024
03ca2f6
fix conflict
aglinxinyuan Feb 16, 2024
7ebbd40
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 20, 2024
e0c392b
fix conflict
aglinxinyuan Feb 21, 2024
34e8c51
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 21, 2024
d7936fd
update
aglinxinyuan Feb 22, 2024
127e052
Merge branch 'master' into xinyuan-for-loop
aglinxinyuan Feb 23, 2024
c5145c6
update
aglinxinyuan Feb 23, 2024
8ac537b
update
aglinxinyuan Feb 28, 2024
de07aeb
update
aglinxinyuan Feb 28, 2024
a692923
fix it
aglinxinyuan Feb 29, 2024
478f6b4
update
aglinxinyuan Feb 29, 2024
498f46f
update
aglinxinyuan Feb 29, 2024
49e27bc
Remove Iteration at loop end
aglinxinyuan Mar 5, 2024
656be0e
fix
aglinxinyuan Mar 12, 2024
f36bcb4
fix
aglinxinyuan Mar 12, 2024
82abb24
if statement
aglinxinyuan Jun 16, 2024
fc70e5f
if statement
aglinxinyuan Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ message WorkerStatistics {
int64 data_processing_time = 4;
int64 control_processing_time = 5;
int64 idle_time = 6;
int64 loop_i = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ message OperatorRuntimeStats{
int64 data_processing_time = 5;
int64 control_processing_time = 6;
int64 idle_time = 7;
int64 loop_i = 8;
}

message ExecutionStatsStore {
Expand Down
20 changes: 14 additions & 6 deletions core/amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def output_schema(self) -> Schema:
@output_schema.setter
@overrides.final
def output_schema(
self, raw_output_schema: Union[Schema, Mapping[str, str]]
self, raw_output_schema: Union[Schema, Mapping[str, str]]
) -> None:
self.__internal_output_schema = (
raw_output_schema
Expand Down Expand Up @@ -147,8 +147,8 @@ def _validate_batch_size(value):
def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
self.__batch_data[port].append(tuple_)
if (
self.BATCH_SIZE is not None
and len(self.__batch_data[port]) >= self.BATCH_SIZE
self.BATCH_SIZE is not None
and len(self.__batch_data[port]) >= self.BATCH_SIZE
):
yield from self._process_batch(port)

Expand Down Expand Up @@ -202,15 +202,23 @@ def __init__(self):
super().__init__()
self.__internal_is_source: bool = False
self.__table_data: Mapping[int, List[Tuple]] = defaultdict(list)
self.__it_table_data: Mapping[int, Mapping[int, List[Tuple]]] = defaultdict(lambda: defaultdict(list))
self.__internal_is_it: bool = False

@overrides.final
def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
if "Iteration" in tuple_:
self.__internal_is_it = True
self.__it_table_data[port][tuple_["Iteration"]].append(tuple_)
self.__table_data[port].append(tuple_)
yield

def on_finish(self, port: int) -> Iterator[Optional[TableLike]]:
table = Table(self.__table_data[port])
yield from self.process_table(table, port)
if self.__internal_is_it:
for table in self.__it_table_data[port].values():
yield from self.process_table(Table(table), port)
else:
yield from self.process_table(Table(self.__table_data[port]), port)

@abstractmethod
def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
Expand All @@ -235,7 +243,7 @@ class TupleOperator(Operator):

@abstractmethod
def process_tuple(
self, tuple_: Union[Tuple, InputExhausted], input_: int
self, tuple_: Union[Tuple, InputExhausted], input_: int
) -> Iterator[Optional[TupleLike]]:
"""
Process an input Tuple from the given link.
Expand Down
1 change: 1 addition & 0 deletions core/amber/src/main/python/pytexera/udf/udf_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import abstractmethod
from typing import Iterator, Optional, Union
from deprecated import deprecated

from pyamber import *


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ControllerAsyncRPCHandlerInitializer(
with AmberLogging
with LinkWorkersHandler
with WorkerExecutionCompletedHandler
with ForLoopHandler
with WorkerStateUpdatedHandler
with PauseHandler
with QueryWorkerStatisticsHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package edu.uci.ics.amber.engine.architecture.controller.promisehandlers

import edu.uci.ics.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.ForLoopHandler.IterationCompleted
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeLoopHandler.ResumeLoop
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity

/** Companion holding the control command handled by the `ForLoopHandler` trait. */
object ForLoopHandler {

  /**
    * Control command addressed to the controller when a loop iteration is over.
    *
    * @param startWorkerId identity of the worker that receives the follow-up `ResumeLoop`
    * @param endWorkerId   identity passed along unchanged inside that `ResumeLoop`
    */
  final case class IterationCompleted(
      startWorkerId: ActorVirtualIdentity,
      endWorkerId: ActorVirtualIdentity
  ) extends ControlCommand[Unit]
}

/**
  * Controller-side handler for `IterationCompleted`: each such command is
  * answered by sending a `ResumeLoop` to the worker named in `startWorkerId`.
  */
trait ForLoopHandler {
  this: ControllerAsyncRPCHandlerInitializer =>

  registerHandler { (msg: IterationCompleted, _) =>
    val loopStartWorker = msg.startWorkerId
    val loopEndWorker = msg.endWorkerId
    // The resume command carries both identities but is addressed to the start worker only.
    send(ResumeLoop(loopStartWorker, loopEndWorker), loopStartWorker)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class WorkerExecution() extends Serializable {
private var state: WorkerState = UNINITIALIZED
// TODO: move stats onto ports, and make this as an aggregation func.
// TODO: separate state from stats
private var stats: WorkerStatistics = WorkerStatistics(UNINITIALIZED, 0, 0, 0, 0, 0)
private var stats: WorkerStatistics = WorkerStatistics(UNINITIALIZED, 0, 0, 0, 0, 0, 0)

def getState: WorkerState = state

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
}

def getPortId: PortIdentity = {
this.portId.get
if(this.portId.isEmpty){
PortIdentity()
}else{
this.portId.get
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,22 @@ package edu.uci.ics.amber.engine.architecture.scheduling
import com.twitter.util.Future
import edu.uci.ics.amber.engine.architecture.common.AkkaActorService
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
import edu.uci.ics.amber.engine.architecture.controller.ControllerEvent.{
WorkerAssignmentUpdate,
WorkflowStatsUpdate
}
import edu.uci.ics.amber.engine.architecture.controller.execution.{
OperatorExecution,
WorkflowExecution
}
import edu.uci.ics.amber.engine.architecture.controller.ControllerEvent.{WorkerAssignmentUpdate, WorkflowStatsUpdate}
import edu.uci.ics.amber.engine.architecture.controller.execution.{OperatorExecution, WorkflowExecution}
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.FatalErrorHandler.FatalError
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.LinkWorkersHandler.LinkWorkers
import edu.uci.ics.amber.engine.architecture.deploysemantics.PhysicalOp
import edu.uci.ics.amber.engine.architecture.pythonworker.promisehandlers.InitializeOperatorLogicHandler.InitializeOperatorLogic
import edu.uci.ics.amber.engine.architecture.scheduling.config.OperatorConfig
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHandler.AssignPort
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.OpenOperatorHandler.OpenOperator
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeLoopHandler
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.StartHandler.StartWorker
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.engine.common.workflow.PhysicalLink
import edu.uci.ics.texera.web.workflowruntimestate.WorkflowAggregatedState

import scala.collection.mutable
import scala.collection.Seq
class RegionExecutionController(
region: Region,
Expand Down Expand Up @@ -169,8 +164,9 @@ class RegionExecutionController(
}

private def connectChannels(links: Set[PhysicalLink]): Future[Seq[Unit]] = {
val additionalLinks = mutable.HashSet[PhysicalLink]()
Future.collect(
links.map { link: PhysicalLink => asyncRPCClient.send(LinkWorkers(link), CONTROLLER) }.toSeq
(additionalLinks ++ links).map { link: PhysicalLink => asyncRPCClient.send(LinkWorkers(link), CONTROLLER) }.toSeq
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edu.uci.ics.amber.engine.architecture.worker
import com.softwaremill.macwire.wire
import edu.uci.ics.amber.engine.architecture.common.AmberProcessor
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.ConsoleMessageHandler.ConsoleMessageTriggered
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.ForLoopHandler.IterationCompleted
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.PortCompletedHandler.PortCompleted
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.WorkerExecutionCompletedHandler.WorkerExecutionCompleted
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.WorkerStateUpdatedHandler.WorkerStateUpdated
Expand All @@ -16,8 +17,10 @@ import edu.uci.ics.amber.engine.architecture.messaginglayer.{OutputManager, Work
import edu.uci.ics.amber.engine.architecture.scheduling.config.OperatorConfig
import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{
DPOutputIterator,
EndOfIteration,
FinalizeOperator,
FinalizePort
FinalizePort,
StartOfIteration
}
import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.PauseHandler.PauseWorker
Expand Down Expand Up @@ -46,6 +49,7 @@ import edu.uci.ics.amber.engine.common.virtualidentity.{
import edu.uci.ics.amber.engine.common.workflow.PortIdentity
import edu.uci.ics.amber.engine.common.{IOperatorExecutor, InputExhausted, VirtualIdentityUtils}
import edu.uci.ics.amber.error.ErrorUtils.{mkConsoleMessage, safely}
import edu.uci.ics.texera.workflow.operators.loop.{LoopEndOpExec, LoopStartOpExec}

import scala.collection.mutable

Expand All @@ -63,6 +67,10 @@ object DataProcessor {
case class FinalizePort(portId: PortIdentity, input: Boolean) extends SpecialDataTuple
case class FinalizeOperator() extends SpecialDataTuple

case class StartOfIteration(workerId: ActorVirtualIdentity) extends SpecialDataTuple
case class EndOfIteration(startWorkerId: ActorVirtualIdentity, endWorkerId: ActorVirtualIdentity)
extends SpecialDataTuple

class DPOutputIterator extends Iterator[(ITuple, Option[PortIdentity])] {
val queue = new mutable.Queue[(ITuple, Option[PortIdentity])]
@transient var outputIter: Iterator[(ITuple, Option[PortIdentity])] = Iterator.empty
Expand Down Expand Up @@ -118,7 +126,6 @@ class DataProcessor(
}
this.operatorConfig = operatorConfig
this.physicalOp = physicalOp

this.outputIterator.setTupleOutput(currentOutputIterator)
}

Expand Down Expand Up @@ -152,6 +159,7 @@ class DataProcessor(
*
* @return (input tuple count, output tuple count)
*/

def collectStatistics(): WorkerStatistics =
statisticsManager.getStatistics(stateManager.getCurrentState, operator)

Expand All @@ -170,7 +178,7 @@ class DataProcessor(
asyncRPCClient
)
)
if (tuple.isLeft) {
if (tuple.isLeft && !tuple.left.get.isInstanceOf[StartOfIteration]) {
statisticsManager.increaseInputTupleCount()
}
} catch safely {
Expand Down Expand Up @@ -204,6 +212,8 @@ class DataProcessor(
if (outputTuple == null) return

outputTuple match {
case EndOfIteration(startWorkerId, endWorkerId) =>
asyncRPCClient.send(IterationCompleted(startWorkerId, endWorkerId), CONTROLLER)
case FinalizeOperator() =>
outputManager.emitEndOfUpstream()
// Send Completed signal to worker actor.
Expand All @@ -219,7 +229,9 @@ class DataProcessor(
case FinalizePort(portId, input) =>
asyncRPCClient.send(PortCompleted(portId, input), CONTROLLER)
case _ =>
statisticsManager.increaseOutputTupleCount()
if (!outputTuple.isInstanceOf[StartOfIteration]) {
statisticsManager.increaseOutputTupleCount()
}
val outLinks = physicalOp.getOutputLinks(outputPortOpt)
outLinks.foreach(link => outputManager.passTupleToDownstream(outputTuple, link))
}
Expand Down Expand Up @@ -293,7 +305,9 @@ class DataProcessor(
.foreach(outputPortId =>
outputIterator.appendSpecialTupleToEnd(FinalizePort(outputPortId, input = false))
)
outputIterator.appendSpecialTupleToEnd(FinalizeOperator())
if (!operator.isInstanceOf[LoopStartOpExec]) {
outputIterator.appendSpecialTupleToEnd(FinalizeOperator())
}
}
}
statisticsManager.increaseDataProcessingTime(System.nanoTime() - dataProcessingStartTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with QueryCurrentInputTupleHandler
with QueryStatisticsHandler
with ResumeHandler
with ResumeLoopHandler
with StartHandler
with AssignPortHandler
with AddInputChannelHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class StatisticsManager {
displayOut,
dataProcessingTime,
controlProcessingTime,
totalExecutionTime - dataProcessingTime - controlProcessingTime
totalExecutionTime - dataProcessingTime - controlProcessingTime,
0
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package edu.uci.ics.amber.engine.architecture.worker.promisehandlers

import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{EndOfIteration, StartOfIteration}
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeLoopHandler.{ResumeLoop, loopToSelfChannelId}
import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
import edu.uci.ics.amber.engine.common.ambermessage.{DataFrame, EndOfUpstream}
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.engine.common.workflow.{PhysicalLink, PortIdentity}
import edu.uci.ics.texera.workflow.operators.loop.LoopStartOpExec

/** Companion holding the `ResumeLoop` command and the synthetic "loopSelf" identities. */
object ResumeLoopHandler {

  /**
    * Control command asking a worker to resume its loop.
    *
    * @param startWorkerId identity supplied by the sender as the loop-start worker
    * @param endWorkerId   identity supplied by the sender as the loop-end worker
    */
  final case class ResumeLoop(
      startWorkerId: ActorVirtualIdentity,
      endWorkerId: ActorVirtualIdentity
  ) extends ControlCommand[Unit]

  // Placeholder physical-operator identity named "loopSelf".
  val loopSelfOp: PhysicalOpIdentity =
    PhysicalOpIdentity(OperatorIdentity("loopSelf"), "loopSelf")

  // Placeholder actor identity used as both endpoints of the self channel below.
  val loopSelf: ActorVirtualIdentity = ActorVirtualIdentity("loopSelf")

  // Data (non-control) channel from `loopSelf` to `loopSelf`.
  val loopToSelfChannelId: ChannelIdentity =
    ChannelIdentity(loopSelf, loopSelf, isControl = false)
}

/**
  * Worker-side handler for `ResumeLoop`: it injects a single `StartOfIteration`
  * marker into the worker's own data processor through the loop-to-self channel.
  */
trait ResumeLoopHandler {
  this: DataProcessorRPCHandlerInitializer =>

  registerHandler { (msg: ResumeLoop, _) =>
    // The payload of `msg` is not consulted; the marker is tagged with this worker's own id.
    val iterationStartFrame = DataFrame(Array(StartOfIteration(dp.actorId)))

    // TODO: the disabled draft cast dp.operator to LoopStartOpExec (`ls`) and, while
    // ls.iteration < ls.termination, replayed ls.buffer followed by
    // EndOfIteration(dp.actorId); otherwise it sent EndOfUpstream() on the same
    // channel. It also sketched a PhysicalLink from msg.startWorkerId to
    // ResumeLoopHandler.loopSelfOp. Decide whether to restore or drop that logic.
    dp.processDataPayload(loopToSelfChannelId, iterationStartFrame)
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.uci.ics.amber.engine.common

import edu.uci.ics.amber.engine.architecture.worker.DataProcessor.{EndOfIteration, StartOfIteration}
import edu.uci.ics.amber.engine.architecture.worker.PauseManager
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.amber.engine.common.tuple.ITuple
Expand All @@ -13,6 +14,12 @@ trait ISinkOperatorExecutor extends IOperatorExecutor {
pauseManager: PauseManager,
asyncRPCClient: AsyncRPCClient
): Iterator[(ITuple, Option[PortIdentity])] = {
if (tuple.isLeft && tuple.left.get.isInstanceOf[StartOfIteration]) {
return Iterator.empty
}
if (tuple.isLeft && tuple.left.get.isInstanceOf[EndOfIteration]) {
return Iterator.empty
}
consume(tuple, input)
Iterator.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ case class OperatorStatistics(
numWorkers: Long,
aggregatedDataProcessingTime: Long,
aggregatedControlProcessingTime: Long,
aggregatedIdleTime: Long
aggregatedIdleTime: Long,
loopI: Long
)

case class OperatorStatisticsUpdateEvent(operatorStatistics: Map[String, OperatorStatistics])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class ExecutionStatsService(
stats.numWorkers,
stats.dataProcessingTime,
stats.controlProcessingTime,
stats.idleTime
stats.idleTime,
stats.loopI
)
(x._1, res)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object OperatorGroupConstants {
final val UTILITY_GROUP = "Utilities"
final val UDF_GROUP = "User-defined Functions"
final val VISUALIZATION_GROUP = "Visualization"
final val CONTROL_GROUP = "Control"

/**
* The order of the groups to show up in the frontend operator panel.
Expand All @@ -20,7 +21,8 @@ object OperatorGroupConstants {
GroupInfo(JOIN_GROUP, 3),
GroupInfo(UTILITY_GROUP, 4),
GroupInfo(UDF_GROUP, 5),
GroupInfo(VISUALIZATION_GROUP, 6)
GroupInfo(VISUALIZATION_GROUP, 6),
GroupInfo(CONTROL_GROUP, 7)
)

}
Loading