Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Output Ports of an Operator to Write Storage #3295

Open
wants to merge 65 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
758096b
java works.
Xiao-zhen-Liu Feb 5, 2025
a932abf
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 5, 2025
f747691
Update python proto.
Xiao-zhen-Liu Feb 5, 2025
d4f3535
Python works.
Xiao-zhen-Liu Feb 5, 2025
48937bf
remove sinkOpExec actions.
Xiao-zhen-Liu Feb 5, 2025
f29523e
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 5, 2025
10e6d75
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 10, 2025
8b8e117
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 12, 2025
83797ed
Add async result writing on Java.
Xiao-zhen-Liu Feb 12, 2025
b375367
Add async result writing on Java.
Xiao-zhen-Liu Feb 12, 2025
edf211c
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 13, 2025
254e283
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Feb 14, 2025
104885d
Add threaded port result writer in python.
Xiao-zhen-Liu Feb 19, 2025
a6302fc
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 19, 2025
77d9fc4
Temp with materialized port storage.
Xiao-zhen-Liu Feb 20, 2025
72d8212
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 20, 2025
6812468
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 24, 2025
6b459d5
Modify psql scripts and joop generated code.
Xiao-zhen-Liu Feb 24, 2025
8c65ea8
Ensure the position of new column; use layer_name instead of layer_id.
Xiao-zhen-Liu Feb 25, 2025
52994ca
Working version with a fix on sinkOp.
Xiao-zhen-Liu Feb 25, 2025
9ea150d
Fix test.
Xiao-zhen-Liu Feb 25, 2025
de79dd2
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 25, 2025
06eed21
Fix fmt.
Xiao-zhen-Liu Feb 25, 2025
afac1e9
Merge branch 'refs/heads/xiaozhen-phy-op-id-for-storage' into xiaozhe…
Xiao-zhen-Liu Feb 25, 2025
304e70c
Fix.
Xiao-zhen-Liu Feb 26, 2025
13dc90c
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 26, 2025
b9b26fa
Move storage object creation to resource allocation.
Xiao-zhen-Liu Feb 26, 2025
a356938
Remove sink operator generation.
Xiao-zhen-Liu Feb 26, 2025
5073b04
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 26, 2025
4cef39c
Merge branch 'master' into xiaozhen-phy-op-id-for-storage
Xiao-zhen-Liu Feb 26, 2025
528cd31
Merge branch 'refs/heads/xiaozhen-phy-op-id-for-storage' into xiaozhe…
Xiao-zhen-Liu Feb 26, 2025
3e09731
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 27, 2025
6947da0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 28, 2025
689521d
Working version.
Xiao-zhen-Liu Feb 28, 2025
cb11da6
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Feb 28, 2025
8cef489
Remove storageUri from OutputPort.
Xiao-zhen-Liu Feb 28, 2025
564d580
Java multiple writer threads.
Xiao-zhen-Liu Mar 1, 2025
9c5d360
Fix materialized ports not being added in regions.
Xiao-zhen-Liu Mar 1, 2025
30b70d7
Fix view result ports not being added in regions because of dependee …
Xiao-zhen-Liu Mar 2, 2025
a920071
Change python to multi-thread writers.
Xiao-zhen-Liu Mar 2, 2025
12a4e8f
Refactor GlobalPortIdentity
Xiao-zhen-Liu Mar 2, 2025
d92bb9d
Refactor region outputPortResultURIs
Xiao-zhen-Liu Mar 2, 2025
9598418
Refactor.
Xiao-zhen-Liu Mar 2, 2025
27150d3
fmt.
Xiao-zhen-Liu Mar 2, 2025
3abe8c6
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Mar 2, 2025
19b2795
Refactor.
Xiao-zhen-Liu Mar 2, 2025
b69d687
java test fix
Xiao-zhen-Liu Mar 3, 2025
b975b38
fix ExpansionGreedyScheduleGenerator
Xiao-zhen-Liu Mar 4, 2025
a59cdb8
fix dev mode
Xiao-zhen-Liu Mar 4, 2025
0bd4a0d
Fix circular python module dependency.
Xiao-zhen-Liu Mar 4, 2025
905d4a0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 4, 2025
e7fec23
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 4, 2025
0151a7f
python minor comments.
Xiao-zhen-Liu Mar 5, 2025
d2887a8
Scala minor comments.
Xiao-zhen-Liu Mar 5, 2025
291dbe0
Merge branch 'refs/heads/master' into xiaozhen-use-output-port-for-st…
Xiao-zhen-Liu Mar 5, 2025
01a570c
fmt.
Xiao-zhen-Liu Mar 5, 2025
d69a6b0
Python main changes.
Xiao-zhen-Liu Mar 5, 2025
a3df014
Scala writer thread changes.
Xiao-zhen-Liu Mar 5, 2025
d10316b
Address comments.
Xiao-zhen-Liu Mar 5, 2025
7bdaa3c
Address comments.
Xiao-zhen-Liu Mar 5, 2025
c78591d
Fix.
Xiao-zhen-Liu Mar 5, 2025
aade79a
Minor refactoring.
Xiao-zhen-Liu Mar 7, 2025
e395b70
Renaming.
Xiao-zhen-Liu Mar 7, 2025
5d293c6
Refactor ResourceConfig.
Xiao-zhen-Liu Mar 7, 2025
0172723
Merge branch 'master' into xiaozhen-use-output-port-for-storage
Xiao-zhen-Liu Mar 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ message AssignPortRequest {
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
bool input = 2;
map<string, string> schema = 3;
string storageUri = 4;
}

message FinalizeCheckpointRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ async def assign_port(self, req: AssignPortRequest) -> EmptyReturn:
req.port_id, Schema(raw_schema=req.schema)
)
else:
storage_uri = None
if req.storage_uri != "":
storage_uri = req.storage_uri
self.context.output_manager.add_output_port(
req.port_id, Schema(raw_schema=req.schema)
req.port_id, Schema(raw_schema=req.schema), storage_uri
)
return EmptyReturn()
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import threading
import typing
from collections import OrderedDict
from itertools import chain
from queue import Queue
from threading import Thread
from typing import Iterable, Iterator

from loguru import logger
from pyarrow import Table
from typing import Iterable, Iterator

from core.architecture.packaging.input_manager import WorkerPort, Channel
from core.architecture.sendsemantics.broad_cast_partitioner import (
Expand All @@ -23,7 +27,19 @@
from core.models import Tuple, Schema, MarkerFrame
from core.models.marker import Marker
from core.models.payload import DataPayload, DataFrame
from core.storage.document_factory import DocumentFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
)
from core.util import get_one_of
from core.util.virtual_identity import get_worker_index
from proto.edu.uci.ics.amber.core import (
ActorVirtualIdentity,
PhysicalLink,
PortIdentity,
ChannelIdentity,
)
from proto.edu.uci.ics.amber.engine.architecture.rpc import ChannelMarkerPayload
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
HashBasedShufflePartitioning,
Expand All @@ -33,12 +49,6 @@
RangeBasedShufflePartitioning,
BroadcastPartitioning,
)
from proto.edu.uci.ics.amber.core import (
ActorVirtualIdentity,
PhysicalLink,
PortIdentity,
ChannelIdentity,
)


class OutputManager:
Expand All @@ -56,23 +66,89 @@ def __init__(self, worker_id: str):
}
self._ports: typing.Dict[PortIdentity, WorkerPort] = dict()
self._channels: typing.Dict[ChannelIdentity, Channel] = dict()
self._port_storage_writers: typing.Dict[
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

def add_output_port(self, port_id: PortIdentity, schema: Schema) -> None:
def add_output_port(
self,
port_id: PortIdentity,
schema: Schema,
storage_uri: typing.Optional[str] = None,
) -> None:
if port_id.id is None:
port_id.id = 0
if port_id.internal is None:
port_id.internal = False

if storage_uri is not None:
self.set_up_port_storage_writer(port_id, storage_uri)

# each port can only be added and initialized once.
if port_id not in self._ports:
self._ports[port_id] = WorkerPort(schema)

def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
"""
Create a separate thread for saving output tuples of a port
to storage in batch.
"""
document, _ = DocumentFactory.open_document(storage_uri)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
port_storage_writer = PortStorageWriter(
buffered_item_writer=buffered_item_writer, queue=writer_queue
)
writer_thread = threading.Thread(
target=port_storage_writer.run,
daemon=True,
name=f"port_storage_writer_thread_{port_id}",
)
writer_thread.start()
self._port_storage_writers[port_id] = (
writer_queue,
port_storage_writer,
writer_thread,
)

def get_port(self, port_id=None) -> WorkerPort:
return list(self._ports.values())[0]

def get_output_channel_ids(self):
return self._channels.keys()

def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
"""
Optionally write the tuple to storage if the specified output port
is determined by the scheduler to need storage. This method is not blocking
because a separate thread is used to flush the tuple to storage in batch.
:param tuple_: A tuple produced by the data processor.
:param port_id: If not specified, the tuple will be written to all
output ports that need storage.
:return:
"""
if port_id is None:
for writer_queue, _, _ in self._port_storage_writers.values():
writer_queue.put(PortStorageWriterElement(data_tuple=tuple_))
elif port_id in self._port_storage_writers.keys():
self._port_storage_writers[port_id][0].put(
PortStorageWriterElement(data_tuple=tuple_)
)

def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
writer threads to finish, which indicates the port storage writing
are finished.
"""
for _, writer, _ in self._port_storage_writers.values():
# This non-blocking stop call will let the storage writers
# flush the remaining buffer
writer.stop()
for _, _, writer_thread in self._port_storage_writers.values():
# This blocking call will wait for all the writer to finish commit
writer_thread.join()

def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
"""
Add down stream operator and its transfer policy
Expand Down
5 changes: 5 additions & 0 deletions core/amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def process_input_tuple(self) -> None:
payload=batch,
)
)
self.context.output_manager.save_tuple_to_storage_if_needed(
output_tuple
)

def process_input_state(self) -> None:
self._switch_context()
Expand Down Expand Up @@ -288,6 +291,8 @@ def _process_end_of_output_ports(self, _: EndOfOutputPorts) -> None:

:param _: EndOfOutputPorts
"""
self.context.output_manager.close_port_storage_writers()

for to, batch in self.context.output_manager.emit_marker(EndOfInputChannel()):
self._output_queue.put(
DataElement(
Expand Down
3 changes: 2 additions & 1 deletion core/amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import typing
from urllib.parse import urlparse

from typing import Optional
Expand Down Expand Up @@ -65,7 +66,7 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
)

@staticmethod
def open_document(uri: str) -> (VirtualDocument, Optional[Schema]):
def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "vfs":
_, _, _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dataclasses import dataclass

from overrides import overrides

from core.models import Tuple
from core.storage.model.buffered_item_writer import BufferedItemWriter
from core.util import StoppableQueueBlockingRunnable, IQueue
from core.util.customized_queue.queue_base import QueueElement


@dataclass
class PortStorageWriterElement(QueueElement):
data_tuple: Tuple


class PortStorageWriter(StoppableQueueBlockingRunnable):
def __init__(self, buffered_item_writer: BufferedItemWriter, queue: IQueue):
super().__init__(name=self.__class__.__name__, queue=queue)
self.buffered_item_writer: BufferedItemWriter = buffered_item_writer

@overrides
def receive(self, next_entry: QueueElement) -> None:
if isinstance(next_entry, PortStorageWriterElement):
self.buffered_item_writer.put_one(next_entry.data_tuple)
else:
raise TypeError(f"Unexpected entry {next_entry}")

@overrides
def pre_start(self) -> None:
self.buffered_item_writer.open()

@overrides
def post_stop(self) -> None:
self.buffered_item_writer.close()
2 changes: 1 addition & 1 deletion core/amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def initialize(
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = commit_batch_size
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
cls._initialized = True

def __new__(cls, *args, **kwargs):
Expand Down
10 changes: 10 additions & 0 deletions core/amber/src/main/python/core/util/virtual_identity/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import re

worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")


def get_worker_index(worker_id: str) -> int:
match = worker_name_pattern.match(worker_id)
if match:
return int(match.group(2))
raise ValueError("Invalid worker ID format")

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion core/amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from loguru import logger

from core.python_worker import PythonWorker
from core.storage.storage_config import StorageConfig


def init_loguru_logger(stream_log_level) -> None:
Expand All @@ -22,8 +23,28 @@ def init_loguru_logger(stream_log_level) -> None:


if __name__ == "__main__":
_, worker_id, output_port, logger_level, r_path = sys.argv
(
_,
worker_id,
output_port,
logger_level,
r_path,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
)

# Setting R_HOME environment variable for R-UDF usage
if r_path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package edu.uci.ics.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import edu.uci.ics.amber.core.WorkflowRuntimeException
import edu.uci.ics.amber.core.workflow.GlobalPortIdentity
import edu.uci.ics.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
FatalError
Expand All @@ -12,7 +13,6 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{
QueryStatisticsRequest
}
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import edu.uci.ics.amber.engine.architecture.scheduling.GlobalPortIdentity
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.util.VirtualIdentityUtils

Expand Down
Loading
Loading