
Add IcebergDocument as one type of the operator result storage #3147

Merged
merged 73 commits into master from jiadong-add-file-result-storage
Jan 8, 2025

Conversation

bobbai00
Collaborator

@bobbai00 bobbai00 commented Dec 10, 2024

Implement Apache Iceberg for Result Storage


How to Enable Iceberg Result Storage

  1. Update storage-config.yaml:
    • Set result-storage-mode to iceberg (see the excerpt below).
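For reference, the setting looks roughly like this in storage-config.yaml; only this key is taken from the PR, and the surrounding structure of the file may differ:

```yaml
# storage-config.yaml (excerpt; surrounding keys omitted)
result-storage-mode: iceberg
```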

Major Changes

  • Introduced IcebergDocument: A thread-safe VirtualDocument implementation for storing and reading results in Iceberg tables.
  • Introduced IcebergTableWriter: Append-only writer for Iceberg tables with configurable buffer size.
  • Catalog and Data storage for Iceberg: Uses a local file system (file:/) via HadoopCatalog and HadoopFileIO. This ensures Iceberg operates without relying on external storage services.
  • ProgressiveSinkOpExec now takes a new workerId parameter; each writer created by the result storage receives this workerId.

Dependencies

  • Added Apache Iceberg-related libraries.
  • Introduced Hadoop-related libraries to support Iceberg's HadoopCatalog and HadoopFileIO. These libraries are needed only for configuration placeholders and do not add a runtime dependency on HDFS.

Overview of Iceberg Components

IcebergDocument

  • Manages reading and organizing data in Iceberg tables.
  • Supports iterator-based incremental reads with thread-safe operations for reading and clearing data.
  • Initializes or overrides the Iceberg table during construction (a sketch follows).
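The following is a minimal, hedged Java sketch of how such a document-style wrapper can create (or overwrite) an Iceberg table and expose an iterator-style read over its records. The class name, method names, warehouse path, and schema here are illustrative assumptions, not Texera's actual IcebergDocument API; the Iceberg calls (HadoopCatalog, IcebergGenerics) are the standard Java API.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;

// Illustrative sketch only; names and structure are hypothetical.
public class ResultDocumentSketch {
  private final Table table;

  public ResultDocumentSketch(HadoopCatalog catalog, String storageKey, Schema schema) {
    TableIdentifier id = TableIdentifier.of("result", storageKey);
    // Drop any stale table for this storage key, then create a fresh one.
    if (catalog.tableExists(id)) {
      catalog.dropTable(id, /* purge = */ true);
    }
    this.table = catalog.createTable(id, schema, PartitionSpec.unpartitioned());
  }

  // Iterator-style read over all committed records (callers must close the iterable).
  public CloseableIterable<Record> readAll() {
    table.refresh(); // pick up snapshots committed by writers since the last read
    return IcebergGenerics.read(table).build();
  }

  public static void main(String[] args) {
    HadoopCatalog catalog =
        new HadoopCatalog(new Configuration(), "file:///tmp/texera-iceberg-warehouse");
    Schema schema =
        new Schema(Types.NestedField.required(1, "value", Types.StringType.get()));
    ResultDocumentSketch doc = new ResultDocumentSketch(catalog, "demo_key", schema);
    try (CloseableIterable<Record> records = doc.readAll()) {
      records.forEach(r -> System.out.println(r.getField("value")));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```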

IcebergTableWriter

  • Writes data as immutable Parquet files in an append-only manner.
  • Each writer uniquely prefixes its files to avoid conflicts (workerIndex_fileIndex format).
  • Not thread-safe; single-threaded access is recommended (a write-path sketch follows).
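Below is a hedged sketch of the append-only write path using the standard Iceberg Java API (Parquet.writeData plus an append commit). The workerIndex_fileIndex naming comes from the description above; the class name, buffer handling, and paths are illustrative and not the actual IcebergTableWriter code.

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

// Illustrative sketch: flush one buffer of records as a new immutable Parquet file.
public final class AppendOnlyWriterSketch {
  private AppendOnlyWriterSketch() {}

  static void flushBuffer(Table table, List<Record> buffer, int workerIndex, int fileIndex)
      throws IOException {
    // The unique "workerIndex_fileIndex" prefix avoids name clashes between writers.
    String fileName = String.format("%d_%d.parquet", workerIndex, fileIndex);
    OutputFile outputFile =
        table.io().newOutputFile(table.locationProvider().newDataLocation(fileName));

    DataWriter<Record> writer =
        Parquet.writeData(outputFile)
            .schema(table.schema())
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .withSpec(PartitionSpec.unpartitioned())
            .overwrite()
            .build();
    try {
      for (Record record : buffer) {
        writer.write(record);
      }
    } finally {
      writer.close();
    }

    // Committing the append makes the new file visible to readers as one atomic snapshot.
    table.newAppend().appendFile(writer.toDataFile()).commit();
  }
}
```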

Data Storage via Iceberg Tables

  • Write:
    • Tables are created per storage key.
    • Writers append Parquet files to the table, ensuring immutability.
  • Read:
    • Readers use IcebergDocument.get to fetch data via an iterator.
    • The iterator reads data incrementally while ensuring data order matches the commit sequence of the data files.

Data Reading Using File Metadata

  • Data files are read using getUsingFileSequenceOrder, which:
    • Retrieves and sorts metadata files (FileScanTask) by sequence numbers.
    • Reads records sequentially, skipping files or records as needed.
    • Supports range-based reading (from, until) and incremental reads.
  • Sorting ensures data consistency and order preservation (see the sketch below).
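A hedged sketch of what sequence-ordered scanning can look like with the Iceberg Java API: plan the file scan tasks, sort them by sequence number, and read each file's records in turn. The method and class names below are illustrative rather than the actual getUsingFileSequenceOrder code, range/skip handling is omitted, and the sketch assumes an Iceberg version that exposes ContentFile#fileSequenceNumber.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;

// Illustrative sketch: read records in the order their data files were committed.
public final class SequenceOrderedReadSketch {
  private SequenceOrderedReadSketch() {}

  static List<Record> readInCommitOrder(Table table) throws Exception {
    // 1. Plan the scan and sort the resulting file tasks by file sequence number.
    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> planned = table.newScan().planFiles()) {
      planned.forEach(tasks::add);
    }
    tasks.sort(Comparator.comparingLong((FileScanTask t) -> t.file().fileSequenceNumber()));

    // 2. Read each data file sequentially; a real implementation would skip files
    //    and records here to honor (from, until) ranges and incremental reads.
    List<Record> result = new ArrayList<>();
    for (FileScanTask task : tasks) {
      try (CloseableIterable<Record> records =
          Parquet.read(table.io().newInputFile(task.file().path().toString()))
              .project(table.schema())
              .createReaderFunc(
                  fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
              .<Record>build()) {
        records.forEach(result::add);
      }
    }
    return result;
  }
}
```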

Hadoop Usage Without HDFS

  • The HadoopCatalog uses an empty Hadoop configuration, defaulting to the local file system (file:/).
  • This enables efficient management of Iceberg tables in local or network file systems without requiring HDFS infrastructure, as illustrated below.
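Concretely, constructing the catalog with a default Configuration (no HDFS settings) makes file:/ paths resolve against the local file system; the warehouse path and namespace below are illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class LocalCatalogSketch {
  public static void main(String[] args) {
    // An empty Configuration has no fs.defaultFS pointing at HDFS, so the file:/
    // warehouse below is handled entirely by the local file system.
    HadoopCatalog catalog =
        new HadoopCatalog(new Configuration(), "file:///tmp/texera-iceberg-warehouse");
    catalog.createNamespace(Namespace.of("result"));
    System.out.println(catalog.listTables(Namespace.of("result")));
  }
}
```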

@bobbai00 bobbai00 self-assigned this Dec 10, 2024
@bobbai00 bobbai00 force-pushed the jiadong-add-file-result-storage branch 2 times, most recently from 6522779 to a83d779 Compare December 14, 2024 00:14
@bobbai00 bobbai00 force-pushed the jiadong-add-file-result-storage branch 2 times, most recently from 1edb551 to cef347b Compare December 21, 2024 02:56
@bobbai00 bobbai00 changed the title Add PartitionDocument and ItemizedFileDocument Add IcebergDocument as one implementation of VirtualDocument that can be used to store operator results Dec 22, 2024
@bobbai00 bobbai00 changed the title Add IcebergDocument as one implementation of VirtualDocument that can be used to store operator results Add IcebergDocument as one implementation of VirtualDocument Dec 22, 2024
Collaborator

@shengquan-ni shengquan-ni left a comment


LGTM

@bobbai00 bobbai00 changed the title Add IcebergDocument as one implementation of VirtualDocument Add IcebergDocument as one type of the operator result storage Jan 6, 2025
@bobbai00 bobbai00 merged commit 7debf45 into master Jan 8, 2025
8 checks passed
@bobbai00 bobbai00 deleted the jiadong-add-file-result-storage branch January 8, 2025 23:46
Xiao-zhen-Liu added a commit that referenced this pull request Jan 30, 2025
This PR adds a storage layer implementation on the Python side of
Texera's codebase, mirroring the implementation of our Java-based
storage layer.

## Motivation
- The primary motivation for having a storage layer in Python is to let
Python UDF operators' ports write directly to result tables
without needing to send the results back to Java.
- In the future we will also use the Python storage layer for UDF logs
and workflow runtime statistics.

## Storage APIs
- There are 3 abstract classes in Java's storage implementation:
  - `ReadOnlyVirtualDocument` for read-only tables
  - `VirtualDocument` for tables supporting both read and write operations
  - `BufferedItemWriter` as a writer class of `VirtualDocument`
- We mirror the implementation in Python, but keep only the APIs
relevant to table storage (e.g., APIs related to dataset storage are not
kept in Python). A rough sketch of these abstractions follows.
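The sketch below is purely illustrative Java: the interface shapes and method names (get, getCount, writer, putOne, clear) are assumptions about what these abstractions look like, not the actual Texera signatures.

```java
import java.util.Iterator;

// Hypothetical shapes only; the real Texera interfaces may declare different methods.
interface ReadOnlyVirtualDocument<T> {
  Iterator<T> get();   // read all stored items
  long getCount();     // number of items currently stored
}

interface VirtualDocument<T> extends ReadOnlyVirtualDocument<T> {
  BufferedItemWriter<T> writer(String writerId); // obtain a writer for this document
  void clear();                                  // remove the underlying storage
}

interface BufferedItemWriter<T> {
  void open();
  void putOne(T item); // buffered append; flushed in batches
  void close();        // flush remaining items and commit
}
```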

## Iceberg Document
Following #3147, we add a table-storage implementation based on Apache
Iceberg (pyiceberg), including `IcebergDocument`, `IcebergTableWriter`,
`IcebergCatalogInstance`, and related util functions and tests.

### Limitations of / TODOs for the Python implementation
pyiceberg is less mature than its java-based counterpart. As a result
there are a few functionalities not supported in our current Python
storage implementation.

#### Incremental Read
Incremental read is not supported by pyiceberg; it will be supported [in
the future](apache/iceberg-python#533). Until
then we will not include incremental read in our Python codebase (it is
also not currently needed).
#### Concurrent writers
Iceberg uses optimistic concurrency control for concurrent writers. Java
Iceberg natively supports retry with configurable retry parameters,
using exponential backoff (without randomness). However, pyiceberg does
not currently support retry. We implemented an ad-hoc custom retry
mechanism in `IcebergTableWriter`, using exponential random backoff
based on the [tenacity](https://tenacity.readthedocs.io/en/latest/)
library. It is fast (~0.6 s for 10 concurrent writers writing
20K tuples) and faster than Java's iceberg-native retry (~6 seconds
for the same test). We may need to re-evaluate this custom
implementation if pyiceberg supports retry natively in the future.
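For comparison, the Java-side native retry mentioned above is driven by standard Iceberg table properties. The sketch below uses real TableProperties constants; the class name and the property values shown are just examples, not Texera's configuration.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public final class CommitRetrySketch {
  private CommitRetrySketch() {}

  // Iceberg retries commit conflicts from optimistic concurrency control with
  // exponential backoff governed by these table properties (values are examples).
  static void configureCommitRetry(Table table) {
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "4")            // commit.retry.num-retries
        .set(TableProperties.COMMIT_MIN_RETRY_WAIT_MS, "100")    // commit.retry.min-wait-ms
        .set(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "60000")  // commit.retry.max-wait-ms
        .commit();
  }
}
```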

## Iceberg Catalog
pyiceberg only supports a SQL catalog (PostgreSQL, to be specific) and REST
catalogs for production. We use the PostgreSQL-based SQL catalog in this
implementation for the following reasons:
- It supports local storage.
- We tested that it works with both Java and Python Iceberg storage.
- It is easier to set up for developers (compared to REST services).

### PostgreSQL setup
The Python storage layer requires a running PostgreSQL service in the
environment and an empty database for Iceberg to work.
- **A script to set up a new postgres database for Texera's iceberg
storage has been added for CI tests.**
- The database will be used by pyiceberg to manage the catalog.
- The logic to set up the database is added in the GitHub CI config.
- The Java side can continue using the Hadoop-based catalog for now until we add
storage on operator ports for both Java and Python.
- As the Python storage is not currently used by Python workers, no
action is required for developers for now.

### REST catalogs (feel free to skip this section)
I also explored 3 major REST catalog implementations
([lakekeeper](https://lakekeeper.io),
[polaris](https://polaris.apache.org), and
[gravitino](https://gravitino.apache.org)) and here are some
observations:
- REST catalogs are the trend primarily because different query engines
(Spark, Flink, Snowflake, etc.) relying on iceberg need a central place
to keep and manage the catalogs. Under the hood they all still use some
database as their storage layer.
- Most of them support / recommend cloud storage only in production and
do not support local storage.
- They are incubating projects and lack documentation. For example, I
found it very hard to set up authentication (as pyiceberg requires
authentication to work with REST catalogs) using gravitino, and using
them would add a lot more burden for our developers.
- I successfully made polaris work with our implementation after
setting up auth, but it was surprisingly slow.
- As the PostgreSQL catalog is working, we will explore REST
catalogs further in the future if we have migrated to cloud storage and hit
scalability issues.

## Storage configurations

A static class `StorageConfigs` is added to manage storage-related
configurations. We do NOT read the configs from files. Instead, Java
passes the configs to the Python worker, and the config is filled in
when the worker is initialized. The storage config is hardcoded in
CI tests.

## Other items

`VFSURIFactory` and `DocumentFactory` are added to the Python storage layer,
mirroring the Java implementations.

## TODO for Java Storage
- Add SQL catalog as another type of iceberg catalog (a rough Java sketch follows)
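A hedged Java sketch of what that could look like using Iceberg's built-in JdbcCatalog; the class name, connection string, credentials, and warehouse path are placeholders, and the PostgreSQL JDBC driver must be on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.jdbc.JdbcCatalog;

public class JdbcCatalogSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Placeholder connection details; point these at the same database pyiceberg uses.
    props.put(CatalogProperties.URI, "jdbc:postgresql://localhost:5432/texera_iceberg");
    props.put(JdbcCatalog.PROPERTY_PREFIX + "user", "texera");
    props.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
    props.put(CatalogProperties.WAREHOUSE_LOCATION, "file:///tmp/texera-iceberg-warehouse");

    JdbcCatalog catalog = new JdbcCatalog();
    catalog.setConf(new Configuration()); // table data stays on the local file system
    catalog.initialize("texera_catalog", props);
  }
}
```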

---------

Co-authored-by: Jiadong Bai <[email protected]>