From 5f40859e4140c7e4b4b88bd9aa4c84ee0b664d7c Mon Sep 17 00:00:00 2001 From: coufon Date: Sat, 3 Feb 2024 04:58:31 +0000 Subject: [PATCH] Move file options to the main options class --- python/src/space/core/datasets.py | 3 +- python/src/space/core/fs/array_record.py | 15 ------- python/src/space/core/fs/parquet.py | 10 ----- python/src/space/core/loaders/array_record.py | 2 +- python/src/space/core/ops/append.py | 2 +- python/src/space/core/ops/delete.py | 2 +- python/src/space/core/ops/insert.py | 3 +- python/src/space/core/ops/utils.py | 12 ----- python/src/space/core/options.py | 44 ++++++++++++++++++- python/src/space/core/runners.py | 3 +- python/src/space/core/views.py | 4 +- python/src/space/ray/ops/append.py | 2 +- python/src/space/ray/ops/delete.py | 2 +- python/src/space/ray/ops/insert.py | 2 +- python/src/space/ray/runners.py | 3 +- python/tests/core/ops/test_append.py | 5 +-- python/tests/core/ops/test_delete.py | 2 +- python/tests/core/ops/test_read.py | 3 +- 18 files changed, 58 insertions(+), 61 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 9c24cde..994a671 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -20,8 +20,7 @@ import pyarrow as pa from substrait.algebra_pb2 import ReadRel, Rel -from space.core.ops.utils import FileOptions -from space.core.options import JoinOptions, ReadOptions +from space.core.options import FileOptions, JoinOptions, ReadOptions from space.core.runners import LocalRunner from space.core.storage import Storage from space.core.transform.plans import LogicalPlanBuilder diff --git a/python/src/space/core/fs/array_record.py b/python/src/space/core/fs/array_record.py index 0032983..f56efe2 100644 --- a/python/src/space/core/fs/array_record.py +++ b/python/src/space/core/fs/array_record.py @@ -14,26 +14,11 @@ # """ArrayRecord file utilities.""" -from dataclasses import dataclass from typing import List, Optional from space.core.utils.lazy_imports_utils import array_record_module as ar -# pylint: disable=line-too-long -@dataclass -class ArrayRecordOptions: - """Options of ArrayRecord file writer.""" - # Max uncompressed bytes per file. - max_uncompressed_file_bytes = 100 * 1024 * 1024 - # ArrayRecord lib options. - # See https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83 - # Group size 1 maximizes random read performance. - # Match the options of TFDS: - # https://github.com/tensorflow/datasets/blob/92ebd18102b62cf85557ba4b905c970203d8914d/tensorflow_datasets/core/sequential_writer.py#L108 - options: str = "group_size:1" - - def read_record_file(file_path: str, positions: Optional[List[int]] = None) -> List[bytes]: """Read records of an ArrayRecord file. diff --git a/python/src/space/core/fs/parquet.py b/python/src/space/core/fs/parquet.py index 8d9d89b..6a1391c 100644 --- a/python/src/space/core/fs/parquet.py +++ b/python/src/space/core/fs/parquet.py @@ -14,22 +14,12 @@ # """Parquet file utilities.""" -from dataclasses import dataclass from typing import List import pyarrow as pa import pyarrow.parquet as pq -@dataclass -class ParquetWriterOptions: - """Options of Parquet file writer.""" - # Max uncompressed bytes per row group. - max_uncompressed_row_group_bytes = 100 * 1024 - # Max uncompressed bytes per file. - max_uncompressed_file_bytes: int = 1 * 1024 * 1024 - - def write_parquet_file(file_path: str, schema: pa.Schema, data: List[pa.Table]) -> pq.FileMetaData: """Materialize a single Parquet file.""" diff --git a/python/src/space/core/loaders/array_record.py b/python/src/space/core/loaders/array_record.py index caddcd0..be02221 100644 --- a/python/src/space/core/loaders/array_record.py +++ b/python/src/space/core/loaders/array_record.py @@ -24,8 +24,8 @@ from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.append import LocalAppendOp +from space.core.options import FileOptions from space.core.schema import arrow from space.core.serializers import DictSerializer from space.core.utils.paths import StoragePathsMixin diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 0c39c5d..6672323 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -25,8 +25,8 @@ from space.core.manifests import IndexManifestWriter from space.core.manifests import RecordManifestWriter from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.base import BaseOp, InputData +from space.core.options import FileOptions from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.schema import arrow diff --git a/python/src/space/core/ops/delete.py b/python/src/space/core/ops/delete.py index 0f18e04..18a2be1 100644 --- a/python/src/space/core/ops/delete.py +++ b/python/src/space/core/ops/delete.py @@ -24,9 +24,9 @@ from pyroaring import BitMap # type: ignore[import-not-found] from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.append import LocalAppendOp from space.core.ops.base import BaseOp +from space.core.options import FileOptions from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.utils import errors diff --git a/python/src/space/core/ops/insert.py b/python/src/space/core/ops/insert.py index 0dec1c7..b954bff 100644 --- a/python/src/space/core/ops/insert.py +++ b/python/src/space/core/ops/insert.py @@ -27,8 +27,7 @@ from space.core.ops.base import BaseOp, InputData from space.core.ops.delete import FileSetDeleteOp from space.core.ops.read import FileSetReadOp -from space.core.ops.utils import FileOptions -from space.core.options import ReadOptions +from space.core.options import FileOptions, ReadOptions import space.core.proto.metadata_pb2 as meta import space.core.proto.runtime_pb2 as rt from space.core.storage import Storage diff --git a/python/src/space/core/ops/utils.py b/python/src/space/core/ops/utils.py index 1c489ba..82244be 100644 --- a/python/src/space/core/ops/utils.py +++ b/python/src/space/core/ops/utils.py @@ -14,30 +14,18 @@ # """Utilities for operation classes.""" -from dataclasses import dataclass, field as dataclass_field from typing import List, Optional, Set import numpy as np import pyarrow as pa import pyarrow.compute as pc -from space.core.fs.array_record import ArrayRecordOptions -from space.core.fs.parquet import ParquetWriterOptions from space.core.schema import arrow from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.utils import errors -@dataclass -class FileOptions: - """Options of file IO.""" - parquet_options: ParquetWriterOptions = dataclass_field( - default_factory=ParquetWriterOptions) - array_record_options: ArrayRecordOptions = dataclass_field( - default_factory=ArrayRecordOptions) - - def update_index_storage_stats( base: meta.StorageStatistics, update: meta.StorageStatistics, diff --git a/python/src/space/core/options.py b/python/src/space/core/options.py index b0f79d2..579f42f 100644 --- a/python/src/space/core/options.py +++ b/python/src/space/core/options.py @@ -14,7 +14,7 @@ # """Options of Space core lib.""" -from dataclasses import dataclass +from dataclasses import dataclass, field as dataclass_field from typing import Any, Callable, List, Optional import pyarrow.compute as pc @@ -56,6 +56,48 @@ def __post_init__(self): self.batch_size = self.batch_size or DEFAULT_READ_BATCH_SIZE +@dataclass +class ParquetWriterOptions: + """Options of Parquet file writer.""" + # Max uncompressed bytes per row group. + max_uncompressed_row_group_bytes: int = 100 * 1024 + + # Max uncompressed bytes per file. + max_uncompressed_file_bytes: int = 1 * 1024 * 1024 + + +# pylint: disable=line-too-long +@dataclass +class ArrayRecordOptions: + """Options of ArrayRecord file writer.""" + # Max uncompressed bytes per file. + max_uncompressed_file_bytes: int = 100 * 1024 * 1024 + + # ArrayRecord lib options. + # + # See https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83 + # Default group size 1 maximizes random read performance. + # It matches the options of TFDS: + # https://github.com/tensorflow/datasets/blob/92ebd18102b62cf85557ba4b905c970203d8914d/tensorflow_datasets/core/sequential_writer.py#L108 + # + # A larger group size improves read throughput from Cloud Storage, because + # each RPC reads a larger chunk of data, which performs better on Cloud + # Storage. + options: str = "group_size:1" + + +@dataclass +class FileOptions: + """Options of file IO.""" + # Parquet file options. + parquet_options: ParquetWriterOptions = dataclass_field( + default_factory=ParquetWriterOptions) + + # ArrayRecord file options. + array_record_options: ArrayRecordOptions = dataclass_field( + default_factory=ArrayRecordOptions) + + @dataclass class Range: """A range of a field.""" diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 855e459..32e0744 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -27,13 +27,12 @@ from space.core.loaders.array_record import LocalArrayRecordLoadOp from space.core.loaders.parquet import LocalParquetLoadOp from space.core.ops.append import LocalAppendOp -from space.core.ops.append import FileOptions from space.core.ops.base import InputData, InputIteratorFn from space.core.ops.change_data import ChangeData, read_change_data from space.core.ops.delete import FileSetDeleteOp from space.core.ops.insert import InsertOptions, LocalInsertOp from space.core.ops.read import FileSetReadOp -from space.core.options import JoinOptions, ReadOptions +from space.core.options import FileOptions, JoinOptions, ReadOptions import space.core.proto.runtime_pb2 as rt from space.core.storage import Storage, Version from space.core.utils import errors diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py index 7ef57a7..be15326 100644 --- a/python/src/space/core/views.py +++ b/python/src/space/core/views.py @@ -22,10 +22,8 @@ import pyarrow as pa from substrait.algebra_pb2 import Rel -from space.core.options import JoinOptions from space.core.fs.factory import create_fs -from space.core.ops.utils import FileOptions -from space.core.options import ReadOptions +from space.core.options import FileOptions, JoinOptions, ReadOptions import space.core.proto.metadata_pb2 as meta from space.core.schema import FieldIdManager from space.core.storage import Storage diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py index ce87144..406c342 100644 --- a/python/src/space/ray/ops/append.py +++ b/python/src/space/ray/ops/append.py @@ -21,9 +21,9 @@ import ray from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.append import BaseAppendOp, LocalAppendOp from space.core.ops.base import InputData, InputIteratorFn +from space.core.options import FileOptions from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.ray.options import RayOptions diff --git a/python/src/space/ray/ops/delete.py b/python/src/space/ray/ops/delete.py index 7a12316..ee85a33 100644 --- a/python/src/space/ray/ops/delete.py +++ b/python/src/space/ray/ops/delete.py @@ -21,8 +21,8 @@ import ray from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.delete import BaseDeleteOp, FileSetDeleteOp +from space.core.options import FileOptions from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.storage import Storage diff --git a/python/src/space/ray/ops/insert.py b/python/src/space/ray/ops/insert.py index 56c59ff..43cb3d5 100644 --- a/python/src/space/ray/ops/insert.py +++ b/python/src/space/ray/ops/insert.py @@ -23,7 +23,7 @@ from space.ray.ops.append import RayAppendOp from space.core.ops.insert import InsertOptions, LocalInsertOp from space.core.ops.insert import filter_matched -from space.core.ops.utils import FileOptions +from space.core.options import FileOptions import space.core.proto.metadata_pb2 as meta import space.core.proto.runtime_pb2 as rt from space.core.storage import Storage diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index ce5c01e..0cfa206 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -29,12 +29,11 @@ from space.core.runners import BaseReadOnlyRunner, BaseReadWriteRunner from space.core.runners import StorageMixin from space.core.ops import utils -from space.core.ops.utils import FileOptions from space.core.ops.base import InputData, InputIteratorFn from space.core.ops.change_data import ChangeData, ChangeType from space.core.ops.delete import FileSetDeleteOp from space.core.ops.insert import InsertOptions -from space.core.options import JoinOptions, ReadOptions +from space.core.options import FileOptions, JoinOptions, ReadOptions import space.core.proto.runtime_pb2 as rt from space.core.utils import errors from space.ray.ops.append import RayAppendOp diff --git a/python/tests/core/ops/test_append.py b/python/tests/core/ops/test_append.py index 04dcbad..25ab461 100644 --- a/python/tests/core/ops/test_append.py +++ b/python/tests/core/ops/test_append.py @@ -17,7 +17,7 @@ import pyarrow.parquet as pq from space.core.ops.append import LocalAppendOp -from space.core.ops.utils import FileOptions +from space.core.options import FileOptions import space.core.proto.metadata_pb2 as meta from space.core.storage import Storage @@ -64,8 +64,7 @@ def test_write_pydict_all_types(self, tmp_path, all_types_schema, assert patch.storage_statistics_update == meta.StorageStatistics( num_rows=5, index_compressed_bytes=114, index_uncompressed_bytes=126) - def test_write_pydict_with_record_fields(self, tmp_path, - record_fields_schema, + def test_write_pydict_with_record_fields(self, tmp_path, record_fields_schema, record_fields_input_data): location = tmp_path / "dataset" storage = Storage.create(location=str(location), diff --git a/python/tests/core/ops/test_delete.py b/python/tests/core/ops/test_delete.py index db616d5..44c6a1d 100644 --- a/python/tests/core/ops/test_delete.py +++ b/python/tests/core/ops/test_delete.py @@ -16,9 +16,9 @@ import pyarrow.compute as pc from space.core.ops.append import LocalAppendOp -from space.core.ops.utils import FileOptions from space.core.ops.delete import FileSetDeleteOp from space.core.ops.read import FileSetReadOp +from space.core.options import FileOptions from space.core.storage import Storage _default_file_options = FileOptions() diff --git a/python/tests/core/ops/test_read.py b/python/tests/core/ops/test_read.py index f6c053c..c9fb71f 100644 --- a/python/tests/core/ops/test_read.py +++ b/python/tests/core/ops/test_read.py @@ -16,9 +16,8 @@ import pyarrow.compute as pc from space.core.ops.append import LocalAppendOp -from space.core.ops.utils import FileOptions from space.core.ops.read import FileSetReadOp -from space.core.options import ReadOptions +from space.core.options import FileOptions, ReadOptions from space.core.storage import Storage _default_file_options = FileOptions()