diff --git a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto index 5df9e7ab47..d9819affa2 100644 --- a/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto +++ b/core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto @@ -227,6 +227,7 @@ message AssignPortRequest { core.PortIdentity portId = 1 [(scalapb.field).no_box = true]; bool input = 2; map schema = 3; + string storageUri = 4; } message FinalizeCheckpointRequest { diff --git a/core/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py b/core/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py index 05b412d983..5691434a21 100644 --- a/core/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py +++ b/core/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py @@ -14,7 +14,10 @@ async def assign_port(self, req: AssignPortRequest) -> EmptyReturn: req.port_id, Schema(raw_schema=req.schema) ) else: + storage_uri = None + if req.storage_uri != "": + storage_uri = req.storage_uri self.context.output_manager.add_output_port( - req.port_id, Schema(raw_schema=req.schema) + req.port_id, Schema(raw_schema=req.schema), storage_uri ) return EmptyReturn() diff --git a/core/amber/src/main/python/core/architecture/packaging/output_manager.py b/core/amber/src/main/python/core/architecture/packaging/output_manager.py index b4ae3263ae..a28f93974b 100644 --- a/core/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/core/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -1,9 +1,13 @@ +import threading import typing from collections import OrderedDict from itertools import chain +from queue import Queue +from threading import Thread +from typing import Iterable, Iterator + 
from loguru import logger from pyarrow import Table -from typing import Iterable, Iterator from core.architecture.packaging.input_manager import WorkerPort, Channel from core.architecture.sendsemantics.broad_cast_partitioner import ( @@ -23,7 +27,19 @@ from core.models import Tuple, Schema, MarkerFrame from core.models.marker import Marker from core.models.payload import DataPayload, DataFrame +from core.storage.document_factory import DocumentFactory +from core.storage.runnables.port_storage_writer import ( + PortStorageWriter, + PortStorageWriterElement, +) from core.util import get_one_of +from core.util.virtual_identity import get_worker_index +from proto.edu.uci.ics.amber.core import ( + ActorVirtualIdentity, + PhysicalLink, + PortIdentity, + ChannelIdentity, +) from proto.edu.uci.ics.amber.engine.architecture.rpc import ChannelMarkerPayload from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import ( HashBasedShufflePartitioning, @@ -33,12 +49,6 @@ RangeBasedShufflePartitioning, BroadcastPartitioning, ) -from proto.edu.uci.ics.amber.core import ( - ActorVirtualIdentity, - PhysicalLink, - PortIdentity, - ChannelIdentity, -) class OutputManager: @@ -56,23 +66,89 @@ def __init__(self, worker_id: str): } self._ports: typing.Dict[PortIdentity, WorkerPort] = dict() self._channels: typing.Dict[ChannelIdentity, Channel] = dict() + self._port_storage_writers: typing.Dict[ + PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] + ] = dict() - def add_output_port(self, port_id: PortIdentity, schema: Schema) -> None: + def add_output_port( + self, + port_id: PortIdentity, + schema: Schema, + storage_uri: typing.Optional[str] = None, + ) -> None: if port_id.id is None: port_id.id = 0 if port_id.internal is None: port_id.internal = False + if storage_uri is not None: + self.set_up_port_storage_writer(port_id, storage_uri) + # each port can only be added and initialized once. 
if port_id not in self._ports: self._ports[port_id] = WorkerPort(schema) + def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str): + """ + Create a separate thread for saving output tuples of a port + to storage in batch. + """ + document, _ = DocumentFactory.open_document(storage_uri) + buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) + writer_queue = Queue() + port_storage_writer = PortStorageWriter( + buffered_item_writer=buffered_item_writer, queue=writer_queue + ) + writer_thread = threading.Thread( + target=port_storage_writer.run, + daemon=True, + name=f"port_storage_writer_thread_{port_id}", + ) + writer_thread.start() + self._port_storage_writers[port_id] = ( + writer_queue, + port_storage_writer, + writer_thread, + ) + def get_port(self, port_id=None) -> WorkerPort: return list(self._ports.values())[0] def get_output_channel_ids(self): return self._channels.keys() + def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None: + """ + Optionally write the tuple to storage if the specified output port + is determined by the scheduler to need storage. This method is not blocking + because a separate thread is used to flush the tuple to storage in batch. + :param tuple_: A tuple produced by the data processor. + :param port_id: If not specified, the tuple will be written to all + output ports that need storage. + :return: + """ + if port_id is None: + for writer_queue, _, _ in self._port_storage_writers.values(): + writer_queue.put(PortStorageWriterElement(data_tuple=tuple_)) + elif port_id in self._port_storage_writers.keys(): + self._port_storage_writers[port_id][0].put( + PortStorageWriterElement(data_tuple=tuple_) + ) + + def close_port_storage_writers(self) -> None: + """ + Flush the buffers of port storage writers and wait for all the + writer threads to finish, which indicates the port storage writing + is finished. 
+ """ + for _, writer, _ in self._port_storage_writers.values(): + # This non-blocking stop call will let the storage writers + # flush the remaining buffer + writer.stop() + for _, _, writer_thread in self._port_storage_writers.values(): + # This blocking call will wait for all the writer to finish commit + writer_thread.join() + def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None: """ Add down stream operator and its transfer policy diff --git a/core/amber/src/main/python/core/runnables/main_loop.py b/core/amber/src/main/python/core/runnables/main_loop.py index f51da6b3e5..51c8c5c586 100644 --- a/core/amber/src/main/python/core/runnables/main_loop.py +++ b/core/amber/src/main/python/core/runnables/main_loop.py @@ -189,6 +189,9 @@ def process_input_tuple(self) -> None: payload=batch, ) ) + self.context.output_manager.save_tuple_to_storage_if_needed( + output_tuple + ) def process_input_state(self) -> None: self._switch_context() @@ -288,6 +291,8 @@ def _process_end_of_output_ports(self, _: EndOfOutputPorts) -> None: :param _: EndOfOutputPorts """ + self.context.output_manager.close_port_storage_writers() + for to, batch in self.context.output_manager.emit_marker(EndOfInputChannel()): self._output_queue.put( DataElement( diff --git a/core/amber/src/main/python/core/storage/document_factory.py b/core/amber/src/main/python/core/storage/document_factory.py index 9c0c9e058d..d6687906bb 100644 --- a/core/amber/src/main/python/core/storage/document_factory.py +++ b/core/amber/src/main/python/core/storage/document_factory.py @@ -1,3 +1,4 @@ +import typing from urllib.parse import urlparse from typing import Optional @@ -65,7 +66,7 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument: ) @staticmethod - def open_document(uri: str) -> (VirtualDocument, Optional[Schema]): + def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]: parsed_uri = urlparse(uri) if parsed_uri.scheme == "vfs": _, _, _, _, _, 
resource_type = VFSURIFactory.decode_uri(uri) diff --git a/core/amber/src/main/python/core/storage/runnables/__init__.py b/core/amber/src/main/python/core/storage/runnables/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/amber/src/main/python/core/storage/runnables/port_storage_writer.py b/core/amber/src/main/python/core/storage/runnables/port_storage_writer.py new file mode 100644 index 0000000000..e5edf24129 --- /dev/null +++ b/core/amber/src/main/python/core/storage/runnables/port_storage_writer.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass + +from overrides import overrides + +from core.models import Tuple +from core.storage.model.buffered_item_writer import BufferedItemWriter +from core.util import StoppableQueueBlockingRunnable, IQueue +from core.util.customized_queue.queue_base import QueueElement + + +@dataclass +class PortStorageWriterElement(QueueElement): + data_tuple: Tuple + + +class PortStorageWriter(StoppableQueueBlockingRunnable): + def __init__(self, buffered_item_writer: BufferedItemWriter, queue: IQueue): + super().__init__(name=self.__class__.__name__, queue=queue) + self.buffered_item_writer: BufferedItemWriter = buffered_item_writer + + @overrides + def receive(self, next_entry: QueueElement) -> None: + if isinstance(next_entry, PortStorageWriterElement): + self.buffered_item_writer.put_one(next_entry.data_tuple) + else: + raise TypeError(f"Unexpected entry {next_entry}") + + @overrides + def pre_start(self) -> None: + self.buffered_item_writer.open() + + @overrides + def post_stop(self) -> None: + self.buffered_item_writer.close() diff --git a/core/amber/src/main/python/core/storage/storage_config.py b/core/amber/src/main/python/core/storage/storage_config.py index 4e81072a8e..89375041b9 100644 --- a/core/amber/src/main/python/core/storage/storage_config.py +++ b/core/amber/src/main/python/core/storage/storage_config.py @@ -34,7 +34,7 @@ def initialize( cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = 
postgres_password cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path - cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = commit_batch_size + cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size) cls._initialized = True def __new__(cls, *args, **kwargs): diff --git a/core/amber/src/main/python/core/util/virtual_identity/__init__.py b/core/amber/src/main/python/core/util/virtual_identity/__init__.py new file mode 100644 index 0000000000..22132bd5d3 --- /dev/null +++ b/core/amber/src/main/python/core/util/virtual_identity/__init__.py @@ -0,0 +1,10 @@ +import re + +worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)") + + +def get_worker_index(worker_id: str) -> int: + match = worker_name_pattern.match(worker_id) + if match: + return int(match.group(2)) + raise ValueError("Invalid worker ID format") diff --git a/core/amber/src/main/python/proto/edu/uci/ics/amber/core/__init__.py b/core/amber/src/main/python/proto/edu/uci/ics/amber/core/__init__.py index 31c15ddbc9..051eecbea1 100644 --- a/core/amber/src/main/python/proto/edu/uci/ics/amber/core/__init__.py +++ b/core/amber/src/main/python/proto/edu/uci/ics/amber/core/__init__.py @@ -74,6 +74,13 @@ class PortIdentity(betterproto.Message): internal: bool = betterproto.bool_field(2) +@dataclass(eq=False, repr=False) +class GlobalPortIdentity(betterproto.Message): + op_id: "PhysicalOpIdentity" = betterproto.message_field(1) + port_id: "PortIdentity" = betterproto.message_field(2) + input: bool = betterproto.bool_field(3) + + @dataclass(eq=False, repr=False) class InputPort(betterproto.Message): id: "PortIdentity" = betterproto.message_field(1) diff --git a/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py b/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py index 9320b54e36..8f91bc7882 100644 --- 
a/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py +++ b/core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/__init__.py @@ -344,6 +344,7 @@ class AssignPortRequest(betterproto.Message): schema: Dict[str, str] = betterproto.map_field( 3, betterproto.TYPE_STRING, betterproto.TYPE_STRING ) + storage_uri: str = betterproto.string_field(4) @dataclass(eq=False, repr=False) diff --git a/core/amber/src/main/python/texera_run_python_worker.py b/core/amber/src/main/python/texera_run_python_worker.py index eac23c8d7b..b975dafe58 100644 --- a/core/amber/src/main/python/texera_run_python_worker.py +++ b/core/amber/src/main/python/texera_run_python_worker.py @@ -3,6 +3,7 @@ from loguru import logger from core.python_worker import PythonWorker +from core.storage.storage_config import StorageConfig def init_loguru_logger(stream_log_level) -> None: @@ -22,8 +23,28 @@ def init_loguru_logger(stream_log_level) -> None: if __name__ == "__main__": - _, worker_id, output_port, logger_level, r_path = sys.argv + ( + _, + worker_id, + output_port, + logger_level, + r_path, + iceberg_postgres_catalog_uri_without_scheme, + iceberg_postgres_catalog_username, + iceberg_postgres_catalog_password, + iceberg_table_namespace, + iceberg_file_storage_directory_path, + iceberg_table_commit_batch_size, + ) = sys.argv init_loguru_logger(logger_level) + StorageConfig.initialize( + iceberg_postgres_catalog_uri_without_scheme, + iceberg_postgres_catalog_username, + iceberg_postgres_catalog_password, + iceberg_table_namespace, + iceberg_file_storage_directory_path, + iceberg_table_commit_batch_size, + ) # Setting R_HOME environment variable for R-UDF usage if r_path: diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index 988a93d0a8..b2aa7aa90c 
100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -2,6 +2,7 @@ package edu.uci.ics.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import edu.uci.ics.amber.core.WorkflowRuntimeException +import edu.uci.ics.amber.core.workflow.GlobalPortIdentity import edu.uci.ics.amber.engine.architecture.controller.{ ControllerAsyncRPCHandlerInitializer, FatalError @@ -12,7 +13,6 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ QueryStatisticsRequest } import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import edu.uci.ics.amber.engine.architecture.scheduling.GlobalPortIdentity import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER import edu.uci.ics.amber.util.VirtualIdentityUtils diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala index e6ce0c8423..50af5c780a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -1,13 +1,11 @@ package edu.uci.ics.amber.engine.architecture.messaginglayer import edu.uci.ics.amber.core.marker.Marker -import edu.uci.ics.amber.core.tuple.{ - FinalizeExecutor, - FinalizePort, - Schema, - SchemaEnforceable, - TupleLike -} +import edu.uci.ics.amber.core.storage.DocumentFactory +import edu.uci.ics.amber.core.storage.model.BufferedItemWriter +import edu.uci.ics.amber.core.tuple._ +import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} +import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} 
import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{ DPOutputIterator, getBatchSize, @@ -15,10 +13,14 @@ import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{ } import edu.uci.ics.amber.engine.architecture.sendsemantics.partitioners._ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._ +import edu.uci.ics.amber.engine.architecture.worker.managers.{ + OutputPortResultWriterThread, + PortStorageWriterTerminateSignal +} import edu.uci.ics.amber.engine.common.AmberLogging -import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} -import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} +import edu.uci.ics.amber.util.VirtualIdentityUtils +import java.net.URI import scala.collection.mutable object OutputManager { @@ -99,6 +101,10 @@ class OutputManager( private val networkOutputBuffers = mutable.HashMap[(PhysicalLink, ActorVirtualIdentity), NetworkOutputBuffer]() + private val outputPortResultWriterThreads + : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = + mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -121,12 +127,12 @@ class OutputManager( * Push one tuple to the downstream, will be batched by each transfer partitioning. * Should ONLY be called by DataProcessor. * - * @param tupleLike TupleLike to be passed. + * @param tuple TupleLike to be passed. * @param outputPortId Optionally specifies the output port from which the tuple should be emitted. * If None, the tuple is broadcast to all output ports. 
*/ def passTupleToDownstream( - tupleLike: SchemaEnforceable, + tuple: Tuple, outputPortId: Option[PortIdentity] = None ): Unit = { (outputPortId match { @@ -134,8 +140,6 @@ class OutputManager( case None => partitioners // send to all ports }).foreach { case (link, partitioner) => - // Enforce schema based on the port's schema - val tuple = tupleLike.enforceSchema(getPort(link.fromPortId).schema) partitioner.getBucketIndex(tuple).foreach { bucketIndex => networkOutputBuffers((link, partitioner.allReceivers(bucketIndex))).addTuple(tuple) } @@ -170,13 +174,56 @@ class OutputManager( networkOutputBuffers.foreach(kv => kv._2.sendMarker(marker)) } - def addPort(portId: PortIdentity, schema: Schema): Unit = { + def addPort(portId: PortIdentity, schema: Schema, storageURIOption: Option[URI]): Unit = { // each port can only be added and initialized once. if (this.ports.contains(portId)) { return } this.ports(portId) = WorkerPort(schema) + // if a storage URI is provided, set up a storage writer thread + storageURIOption match { + case Some(storageUri) => setupOutputStorageWriterThread(portId, storageUri) + case None => // No need to add a writer + } + } + + /** + * Optionally write the tuple to storage if the specified output port is determined by the scheduler to need storage. + * This method is not blocking because a separate thread is used to flush the tuple to storage in batch. + * + * @param tuple TupleLike to be written to storage. + * @param outputPortId If not specified, the tuple will be written to all output ports that need storage. 
+ */ + def saveTupleToStorageIfNeeded( + tuple: Tuple, + outputPortId: Option[PortIdentity] = None + ): Unit = { + (outputPortId match { + case Some(portId) => + this.outputPortResultWriterThreads.get(portId) match { + case Some(_) => this.outputPortResultWriterThreads.filter(_._1 == portId) + case None => Map.empty + } + case None => this.outputPortResultWriterThreads + }).foreach({ + case (portId, writerThread) => + // write to storage in a separate thread + writerThread.queue.put(Left(tuple)) + }) + } + + /** + * Signal the port storage writers to flush the remaining buffer and wait for commits to finish so that + * the output ports are properly completed. + */ + def closeOutputStorageWriters(): Unit = { + // Non-blocking calls + this.outputPortResultWriterThreads.values.foreach(writerThread => + writerThread.queue.put(Right(PortStorageWriterTerminateSignal)) + ) + // Blocking calls + this.outputPortResultWriterThreads.values.foreach(writerThread => writerThread.join()) } def getPort(portId: PortIdentity): WorkerPort = ports(portId) @@ -196,4 +243,15 @@ class OutputManager( ports.head._1 } + private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { + val bufferedItemWriter = DocumentFactory + .openDocument(storageUri) + ._1 + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) + this.outputPortResultWriterThreads(portId) = writerThread + writerThread.start() + } + } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index c3abd65285..0ab6402637 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -4,6 +4,7 @@ import akka.actor.Props import com.twitter.util.Promise import com.typesafe.config.{Config, ConfigFactory} import edu.uci.ics.amber.core.virtualidentity.ChannelIdentity +import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.engine.architecture.common.WorkflowActor import edu.uci.ics.amber.engine.architecture.common.WorkflowActor.NetworkAck import edu.uci.ics.amber.engine.architecture.messaginglayer.{ @@ -159,7 +160,13 @@ class PythonWorkflowWorker( workerConfig.workerId.name, Integer.toString(pythonProxyServer.getPortNumber.get()), config.getString("python.log.streamHandler.level"), - RENVPath + RENVPath, + StorageConfig.icebergPostgresCatalogUriWithoutScheme, + StorageConfig.icebergPostgresCatalogUsername, + StorageConfig.icebergPostgresCatalogPassword, + StorageConfig.icebergTableResultNamespace, + StorageConfig.fileStorageDirectoryPath.toString, + StorageConfig.icebergTableCommitBatchSize.toString ) ).run(BasicIO.standard(false)) } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index eb848c1d8f..da3b0a16b3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -1,9 +1,20 @@ package edu.uci.ics.amber.engine.architecture.scheduling -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} -import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging} +import edu.uci.ics.amber.core.storage.VFSURIFactory.createResultURI import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, PhysicalOpIdentity} -import 
edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.amber.core.workflow.{ + GlobalPortIdentity, + PhysicalLink, + PhysicalPlan, + WorkflowContext +} +import edu.uci.ics.amber.engine.architecture.scheduling.config.{ + LinkConfig, + OperatorConfig, + PortConfig, + ResourceConfig +} +import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging} import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph} @@ -83,16 +94,37 @@ class CostBasedScheduleGenerator( }) .filter(link => operatorIds.contains(link.fromOpId)) val operators = operatorIds.map(operatorId => physicalPlan.getOperator(operatorId)) - val materializedPortIds: Set[GlobalPortIdentity] = matEdges.flatMap(link => - List( - GlobalPortIdentity(link.fromOpId, link.fromPortId, input = false) + val portIdsToViewResult: Set[GlobalPortIdentity] = + workflowContext.workflowSettings.outputPortsNeedingStorage + .filter(outputPort => operatorIds.contains(outputPort.opId)) + val portIdsNeedingStorage: Set[GlobalPortIdentity] = matEdges + .diff(physicalPlan.getDependeeLinks) + .filter(matLink => operatorIds.contains(matLink.fromOpId)) + .flatMap(link => + List( + GlobalPortIdentity(link.fromOpId, link.fromPortId) + ) + ) ++ portIdsToViewResult + val portConfigs = portIdsNeedingStorage + .map(outputPortId => + outputPortId -> { + val uri = createResultURI( + workflowId = workflowContext.workflowId, + executionId = workflowContext.executionId, + operatorId = outputPortId.opId.logicalOpId, + layerName = Some(outputPortId.opId.layerName), + portIdentity = outputPortId.portId + ) + PortConfig(storageURI = uri) + } ) - ) + .toMap + val resourceConfig = ResourceConfig(portConfigs = portConfigs) Region( id = RegionIdentity(idx), physicalOps = operators, physicalLinks = links, - materializedPortIds = materializedPortIds + resourceConfig = Some(resourceConfig) ) } } @@ -181,10 +213,24 @@ class CostBasedScheduleGenerator( ) ) } - // Since the plan 
is now schedulable, calling the search directly returns a region DAG. + // Calling the search again to include cache read ops in the regions. + // This new search is only needed because of additional cache read operators which alters the physical plan. + // However, as the new physical plan is already schedulable, the original materialized ports of each region will not + // be included in the new region DAG. + // TODO: remove this step after cache read is removed. val regionDAG = bottomUpSearch().regionDAG + addMaterializationsAsRegionLinks(linksToMaterialize, regionDAG) populateDependeeLinks(regionDAG) + // The materialized ports of each region are already decided by the first RPG search + // However they will be lost after the second search as a consequence of the modified physical plan. + // The second search is only needed because of additional cache read operators. + // Need to add the original materialized ports back. This will not be needed after removal of cache read ops. + // TODO: remove this step after cache read is removed. + val outputPortsToMaterialize = linksToMaterialize.map(link => + GlobalPortIdentity(opId = link.fromOpId, portId = link.fromPortId) + ) + updateRegionsWithOutputPortStorage(outputPortsToMaterialize, regionDAG) allocateResource(regionDAG) regionDAG } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index c0f8231359..dc405a71d1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -67,9 +67,13 @@ class DefaultCostEstimator( val longestRunningOpExecutionTime = opExecutionTimes.max longestRunningOpExecutionTime case None => - // Without past statistics (e.g., first execution), we use number of materialized ports as the cost. 
+ // Without past statistics (e.g., first execution), we use number of ports needing storage as the cost. + // Each port needing storage has a portConfig. // This is independent of the schedule / resource allocator. - region.materializedPortIds.size + region.resourceConfig match { + case Some(config) => config.portConfigs.size + case None => 0 + } } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala index 2457d09b4c..b5aba53b87 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala @@ -2,9 +2,13 @@ package edu.uci.ics.amber.engine.architecture.scheduling import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.WorkflowRuntimeException -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{ + GlobalPortIdentity, + PhysicalLink, + PhysicalPlan, + WorkflowContext +} import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.PhysicalLink import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.graph.DirectedAcyclicGraph @@ -164,6 +168,8 @@ class ExpansionGreedyScheduleGenerator( val matReaderWriterPairs = new mutable.HashMap[PhysicalOpIdentity, PhysicalOpIdentity]() + val outputPortsToMaterialize = new mutable.HashSet[GlobalPortIdentity]() + @tailrec def recConnectRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = { tryConnectRegionDAG() match { @@ -174,6 +180,10 @@ class ExpansionGreedyScheduleGenerator( link, matReaderWriterPairs ) + outputPortsToMaterialize += GlobalPortIdentity( + opId = link.fromOpId, + portId = link.fromPortId + ) } 
recConnectRegionDAG() } @@ -202,6 +212,8 @@ class ExpansionGreedyScheduleGenerator( // mark links that go to downstream regions populateDependeeLinks(regionDAG) + updateRegionsWithOutputPortStorage(outputPortsToMaterialize.toSet, regionDAG) + // allocate resources on regions allocateResource(regionDAG) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Region.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Region.scala index f90bae23cc..7a5be55159 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Region.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Region.scala @@ -1,26 +1,22 @@ package edu.uci.ics.amber.engine.architecture.scheduling -import edu.uci.ics.amber.core.workflow.PhysicalOp -import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp} +import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} import org.jgrapht.traverse.TopologicalOrderIterator +import java.net.URI import scala.jdk.CollectionConverters.IteratorHasAsScala case class RegionLink(fromRegionId: RegionIdentity, toRegionId: RegionIdentity) case class RegionIdentity(id: Long) - -case class GlobalPortIdentity(opId: PhysicalOpIdentity, portId: PortIdentity, input: Boolean) - case class Region( id: RegionIdentity, physicalOps: Set[PhysicalOp], physicalLinks: Set[PhysicalLink], - resourceConfig: Option[ResourceConfig] = None, - materializedPortIds: Set[GlobalPortIdentity] = Set.empty + resourceConfig: Option[ResourceConfig] = None ) { private val operators: Map[PhysicalOpIdentity, PhysicalOp] = diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 3d58c3635e..20de6a84a8 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -1,7 +1,10 @@ package edu.uci.ics.amber.engine.architecture.scheduling import com.twitter.util.Future -import edu.uci.ics.amber.core.workflow.PhysicalOp +import edu.uci.ics.amber.core.storage.DocumentFactory +import edu.uci.ics.amber.core.storage.VFSURIFactory.decodeURI +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp} import edu.uci.ics.amber.engine.architecture.common.{AkkaActorService, ExecutorDeployment} import edu.uci.ics.amber.engine.architecture.controller.execution.{ OperatorExecution, @@ -22,10 +25,17 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.{ EmptyReturn, WorkflowAggregatedState } -import edu.uci.ics.amber.engine.architecture.scheduling.config.{OperatorConfig, ResourceConfig} +import edu.uci.ics.amber.engine.architecture.scheduling.config.{ + OperatorConfig, + PortConfig, + ResourceConfig +} +import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER -import edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource + +import java.net.URI class RegionExecutionCoordinator( region: Region, @@ -38,6 +48,9 @@ class RegionExecutionCoordinator( // fetch resource config val resourceConfig = region.resourceConfig.get + // Create storage objects for output 
ports of the region + createOutputPortStorageObjects(resourceConfig.portConfigs) + val regionExecution = workflowExecution.getRegionExecution(region.id) region.getOperators.foreach(physicalOp => { @@ -150,23 +163,39 @@ class RegionExecutionCoordinator( val inputPortMapping = physicalOp.inputPorts .flatMap { case (inputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> schema) + // Currently input ports do not have URIs associated with them because + // we are using cache read operators to read materialized port storage. + // TODO: also add storageURI for input ports when cache read ops are removed. + Some(GlobalPortIdentity(physicalOp.id, inputPortId, input = true) -> ("", schema)) case _ => None } val outputPortMapping = physicalOp.outputPorts .flatMap { case (outputPortId, (_, _, Right(schema))) => - Some(GlobalPortIdentity(physicalOp.id, outputPortId, input = false) -> schema) + val storageURI = resourceConfig.portConfigs.get( + GlobalPortIdentity(opId = physicalOp.id, portId = outputPortId) + ) match { + case Some(portConfig) => portConfig.storageURI.toString + case None => "" + } + Some( + GlobalPortIdentity(physicalOp.id, outputPortId) -> (storageURI, schema) + ) case _ => None } inputPortMapping ++ outputPortMapping } .flatMap { - case (globalPortId, schema) => + case (globalPortId, (storageUri, schema)) => resourceConfig.operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId).map { workerId => asyncRPCClient.workerInterface.assignPort( - AssignPortRequest(globalPortId.portId, globalPortId.input, schema.toRawSchema), + AssignPortRequest( + globalPortId.portId, + globalPortId.input, + schema.toRawSchema, + storageUri + ), asyncRPCClient.mkContext(workerId) ) } @@ -233,4 +262,43 @@ class RegionExecutionCoordinator( ) } + private def createOutputPortStorageObjects( + portConfigs: Map[GlobalPortIdentity, PortConfig] + ): Unit = { + portConfigs.foreach { + case (outputPortId, portConfig: PortConfig) 
=> + val storageUriToAdd = portConfig.storageURI + val (wid, eid, _, _, _, _) = decodeURI(storageUriToAdd) + val existingStorageUri = + WorkflowExecutionsResource.getResultUriByExecutionAndPort( + wid = wid, + eid = eid, + opId = outputPortId.opId.logicalOpId, + layerName = Some(outputPortId.opId.layerName), + portId = outputPortId.portId + ) + if ( + (!AmberConfig.isUserSystemEnabled && !ExecutionResourcesMapping + .getResourceURIs(eid) + .contains( + existingStorageUri + )) || (AmberConfig.isUserSystemEnabled && existingStorageUri.isEmpty) + ) { + // Avoid duplicate creation because of operators with dependee inputs belonging to two regions + val schemaOptional = + region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 + val schema = + schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) + DocumentFactory.createDocument(storageUriToAdd, schema) + WorkflowExecutionsResource.insertOperatorPortResultUri( + eid = eid, + opId = outputPortId.opId.logicalOpId, + layerName = outputPortId.opId.layerName, + portId = outputPortId.portId, + uri = storageUriToAdd + ) + } + } + } + } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlan.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlan.scala index 331bad8788..963664ca99 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/RegionPlan.scala @@ -1,6 +1,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling -import edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 75a3b68bf9..6682260ed6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,17 +1,23 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.executor.OpExecSink -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.storage.VFSURIFactory +import edu.uci.ics.amber.core.storage.VFSURIFactory.createResultURI +import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity +import edu.uci.ics.amber.core.workflow.{ + GlobalPortIdentity, + PhysicalLink, + PhysicalOp, + PhysicalPlan, + WorkflowContext +} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex +import edu.uci.ics.amber.engine.architecture.scheduling.config.{PortConfig, ResourceConfig} import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ DefaultResourceAllocator, ExecutionClusterInfo } import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory -import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.PhysicalLink -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator @@ -75,10 +81,13 @@ abstract class ScheduleGenerator( def allocateResource( regionDAG: DirectedAcyclicGraph[Region, RegionLink] ): Unit = { - val dataTransferBatchSize = workflowContext.workflowSettings.dataTransferBatchSize val resourceAllocator = - new DefaultResourceAllocator(physicalPlan, executionClusterInfo, dataTransferBatchSize) + new DefaultResourceAllocator( + 
physicalPlan, + executionClusterInfo, + workflowContext.workflowSettings + ) // generate the resource configs new TopologicalOrderIterator(regionDAG).asScala .foreach(region => { @@ -153,18 +162,18 @@ abstract class ScheduleGenerator( .removeLink(physicalLink) // create the uri of the materialization storage - val storageUri = VFSURIFactory.createMaterializedResultURI( + val storageURI = VFSURIFactory.createResultURI( workflowContext.workflowId, workflowContext.executionId, physicalLink.fromOpId.logicalOpId, - s"${physicalLink.fromOpId.layerName}_materialization", + Some(physicalLink.fromOpId.layerName), physicalLink.fromPortId ) val fromPortOutputMode = physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( - storageUri, + storageURI, fromPortOutputMode ) @@ -172,7 +181,7 @@ abstract class ScheduleGenerator( val existingOperator = newPhysicalPlan.operators.find { case op if op.opExecInitInfo.isInstanceOf[OpExecSink] => val OpExecSink(uri, _, _) = op.opExecInitInfo - uri == storageUri.toString + uri == storageURI.toString case _ => false } @@ -188,32 +197,24 @@ abstract class ScheduleGenerator( newPhysicalPlan = newPhysicalPlan .addOperator(matWriterPhysicalOp) .addLink(sourceToWriterLink) - - // sink has exactly one input port and one output port - val schema = newPhysicalPlan - .getOperator(matWriterPhysicalOp.id) - .outputPorts(matWriterPhysicalOp.outputPorts.keys.head) - ._3 - .toOption - .get - // create the document - DocumentFactory.createDocument(storageUri, schema) - WorkflowExecutionsResource.insertOperatorPortResultUri( - workflowContext.executionId, - physicalLink.fromOpId.logicalOpId, - s"${physicalLink.fromOpId.layerName}_materialization", - physicalLink.fromPortId, - storageUri - ) } // create cache reader and link + + val schema = newPhysicalPlan + .getOperator(fromOp.id) + .outputPorts(fromPortId) + ._3 + .toOption + .get + val 
matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp( workflowContext.workflowId, workflowContext.executionId, - storageUri, + storageURI, toOp.id, - toPortId + toPortId, + schema ) val readerToDestLink = PhysicalLink( @@ -228,4 +229,32 @@ abstract class ScheduleGenerator( .addOperator(matReaderPhysicalOp) .addLink(readerToDestLink) } + + def updateRegionsWithOutputPortStorage( + outputPortsToMaterialize: Set[GlobalPortIdentity], + regionDAG: DirectedAcyclicGraph[Region, RegionLink] + ): Unit = { + (outputPortsToMaterialize ++ workflowContext.workflowSettings.outputPortsNeedingStorage) + .foreach(outputPortId => { + getRegions(outputPortId.opId, regionDAG).foreach(fromRegion => { + val portConfigToAdd = outputPortId -> { + val uriToAdd = createResultURI( + workflowId = workflowContext.workflowId, + executionId = workflowContext.executionId, + operatorId = outputPortId.opId.logicalOpId, + layerName = Some(outputPortId.opId.layerName), + portIdentity = outputPortId.portId + ) + PortConfig(storageURI = uriToAdd) + } + val newResourceConfig = fromRegion.resourceConfig match { + case Some(existingConfig) => + existingConfig.copy(portConfigs = existingConfig.portConfigs + portConfigToAdd) + case None => ResourceConfig(portConfigs = Map(portConfigToAdd)) + } + val newFromRegion = fromRegion.copy(resourceConfig = Some(newResourceConfig)) + replaceVertex(regionDAG, fromRegion, newFromRegion) + }) + }) + } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index e1bcb2289e..291d2a778a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -6,7 +6,7 @@ import 
edu.uci.ics.amber.engine.architecture.common.AkkaActorService import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig import edu.uci.ics.amber.engine.architecture.controller.execution.WorkflowExecution import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient -import edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} import scala.collection.mutable diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/PortConfig.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/PortConfig.scala new file mode 100644 index 0000000000..15a600a122 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/PortConfig.scala @@ -0,0 +1,5 @@ +package edu.uci.ics.amber.engine.architecture.scheduling.config + +import java.net.URI + +case class PortConfig(storageURI: URI) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/ResourceConfig.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/ResourceConfig.scala index fd91ccac38..fb836fab12 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/ResourceConfig.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/ResourceConfig.scala @@ -1,9 +1,12 @@ package edu.uci.ics.amber.engine.architecture.scheduling.config import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.PhysicalLink +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} + +import java.net.URI case class ResourceConfig( - operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig], - linkConfigs: Map[PhysicalLink, LinkConfig] + operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig] = Map.empty, + linkConfigs: Map[PhysicalLink, LinkConfig] = Map.empty, 
+ portConfigs: Map[GlobalPortIdentity, PortConfig] = Map.empty ) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala index 255d417776..e19d12c1df 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala @@ -1,6 +1,7 @@ package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies -import edu.uci.ics.amber.core.workflow.{PartitionInfo, PhysicalPlan, UnknownPartition} +import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity +import edu.uci.ics.amber.core.workflow._ import edu.uci.ics.amber.engine.architecture.scheduling.Region import edu.uci.ics.amber.engine.architecture.scheduling.config.ChannelConfig.generateChannelConfigs import edu.uci.ics.amber.engine.architecture.scheduling.config.LinkConfig.toPartitioning @@ -8,10 +9,9 @@ import edu.uci.ics.amber.engine.architecture.scheduling.config.WorkerConfig.gene import edu.uci.ics.amber.engine.architecture.scheduling.config.{ LinkConfig, OperatorConfig, + PortConfig, ResourceConfig } -import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity -import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity} import scala.collection.mutable @@ -22,7 +22,7 @@ trait ResourceAllocator { class DefaultResourceAllocator( physicalPlan: PhysicalPlan, executionClusterInfo: ExecutionClusterInfo, - dataTransferBatchSize: Int + workflowSettings: WorkflowSettings ) extends ResourceAllocator { // a map of a physical link to the partition info of the upstream/downstream of this link @@ -70,14 +70,25 @@ class DefaultResourceAllocator( operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId), 
operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId), linkPartitionInfos(physicalLink), - this.dataTransferBatchSize + workflowSettings.dataTransferBatchSize ) ) }.toMap linkConfigs ++= linkToLinkConfigMapping - val resourceConfig = ResourceConfig(opToOperatorConfigMapping, linkToLinkConfigMapping) + val portConfigs = region.resourceConfig match { + case Some(existingResourceConfig) => existingResourceConfig.portConfigs + case None => + val newPortConfigs: Map[GlobalPortIdentity, PortConfig] = Map.empty + newPortConfigs + } + + val resourceConfig = ResourceConfig( + opToOperatorConfigMapping, + linkToLinkConfigMapping, + portConfigs + ) (region.copy(resourceConfig = Some(resourceConfig)), 0) } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala index 098749d9fb..12cd87862e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala @@ -165,6 +165,7 @@ class DataProcessor( outputTuple match { case FinalizeExecutor() => + outputManager.closeOutputStorageWriters() outputManager.emitMarker(EndOfInputChannel()) // Send Completed signal to worker actor. 
executor.close() @@ -188,7 +189,8 @@ class DataProcessor( val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) - outputManager.passTupleToDownstream(schemaEnforceable, outputPortOpt) + outputManager.passTupleToDownstream(tuple, outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt) case other => // skip for now } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala new file mode 100644 index 0000000000..927f12ced3 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -0,0 +1,30 @@ +package edu.uci.ics.amber.engine.architecture.worker.managers + +import com.google.common.collect.Queues +import edu.uci.ics.amber.core.storage.model.BufferedItemWriter +import edu.uci.ics.amber.core.tuple.Tuple + +import java.util.concurrent.LinkedBlockingQueue + +sealed trait TerminateSignal +case object PortStorageWriterTerminateSignal extends TerminateSignal + +class OutputPortResultWriterThread( + bufferedItemWriter: BufferedItemWriter[Tuple] +) extends Thread { + + val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = + Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() + + override def run(): Unit = { + var internalStop = false + while (!internalStop) { + val queueContent = queue.take() + queueContent match { + case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Right(_) => internalStop = true + } + } + bufferedItemWriter.close() + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index 790d0c3df7..c319957363 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -9,6 +9,8 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import java.net.URI + trait AssignPortHandler { this: DataProcessorRPCHandlerInitializer => @@ -17,7 +19,11 @@ trait AssignPortHandler { if (msg.input) { dp.inputManager.addPort(msg.portId, schema) } else { - dp.outputManager.addPort(msg.portId, schema) + val storageURIOption: Option[URI] = msg.storageUri match { + case "" => None + case uriString => Some(URI.create(uriString)) + } + dp.outputManager.addPort(msg.portId, schema, storageURIOption) } EmptyReturn() } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 3cbe308539..43e81bd5a3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -179,7 +179,13 @@ object WorkflowExecutionsResource { /** * @param layerName optional, if not specified, the method will return only the first layer (physicalop) stored with * external ports - * @return + * @return If user system is enabled, this method tries to find a URI regardless of whether layerName is provided.
+ * None will be returned only if the specified storage URI for the port does not exist in the database. + * If user system is not enabled, when layerName is provided, this method creates the URI directly; else + * this method also tries to find the URI from ExecutionResourcesMapping. + * TODO: Get rid of layerName and optimize the lookup (currently if no layerName is provided, the lookup is + * O(n), where n is the number of physical ops of the specified logical op). + * TODO: Refactor this method when user system is permanently enabled even in dev mode */ def getResultUriByExecutionAndPort( wid: WorkflowIdentity, @@ -236,15 +242,27 @@ object WorkflowExecutionsResource { } } } else { - Option( - VFSURIFactory.createResultURI( - wid, - eid, - opId, - layerName, - portId + if (layerName.nonEmpty) { + Option( + VFSURIFactory.createResultURI( + wid, + eid, + opId, + layerName, + portId + ) ) - ) + } else { + // Dev mode without user system. Use ExecutionResourcesMapping to find URI by using eid, opId and portId + // to match a URI, and the port should only be an external port as this is requested by the frontend.
+ ExecutionResourcesMapping + .getResourceURIs(eid) + .find(uri => { + val (_, _, retrievedOpId, _, retrievedPortId, _) = VFSURIFactory.decodeURI(uri) + retrievedOpId.nonEmpty && retrievedOpId.get == opId && + retrievedPortId.nonEmpty && retrievedPortId.get == portId && !retrievedPortId.get.internal + }) + } } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala index 8467e002d2..5953b029db 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala @@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName} import com.fasterxml.jackson.databind.node.ObjectNode import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.DocumentFactory.ICEBERG -import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} import edu.uci.ics.amber.core.storage.result._ @@ -93,37 +92,51 @@ object ExecutionResultService { } } - val storageUri = WorkflowExecutionsResource.getResultUriByExecutionAndPort( + // Cannot assume the storage is available at this point. The storage object is only available + // after a region is scheduled to execute. 
+ val storageUriOption = WorkflowExecutionsResource.getResultUriByExecutionAndPort( workflowIdentity, executionId, physicalOps.head.id.logicalOpId, None, PortIdentity() ) - val storage: VirtualDocument[Tuple] = - DocumentFactory.openDocument(storageUri.get)._1.asInstanceOf[VirtualDocument[Tuple]] - val webUpdate = webOutputMode match { - case PaginationMode() => - val numTuples = storage.getCount - val maxPageIndex = - Math.ceil(numTuples / defaultPageSize.toDouble).toInt + storageUriOption match { + case Some(storageUri) => + val storage: VirtualDocument[Tuple] = + DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]] + val webUpdate = webOutputMode match { + case PaginationMode() => + val numTuples = storage.getCount + val maxPageIndex = + Math.ceil(numTuples / defaultPageSize.toDouble).toInt + // This can be extremely expensive when we have a lot of pages. + // It causes delays in some observed cases. + // TODO: try to optimize this. + WebPaginationUpdate( + PaginationMode(), + newTupleCount, + (1 to maxPageIndex).toList + ) + case SetSnapshotMode() => + tuplesToWebData(webOutputMode, storage.get().toList) + case SetDeltaMode() => + val deltaList = storage.getAfter(oldTupleCount).toList + tuplesToWebData(webOutputMode, deltaList) + + case _ => + throw new RuntimeException( + "update mode combination not supported: " + (webOutputMode, outputMode) + ) + } + webUpdate + case None => WebPaginationUpdate( PaginationMode(), - newTupleCount, - (1 to maxPageIndex).toList - ) - case SetSnapshotMode() => - tuplesToWebData(webOutputMode, storage.get().toList) - case SetDeltaMode() => - val deltaList = storage.getAfter(oldTupleCount).toList - tuplesToWebData(webOutputMode, deltaList) - - case _ => - throw new RuntimeException( - "update mode combination not supported: " + (webOutputMode, outputMode) + 0, + List.empty ) } - webUpdate } /** @@ -264,6 +277,15 @@ class ExecutionResultService( } if (StorageConfig.resultStorageMode == ICEBERG &&
!hasSingleSnapshot) { + val layerName = physicalPlan.operators + .filter(physicalOp => + physicalOp.id.logicalOpId == opId && + physicalOp.outputPorts.keys.forall(outputPortId => !outputPortId.internal) + ) // TODO: Remove layerName and use GlobalPortIdentity for storage URIs + .headOption match { + case Some(physicalOp: PhysicalOp) => physicalOp.id.layerName + case None => "main" + } val storageUri = WorkflowExecutionsResource .getResultUriByExecutionAndPort( workflowIdentity, @@ -272,8 +294,10 @@ class ExecutionResultService( None, PortIdentity() ) - val opStorage = DocumentFactory.openDocument(storageUri.get)._1 - allTableStats(opId.id) = opStorage.getTableStatistics + if (storageUri.nonEmpty) { + val opStorage = DocumentFactory.openDocument(storageUri.get)._1 + allTableStats(opId.id) = opStorage.getTableStatistics + } } } Iterable( @@ -299,6 +323,7 @@ class ExecutionResultService( val latestExecutionId = getLatestExecutionId(workflowIdentity).getOrElse( throw new IllegalStateException("No execution is recorded") ) + val storageUriOption = WorkflowExecutionsResource.getResultUriByExecutionAndPort( workflowIdentity, latestExecutionId, @@ -336,10 +361,6 @@ class ExecutionResultService( val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = { WorkflowExecutionsResource .getResultUrisByExecutionId(executionId) - .filter(uri => { - val (_, _, _, _, _, resourceType) = VFSURIFactory.decodeURI(uri) - resourceType != MATERIALIZED_RESULT - }) .map(uri => { val count = DocumentFactory.openDocument(uri)._1.getCount.toInt diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala index a7fb16340b..d965aff393 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala @@ -95,14 +95,14 @@ class 
WorkflowExecutionService( } client = ComputingUnitMaster.createAmberRuntime( - workflowContext, + workflow.context, workflow.physicalPlan, controllerConfig, errorHandler ) executionReconfigurationService = new ExecutionReconfigurationService(client, executionStateStore, workflow) - executionStatsService = new ExecutionStatsService(client, executionStateStore, workflowContext) + executionStatsService = new ExecutionStatsService(client, executionStateStore, workflow.context) executionRuntimeService = new ExecutionRuntimeService( client, executionStateStore, @@ -115,11 +115,11 @@ class WorkflowExecutionService( sessionUri ) executionConsoleService = - new ExecutionConsoleService(client, executionStateStore, wsInput, workflowContext) + new ExecutionConsoleService(client, executionStateStore, wsInput, workflow.context) logger.info("Starting the workflow execution.") resultService.attachToExecution( - workflowContext.executionId, + workflow.context.executionId, executionStateStore, workflow.physicalPlan, client diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index 875b91d6ea..a4bb928cfa 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -153,8 +153,8 @@ class WorkflowService( .toSeq localDisposable.addAll(subscriptions: _*) } - localDisposable.add(disposable) - localDisposable + // Note: this new CompositeDisposable is necessary. DO NOT OPTIMIZE. 
+ new CompositeDisposable(localDisposable, disposable) } def disconnect(): Unit = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 1755705dbf..998b017fd0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -1,20 +1,18 @@ package edu.uci.ics.texera.workflow import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.core.storage.VFSURIFactory import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping -import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory} -import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity +import edu.uci.ics.amber.core.workflow._ import edu.uci.ics.amber.engine.architecture.controller.Workflow +import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.amber.engine.common.Utils.objectMapper import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory -import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode.SINGLE_SNAPSHOT -import edu.uci.ics.amber.core.workflow.PhysicalLink -import edu.uci.ics.amber.engine.common.AmberConfig import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource -import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.util.{Failure, Success, Try} @@ -23,18 +21,19 @@ class WorkflowCompiler( context: WorkflowContext ) extends LazyLogging { - // function to expand logical plan to physical plan + /** + * Function 
to expand logical plan to physical plan + * @return the expanded physical plan and a set of output ports that need storage + */ private def expandLogicalPlan( logicalPlan: LogicalPlan, logicalOpsToViewResult: List[String], errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] - ): PhysicalPlan = { + ): (PhysicalPlan, Set[GlobalPortIdentity]) = { val terminalLogicalOps = logicalPlan.getTerminalOperatorIds val toAddSink = (terminalLogicalOps ++ logicalOpsToViewResult.map(OperatorIdentity(_))).toSet var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty) - // create a JSON object that holds pointers to the workflow's results in Mongo - val resultsJSON = objectMapper.createObjectNode() - val sinksPointers = objectMapper.createArrayNode() + val outputPortsNeedingStorage: mutable.HashSet[GlobalPortIdentity] = mutable.HashSet() logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => Try { @@ -121,35 +120,9 @@ class WorkflowCompiler( outputPortId ) ) - // Determine the storage type, defaulting to iceberg for large HTML visualizations - val storageType = - if (outputPort.mode == SINGLE_SNAPSHOT) DocumentFactory.ICEBERG - else StorageConfig.resultStorageMode - - // Create storage if it doesn't exist - val sinkStorageSchema = - schema.getOrElse(throw new IllegalStateException("Schema is missing")) - - // create the storage resource and record the URI - DocumentFactory.createDocument(storageUri.get, sinkStorageSchema) - WorkflowExecutionsResource.insertOperatorPortResultUri( - context.executionId, - physicalOp.id.logicalOpId, - physicalOp.id.layerName, - outputPortId, - storageUri.get - ) - - // Add sink collection name to the JSON array of sinks - sinksPointers.add( - objectMapper - .createObjectNode() - .put("storageType", storageType) - .put("storageKey", storageUri.get.toString) - ) } - // TODO: remove + // TODO: remove sink operator in the next PR // Create and link the sink operator val sinkPhysicalOp = 
SpecialPhysicalOpFactory.newSinkPhysicalOp( storageUri.get, @@ -163,6 +136,11 @@ class WorkflowCompiler( ) physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) + + outputPortsNeedingStorage += GlobalPortIdentity( + opId = physicalOp.id, + portId = outputPortId + ) } } } match { @@ -175,13 +153,7 @@ class WorkflowCompiler( } } ) - - // update execution entry in MySQL to have pointers to the mongo collections - resultsJSON.set("results", sinksPointers) - ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) { - _.setResult(resultsJSON.toString) - } - physicalPlan + (physicalPlan, outputPortsNeedingStorage.toSet) } /** @@ -203,8 +175,12 @@ class WorkflowCompiler( // 2. resolve the file name in each scan source operator logicalPlan.resolveScanSourceOpFileName(None) - // 3. expand the logical plan to the physical plan, and assign storage - val physicalPlan = expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) + // 3. expand the logical plan to the physical plan, and get a set of output ports that need storage + val (physicalPlan, outputPortsNeedingStorage) = + expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None) + + context.workflowSettings = + WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage) Workflow(context, logicalPlan, physicalPlan) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala index 4fb4e45ce8..7ad5b600df 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManagerSpec.scala @@ -47,7 +47,7 @@ class OutputManagerSpec extends AnyFlatSpec with MockFactory { "OutputManager" should "aggregate tuples and output" in { val 
outputManager = wire[OutputManager] val mockPortId = PortIdentity() - outputManager.addPort(mockPortId, schema) + outputManager.addPort(mockPortId, schema, None) val tuples = Array.fill(21)( TupleLike(1, 2, 3, 4, "5", 9.8).enforceSchema(schema) @@ -76,7 +76,7 @@ class OutputManagerSpec extends AnyFlatSpec with MockFactory { OneToOnePartitioning(10, fakeReceiver.toSeq) ) tuples.foreach { t => - outputManager.passTupleToDownstream(TupleLike(t.getFields), None) + outputManager.passTupleToDownstream(TupleLike(t.getFields).enforceSchema(schema), None) } outputManager.emitMarker(EndOfInputChannel()) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala index 2ef61da21e..3cd1baac03 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorSpec.scala @@ -95,7 +95,7 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter dp.inputGateway .getChannel(ChannelIdentity(senderWorkerId, testWorkerId, isControl = false)) .setPortId(inputPortId) - dp.outputManager.addPort(outputPortId, schema) + dp.outputManager.addPort(outputPortId, schema, None) dp.processControlPayload( ChannelIdentity(CONTROLLER, testWorkerId, isControl = true), ControlInvocation( @@ -154,7 +154,7 @@ class DataProcessorSpec extends AnyFlatSpec with MockFactory with BeforeAndAfter dp.inputGateway .getChannel(ChannelIdentity(senderWorkerId, testWorkerId, isControl = false)) .setPortId(inputPortId) - dp.outputManager.addPort(outputPortId, schema) + dp.outputManager.addPort(outputPortId, schema, None) dp.processControlPayload( ChannelIdentity(CONTROLLER, testWorkerId, isControl = true), ControlInvocation( diff --git 
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala index 85ac8aa0fb..4bd25e67b1 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/worker/WorkerSpec.scala @@ -147,13 +147,13 @@ class WorkerSpec ) val addPort1 = AsyncRPCClient.ControlInvocation( METHOD_ASSIGN_PORT, - AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema), + AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema, ""), AsyncRPCContext(CONTROLLER, identifier1), 1 ) val addPort2 = AsyncRPCClient.ControlInvocation( METHOD_ASSIGN_PORT, - AssignPortRequest(mockPortId, input = false, mkSchema(1).toRawSchema), + AssignPortRequest(mockPortId, input = false, mkSchema(1).toRawSchema, ""), AsyncRPCContext(CONTROLLER, identifier1), 2 ) diff --git a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/core/workflow.proto b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/core/workflow.proto index 55180f3aa6..5cb385327a 100644 --- a/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/core/workflow.proto +++ b/core/workflow-core/src/main/protobuf/edu/uci/ics/amber/core/workflow.proto @@ -16,6 +16,12 @@ message PortIdentity { bool internal = 2; } +message GlobalPortIdentity{ + PhysicalOpIdentity opId = 1 [(scalapb.field).no_box = true]; + PortIdentity portId = 2 [(scalapb.field).no_box = true]; + bool input = 3; +} + message InputPort { PortIdentity id = 1 [(scalapb.field).no_box = true]; string displayName = 2; diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala index 489c6d9f46..5d057a8743 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala +++ 
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/DocumentFactory.scala @@ -50,9 +50,9 @@ object DocumentFactory { val storageKey = sanitizeURIPath(uri) val namespace = resourceType match { - case RESULT | MATERIALIZED_RESULT => StorageConfig.icebergTableResultNamespace - case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace - case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case RESULT => StorageConfig.icebergTableResultNamespace + case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace + case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -104,9 +104,9 @@ object DocumentFactory { val storageKey = sanitizeURIPath(uri) val namespace = resourceType match { - case RESULT | MATERIALIZED_RESULT => StorageConfig.icebergTableResultNamespace - case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace - case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case RESULT => StorageConfig.icebergTableResultNamespace + case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace + case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala index 3e36092cbc..b822144a2a 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/VFSURIFactory.scala @@ -11,7 +11,6 @@ import java.net.URI object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") - val MATERIALIZED_RESULT: Value = 
Value("materializedResult") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") } @@ -104,26 +103,6 @@ object VFSURIFactory { ) } - /** - * Create a URI pointing to a materialized storage - */ - def createMaterializedResultURI( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity, - operatorId: OperatorIdentity, - layerName: String, - portIdentity: PortIdentity - ): URI = { - createVFSURI( - VFSResourceType.MATERIALIZED_RESULT, - workflowId, - executionId, - Some(operatorId), - Some(layerName), - Some(portIdentity) - ) - } - /** * Create a URI pointing to runtime statistics */ @@ -174,9 +153,7 @@ object VFSURIFactory { portIdentity: Option[PortIdentity] = None ): URI = { - if ( - (resourceType == VFSResourceType.RESULT || resourceType == VFSResourceType.MATERIALIZED_RESULT) && (portIdentity.isEmpty || operatorId.isEmpty) - ) { + if (resourceType == VFSResourceType.RESULT && (portIdentity.isEmpty || operatorId.isEmpty)) { throw new IllegalArgumentException( "PortIdentity must be provided when resourceType is RESULT or MATERIALIZED_RESULT." 
) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala index 91eae41e7c..e9a8942052 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala @@ -2,7 +2,7 @@ package edu.uci.ics.amber.core.workflow import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties} import com.typesafe.scalalogging.LazyLogging -import edu.uci.ics.amber.core.executor.{OpExecWithCode, OpExecInitInfo} +import edu.uci.ics.amber.core.executor.{OpExecInitInfo, OpExecWithCode} import edu.uci.ics.amber.core.tuple.Schema import edu.uci.ics.amber.core.virtualidentity.{ ExecutionIdentity, @@ -13,6 +13,7 @@ import edu.uci.ics.amber.core.virtualidentity.{ import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} import org.jgrapht.traverse.TopologicalOrderIterator +import java.net.URI import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala index 28b9bb3185..474f66d670 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala @@ -11,7 +11,7 @@ object WorkflowContext { val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L) val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( - 400 // TODO: make this configurable + dataTransferBatchSize = 400 // TODO: make this configurable ) } class WorkflowContext( diff --git 
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala index d8d5d67ee9..7506d9089a 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala @@ -1,3 +1,6 @@ package edu.uci.ics.amber.core.workflow -case class WorkflowSettings(dataTransferBatchSize: Int) +case class WorkflowSettings( + dataTransferBatchSize: Int, + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty +) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index e2855b3328..8af44f8d91 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -70,7 +70,8 @@ object SpecialPhysicalOpFactory { executionIdentity: ExecutionIdentity, uri: URI, downstreamOperator: PhysicalOpIdentity, - downstreamPort: PortIdentity + downstreamPort: PortIdentity, + schema: Schema ): PhysicalOp = { val (_, _, opId, layerName, portId, _) = VFSURIFactory.decodeURI(uri) @@ -89,11 +90,7 @@ object SpecialPhysicalOpFactory { .withInputPorts(List.empty) .withOutputPorts(List(outputPort)) .withPropagateSchema( - SchemaPropagationFunc(_ => - Map(outputPort.id -> { - DocumentFactory.openDocument(uri)._2.get - }) - ) + SchemaPropagationFunc(_ => Map(outputPort.id -> schema)) ) .propagateSchema() diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala index 02f337e746..2532f5b01c 100644 --- 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala @@ -14,12 +14,12 @@ class ProgressiveSinkOpExec( outputMode: OutputMode, storageURI: URI ) extends SinkOperatorExecutor { - val (doc, _) = DocumentFactory.openDocument(storageURI) - val writer: BufferedItemWriter[Tuple] = - doc.writer(workerId.toString).asInstanceOf[BufferedItemWriter[Tuple]] +// val (doc, _) = DocumentFactory.openDocument(storageURI) +// val writer: BufferedItemWriter[Tuple] = +// doc.writer(workerId.toString).asInstanceOf[BufferedItemWriter[Tuple]] override def open(): Unit = { - writer.open() +// writer.open() } override def consumeTuple( @@ -28,7 +28,7 @@ class ProgressiveSinkOpExec( ): Unit = { outputMode match { case OutputMode.SET_SNAPSHOT | OutputMode.SINGLE_SNAPSHOT => updateSetSnapshot(tuple) - case OutputMode.SET_DELTA => writer.putOne(tuple) + case OutputMode.SET_DELTA => case _ => throw new UnsupportedOperationException("Unsupported output mode") } } @@ -37,14 +37,14 @@ class ProgressiveSinkOpExec( val (isInsertion, tupleValue) = ProgressiveUtils.getTupleFlagAndValue(deltaUpdate) if (isInsertion) { - writer.putOne(tupleValue) +// writer.putOne(tupleValue) } else { - writer.removeOne(tupleValue) +// writer.removeOne(tupleValue) } } override def onFinishMultiPort(port: Int): Iterator[(TupleLike, Option[PortIdentity])] = { - writer.close() +// writer.close() Iterator.empty } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala index e2fb7fb95d..ae78ee0ecd 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpExec.scala 
@@ -2,17 +2,16 @@ package edu.uci.ics.amber.operator.source.cache import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.executor.SourceOperatorExecutor -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} -import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} import java.net.URI class CacheSourceOpExec(storageUri: URI) extends SourceOperatorExecutor with LazyLogging { val (_, _, _, _, _, resourceType) = VFSURIFactory.decodeURI(storageUri) - if (resourceType != MATERIALIZED_RESULT) { - throw new RuntimeException("The storage URI must point to an materialized result storage") + if (resourceType != VFSResourceType.RESULT) { + throw new RuntimeException("The storage URI must point to a result storage") } private val storage =