Use Output Ports of an Operator to Write Storage #3295

Open
wants to merge 65 commits into base: master
Changes from 51 commits (65 commits total)
758096b
java works.
Xiao-zhen-Liu Feb 5, 2025
a932abf
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 5, 2025
f747691
Update python proto.
Xiao-zhen-Liu Feb 5, 2025
d4f3535
Python works.
Xiao-zhen-Liu Feb 5, 2025
48937bf
remove sinkOpExec actions.
Xiao-zhen-Liu Feb 5, 2025
f29523e
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 5, 2025
10e6d75
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 10, 2025
8b8e117
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 12, 2025
83797ed
Add async result writing on Java.
Xiao-zhen-Liu Feb 12, 2025
b375367
Add async result writing on Java.
Xiao-zhen-Liu Feb 12, 2025
edf211c
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 13, 2025
254e283
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Feb 14, 2025
104885d
Add threaded port result writer in python.
Xiao-zhen-Liu Feb 19, 2025
a6302fc
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 19, 2025
77d9fc4
Temp with materialized port storage.
Xiao-zhen-Liu Feb 20, 2025
72d8212
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 20, 2025
6812468
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 24, 2025
6b459d5
Modify psql scripts and joop generated code.
Xiao-zhen-Liu Feb 24, 2025
8c65ea8
Ensure the position of new column; use layer_name instead of layer_id.
Xiao-zhen-Liu Feb 25, 2025
52994ca
Working version with a fix on sinkOp.
Xiao-zhen-Liu Feb 25, 2025
9ea150d
Fix test.
Xiao-zhen-Liu Feb 25, 2025
de79dd2
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 25, 2025
06eed21
Fix fmt.
Xiao-zhen-Liu Feb 25, 2025
afac1e9
Merge branch 'refs/heads/xiaozhen-phy-op-id-for-storage' into xiaozhe…
Xiao-zhen-Liu Feb 25, 2025
304e70c
Fix.
Xiao-zhen-Liu Feb 26, 2025
13dc90c
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 26, 2025
b9b26fa
Move storage object creation to resource allocation.
Xiao-zhen-Liu Feb 26, 2025
a356938
Remove sink operator generation.
Xiao-zhen-Liu Feb 26, 2025
5073b04
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 26, 2025
4cef39c
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 26, 2025
528cd31
Merge branch 'refs/heads/xiaozhen-phy-op-id-for-storage' into xiaozhe…
Xiao-zhen-Liu Feb 26, 2025
3e09731
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 27, 2025
6947da0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 28, 2025
689521d
Working version.
Xiao-zhen-Liu Feb 28, 2025
cb11da6
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 28, 2025
8cef489
Remove storageUri from OutputPort.
Xiao-zhen-Liu Feb 28, 2025
564d580
Java multiple writer threads.
Xiao-zhen-Liu Mar 1, 2025
9c5d360
Fix materialized ports not being added in regions.
Xiao-zhen-Liu Mar 1, 2025
30b70d7
Fix view result ports not being added in regions because of dependee …
Xiao-zhen-Liu Mar 2, 2025
a920071
Change python to multi-thread writers.
Xiao-zhen-Liu Mar 2, 2025
12a4e8f
Refactor GlobalPortIdentity
Xiao-zhen-Liu Mar 2, 2025
d92bb9d
Refactor region outputPortResultURIs
Xiao-zhen-Liu Mar 2, 2025
9598418
Refactor.
Xiao-zhen-Liu Mar 2, 2025
27150d3
fmt.
Xiao-zhen-Liu Mar 2, 2025
3abe8c6
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Mar 2, 2025
19b2795
Refactor.
Xiao-zhen-Liu Mar 2, 2025
b69d687
java test fix
Xiao-zhen-Liu Mar 3, 2025
b975b38
fix ExpansionGreedyScheduleGenerator
Xiao-zhen-Liu Mar 4, 2025
a59cdb8
fix dev mode
Xiao-zhen-Liu Mar 4, 2025
0bd4a0d
Fix circular python module dependency.
Xiao-zhen-Liu Mar 4, 2025
905d4a0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 4, 2025
e7fec23
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 4, 2025
0151a7f
python minor comments.
Xiao-zhen-Liu Mar 5, 2025
d2887a8
Scala minor comments.
Xiao-zhen-Liu Mar 5, 2025
291dbe0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 5, 2025
01a570c
fmt.
Xiao-zhen-Liu Mar 5, 2025
d69a6b0
Python main changes.
Xiao-zhen-Liu Mar 5, 2025
a3df014
Scala writer thread changes.
Xiao-zhen-Liu Mar 5, 2025
d10316b
Address comments.
Xiao-zhen-Liu Mar 5, 2025
7bdaa3c
Address comments.
Xiao-zhen-Liu Mar 5, 2025
c78591d
Fix.
Xiao-zhen-Liu Mar 5, 2025
aade79a
Minor refactoring.
Xiao-zhen-Liu Mar 7, 2025
e395b70
Renaming.
Xiao-zhen-Liu Mar 7, 2025
5d293c6
Refactor ResourceConfig.
Xiao-zhen-Liu Mar 7, 2025
0172723
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Mar 9, 2025
@@ -227,6 +227,7 @@ message AssignPortRequest {
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
bool input = 2;
map<string, string> schema = 3;
string storageUri = 4;
}

message FinalizeCheckpointRequest {
@@ -15,6 +15,6 @@ async def assign_port(self, req: AssignPortRequest) -> EmptyReturn:
)
else:
self.context.output_manager.add_output_port(
req.port_id, Schema(raw_schema=req.schema)
req.port_id, Schema(raw_schema=req.schema), req.storage_uri
)
return EmptyReturn()
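The handler above passes `req.storage_uri` through unconditionally; since proto3 string fields default to `""`, an unset `storageUri` arrives as an empty string, and `add_output_port` later treats that as "no storage attached". A minimal sketch of that convention (`parse_storage_uri` is a hypothetical helper, not part of the PR):

```python
from typing import Optional

def parse_storage_uri(storage_uri: str) -> Optional[str]:
    """Treat the empty string as 'no storage configured' (hypothetical helper).

    Proto3 string fields default to "", so an unset storageUri is delivered
    as an empty string rather than None.
    """
    return storage_uri or None
```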
@@ -1,9 +1,11 @@
import threading
import typing
from collections import OrderedDict
from itertools import chain
from typing import Iterable, Iterator

from loguru import logger
from pyarrow import Table
from typing import Iterable, Iterator

from core.architecture.packaging.input_manager import WorkerPort, Channel
from core.architecture.sendsemantics.broad_cast_partitioner import (
@@ -23,7 +25,17 @@
from core.models import Tuple, Schema, MarkerFrame
from core.models.marker import Marker
from core.models.payload import DataPayload, DataFrame
from core.storage.runnables.port_result_writer import PortResultWriter
from core.storage.document_factory import DocumentFactory
from core.storage.model.virtual_document import VirtualDocument
from core.util import get_one_of
from core.util.virtual_identity import get_worker_index
from proto.edu.uci.ics.amber.core import (
ActorVirtualIdentity,
PhysicalLink,
PortIdentity,
ChannelIdentity,
)
from proto.edu.uci.ics.amber.engine.architecture.rpc import ChannelMarkerPayload
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
HashBasedShufflePartitioning,
@@ -33,12 +45,6 @@
RangeBasedShufflePartitioning,
BroadcastPartitioning,
)
from proto.edu.uci.ics.amber.core import (
ActorVirtualIdentity,
PhysicalLink,
PortIdentity,
ChannelIdentity,
)


class OutputManager:
@@ -56,13 +62,28 @@ def __init__(self, worker_id: str):
}
self._ports: typing.Dict[PortIdentity, WorkerPort] = dict()
self._channels: typing.Dict[ChannelIdentity, Channel] = dict()
self._port_result_writers: typing.Dict[PortIdentity, PortResultWriter] = dict()

def add_output_port(self, port_id: PortIdentity, schema: Schema) -> None:
def add_output_port(
self, port_id: PortIdentity, schema: Schema, storage_uri: str
) -> None:
if port_id.id is None:
port_id.id = 0
if port_id.internal is None:
port_id.internal = False

if storage_uri != "":
document: VirtualDocument[Tuple]
document, _ = DocumentFactory.open_document(storage_uri)
writer = document.writer(str(get_worker_index(self.worker_id)))
writer_thread = PortResultWriter(writer)
threading.Thread(
target=writer_thread.run,
daemon=True,
name=f"port_storage_writer_thread_{port_id}",
).start()
self._port_result_writers[port_id] = writer_thread

# each port can only be added and initialized once.
if port_id not in self._ports:
self._ports[port_id] = WorkerPort(schema)
@@ -73,6 +94,17 @@ def get_port(self, port_id=None) -> WorkerPort:
def get_output_channel_ids(self):
return self._channels.keys()

def save_tuple_to_storage_if_needed(self, amber_tuple: Tuple, port_id=None) -> None:
if port_id is None:
for writer_thread in self._port_result_writers.values():
writer_thread.put_tuple(amber_tuple)
elif port_id in self._port_result_writers.keys():
self._port_result_writers[port_id].put_tuple(amber_tuple)

def close_output_storage_writers(self) -> None:
for writer_thread in self._port_result_writers.values():
writer_thread.stop()

def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
"""
Add down stream operator and its transfer policy
5 changes: 5 additions & 0 deletions core/amber/src/main/python/core/runnables/main_loop.py
@@ -189,6 +189,9 @@ def process_input_tuple(self) -> None:
payload=batch,
)
)
self.context.output_manager.save_tuple_to_storage_if_needed(
output_tuple
)

def process_input_state(self) -> None:
self._switch_context()
@@ -288,6 +291,8 @@ def _process_end_of_output_ports(self, _: EndOfOutputPorts) -> None:

:param _: EndOfOutputPorts
"""
self.context.output_manager.close_output_storage_writers()

for to, batch in self.context.output_manager.emit_marker(EndOfInputChannel()):
self._output_queue.put(
DataElement(
Empty file.
@@ -0,0 +1,35 @@
import queue
import threading

from core.models import Tuple
from core.storage.model.buffered_item_writer import BufferedItemWriter
from core.util import Stoppable
from core.util.runnable.runnable import Runnable


class PortResultWriter(Runnable, Stoppable):
def __init__(self, writer: BufferedItemWriter):
self.writer: BufferedItemWriter = writer
self.queue = queue.Queue()
self.stopped = False
self.stop_event = threading.Event()

def put_tuple(self, amber_tuple: Tuple):
assert not self.stopped, "Cannot put tuple after termination."
self.queue.put(amber_tuple)

def run(self) -> None:
internal_stop = False
while not internal_stop:
queue_content = self.queue.get(block=True)
if queue_content is None:
internal_stop = True
else:
self.writer.put_one(queue_content)
self.writer.close()
self.stop_event.set() # Signal that run() has fully stopped

def stop(self) -> None:
self.stopped = True
self.queue.put(None) # Signal termination
self.stop_event.wait() # Block until run() completes
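`PortResultWriter` above is a queue-with-sentinel consumer: tuples are buffered on a blocking queue, `None` signals termination, and `stop()` blocks on an `Event` until the consumer thread drains the queue and closes the writer. The same pattern in a self-contained sketch (a list stands in for the project's `BufferedItemWriter`):

```python
import queue
import threading

class SentinelWriter:
    """Queue-with-sentinel consumer, mirroring the PortResultWriter pattern."""

    def __init__(self) -> None:
        self.queue: "queue.Queue" = queue.Queue()
        self.results = []              # stands in for BufferedItemWriter
        self.stopped = False
        self._done = threading.Event()
        threading.Thread(target=self._run, daemon=True).start()

    def put(self, item) -> None:
        assert not self.stopped, "Cannot put after termination."
        self.queue.put(item)

    def _run(self) -> None:
        while True:
            item = self.queue.get(block=True)
            if item is None:           # sentinel: all pending items drained
                break
            self.results.append(item)
        self._done.set()               # signal that the run loop has exited

    def stop(self) -> None:
        self.stopped = True
        self.queue.put(None)           # enqueue the termination sentinel
        self._done.wait()              # block until the consumer finishes
```

Because the sentinel is enqueued behind all pending tuples, `stop()` guarantees every tuple put before it is flushed before the thread exits.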
2 changes: 1 addition & 1 deletion core/amber/src/main/python/core/storage/storage_config.py
@@ -34,7 +34,7 @@ def initialize(
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = commit_batch_size
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
cls._initialized = True

def __new__(cls, *args, **kwargs):
10 changes: 10 additions & 0 deletions core/amber/src/main/python/core/util/virtual_identity/__init__.py
@@ -0,0 +1,10 @@
import re

worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")


def get_worker_index(worker_id: str) -> int:
match = worker_name_pattern.match(worker_id)
if match:
return int(match.group(2))
raise ValueError("Invalid worker ID format")
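`get_worker_index` extracts the trailing numeric index from worker IDs of the form the regex implies, e.g. `Worker:WF<n>-…-<name>-<index>` (the concrete ID below is illustrative, inferred from the pattern rather than taken from the PR):

```python
import re

# Same pattern as the new virtual_identity module: the second capture group
# is the trailing worker index.
worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")

def get_worker_index(worker_id: str) -> int:
    """Return the trailing worker index; raise on a malformed ID."""
    match = worker_name_pattern.match(worker_id)
    if match:
        return int(match.group(2))
    raise ValueError("Invalid worker ID format")
```

For example, `get_worker_index("Worker:WF1-op-main-3")` returns `3`, which `add_output_port` uses to give each worker its own storage writer name.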

23 changes: 22 additions & 1 deletion core/amber/src/main/python/texera_run_python_worker.py
@@ -3,6 +3,7 @@
from loguru import logger

from core.python_worker import PythonWorker
from core.storage.storage_config import StorageConfig


def init_loguru_logger(stream_log_level) -> None:
@@ -22,8 +23,28 @@ def init_loguru_logger(stream_log_level) -> None:


if __name__ == "__main__":
_, worker_id, output_port, logger_level, r_path = sys.argv
(
_,
worker_id,
output_port,
logger_level,
r_path,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
)

# Setting R_HOME environment variable for R-UDF usage
if r_path:
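The entrypoint now unpacks eleven `sys.argv` fields in one strict tuple assignment, which fails fast if `PythonWorkflowWorker` passes the wrong number of arguments. All fields arrive as strings, which is exactly why `storage_config.py` gains the `int(commit_batch_size)` cast. A sketch of the same idea (`parse_worker_args` and its key names are hypothetical, not part of the PR):

```python
from typing import Dict, List

def parse_worker_args(argv: List[str]) -> Dict[str, object]:
    """Hypothetical helper mirroring the entrypoint's strict argv unpacking."""
    (
        _,  # script name
        worker_id,
        output_port,
        logger_level,
        r_path,
        catalog_uri,
        catalog_username,
        catalog_password,
        table_namespace,
        storage_directory,
        commit_batch_size,
    ) = argv  # raises ValueError on any arity mismatch
    return {
        "worker_id": worker_id,
        "output_port": int(output_port),            # argv values are strings
        "logger_level": logger_level,
        "r_path": r_path,
        "catalog_uri": catalog_uri,
        "catalog_username": catalog_username,
        "catalog_password": catalog_password,
        "table_namespace": table_namespace,
        "storage_directory": storage_directory,
        "commit_batch_size": int(commit_batch_size),  # numeric settings need casts
    }
```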
@@ -2,6 +2,7 @@ package edu.uci.ics.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import edu.uci.ics.amber.core.WorkflowRuntimeException
import edu.uci.ics.amber.core.workflow.GlobalPortIdentity
import edu.uci.ics.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
FatalError
@@ -12,7 +13,6 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{
QueryStatisticsRequest
}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import edu.uci.ics.amber.engine.architecture.scheduling.GlobalPortIdentity
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.util.VirtualIdentityUtils

@@ -1,11 +1,14 @@
package edu.uci.ics.amber.engine.architecture.messaginglayer

import edu.uci.ics.amber.core.marker.Marker
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
import edu.uci.ics.amber.core.tuple.{
FinalizeExecutor,
FinalizePort,
Schema,
SchemaEnforceable,
Tuple,
TupleLike
}
import edu.uci.ics.amber.engine.architecture.messaginglayer.OutputManager.{
@@ -18,7 +21,10 @@ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings._
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity}
import edu.uci.ics.amber.engine.architecture.worker.managers.OutputPortResultWriterThread
import edu.uci.ics.amber.util.VirtualIdentityUtils

import java.net.URI
import scala.collection.mutable

object OutputManager {
@@ -99,6 +105,10 @@ class OutputManager(
private val networkOutputBuffers =
mutable.HashMap[(PhysicalLink, ActorVirtualIdentity), NetworkOutputBuffer]()

private val outputPortResultWriterThreads
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()

/**
* Add down stream operator and its corresponding Partitioner.
*
@@ -142,6 +152,25 @@
}
}

def saveTupleToStorageIfNeeded(
tupleLike: SchemaEnforceable,
outputPortId: Option[PortIdentity] = None
): Unit = {
(outputPortId match {
case Some(portId) =>
this.outputPortResultWriterThreads.get(portId) match {
case Some(_) => this.outputPortResultWriterThreads.filter(_._1 == portId)
case None => Map.empty
}
case None => this.outputPortResultWriterThreads
}).foreach({
case (portId, writerThread) =>
val tuple = tupleLike.enforceSchema(this.getPort(portId).schema)
// write to storage in a separate thread
writerThread.putTuple(tuple)
})
}

/**
* Flushes the network output buffers based on the specified set of physical links.
*
@@ -170,13 +199,26 @@
networkOutputBuffers.foreach(kv => kv._2.sendMarker(marker))
}

def addPort(portId: PortIdentity, schema: Schema): Unit = {
def addPort(portId: PortIdentity, schema: Schema, storageUriOptional: Option[URI]): Unit = {
// each port can only be added and initialized once.
if (this.ports.contains(portId)) {
return
}
this.ports(portId) = WorkerPort(schema)

// set up result storage writer
storageUriOptional match {
case Some(storageUri) =>
val writer = DocumentFactory
.openDocument(storageUri)
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
val writerThread = new OutputPortResultWriterThread(writer)
this.outputPortResultWriterThreads(portId) = writerThread
writerThread.start()
case None => // No need to add a writer
}
}

def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -191,6 +233,10 @@
outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
}

def closeOutputStorageWriters(): Unit = {
this.outputPortResultWriterThreads.values.foreach(writer => writer.terminate())
}

def getSingleOutputPortIdentity: PortIdentity = {
assert(ports.size == 1, "expect 1 output port, got " + ports.size)
ports.head._1
@@ -4,6 +4,7 @@ import akka.actor.Props
import com.twitter.util.Promise
import com.typesafe.config.{Config, ConfigFactory}
import edu.uci.ics.amber.core.virtualidentity.ChannelIdentity
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.engine.architecture.common.WorkflowActor
import edu.uci.ics.amber.engine.architecture.common.WorkflowActor.NetworkAck
import edu.uci.ics.amber.engine.architecture.messaginglayer.{
@@ -159,7 +160,13 @@ class PythonWorkflowWorker(
workerConfig.workerId.name,
Integer.toString(pythonProxyServer.getPortNumber.get()),
config.getString("python.log.streamHandler.level"),
RENVPath
RENVPath,
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
StorageConfig.icebergPostgresCatalogUsername,
StorageConfig.icebergPostgresCatalogPassword,
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString
)
).run(BasicIO.standard(false))
}