diff --git a/python/pyproject.toml b/python/pyproject.toml index 6c5bb0c..02731e8 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -16,6 +16,7 @@ requires-python = ">=3.8" dependencies = [ "absl-py", "array-record", + "cloudpickle", "numpy", "protobuf", "pyarrow >= 14.0.0", diff --git a/python/src/space/__init__.py b/python/src/space/__init__.py index f52d704..a447609 100644 --- a/python/src/space/__init__.py +++ b/python/src/space/__init__.py @@ -14,6 +14,7 @@ # """Space is a storage framework for ML datasets.""" +from space.core.views import View from space.core.datasets import Dataset from space.core.runners import LocalRunner from space.core.schema.types import TfFeatures diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index fc01cba..fe89779 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -15,16 +15,19 @@ """Space dataset is the interface to interact with underlying storage.""" from __future__ import annotations -from typing import List +from typing import Dict, List import pyarrow as pa +from substrait.algebra_pb2 import ReadRel, Rel from space.core.runners import LocalRunner from space.core.serializers.base import DictSerializer from space.core.storage import Storage +from space.core.utils.plans import LogicalPlanBuilder +from space.core.views import View -class Dataset: +class Dataset(View): """Dataset is the interface to interact with Space storage.""" def __init__(self, storage: Storage): @@ -54,6 +57,14 @@ def schema(self) -> pa.Schema: """Return the dataset schema.""" return self._storage.logical_schema + @property + def primary_keys(self) -> List[str]: + return self._storage.primary_keys + + @property + def record_fields(self) -> List[str]: + return self._storage.record_fields + def serializer(self) -> DictSerializer: """Return a serializer (deserializer) for the dataset.""" return DictSerializer(self.schema) @@ -71,3 +82,12 @@ def index_files(self) -> 
List[str]: def snapshot_ids(self) -> List[int]: """A list of all alive snapshot IDs in the dataset.""" return self._storage.snapshot_ids + + @property + def sources(self) -> Dict[str, Dataset]: + return {self._storage.location: self} + + def to_relation(self, builder: LogicalPlanBuilder) -> Rel: + location = self._storage.location + return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]), + base_schema=self._storage.metadata.schema.fields)) diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto index 82fad91..080e112 100644 --- a/python/src/space/core/proto/metadata.proto +++ b/python/src/space/core/proto/metadata.proto @@ -17,6 +17,7 @@ syntax = "proto3"; package space.proto; import "google/protobuf/timestamp.proto"; +import "substrait/plan.proto"; import "substrait/type.proto"; // Record the current storage metadata path in a static local file. @@ -32,7 +33,7 @@ message EntryPoint { // Metadata persisting the current status of a storage, including logical // metadata such as schema, and physical metadata persisted as a history of // snapshots -// NEXT_ID: 7 +// NEXT_ID: 8 message StorageMetadata { // Create time of the storage. google.protobuf.Timestamp create_time = 1; @@ -43,8 +44,10 @@ message StorageMetadata { // The storage type. enum Type { TYPE_UNSPECIFIED = 0; - // The dataset type supports fully managed storage features. + // Dataset type supports fully managed storage features. DATASET = 1; + // Materialized view type supports synchronizing changes from sources. + MATERIALIZED_VIEW = 2; } Type type = 3; @@ -56,6 +59,9 @@ message StorageMetadata { // All alive snapshots with snapshot ID as key. map snapshots = 6; + + // Store the logical plan for materialized views. + LogicalPlan logical_plan = 7; } // The storage logical schema where user provided types are persisted instead @@ -148,3 +154,14 @@ message RowBitmap { bytes roaring_bitmap = 3; } } + +// Store the logical plan of a transform. 
+// NEXT_ID: 3 +message LogicalPlan { + // Stores the logical plan. + substrait.Plan logical_plan = 1; + + // Registry of user defined functions. + // Key is UDF name; value is pickle file path. + map udfs = 2; +} diff --git a/python/src/space/core/proto/metadata_pb2.py b/python/src/space/core/proto/metadata_pb2.py index ae41786..2dbf572 100644 --- a/python/src/space/core/proto/metadata_pb2.py +++ b/python/src/space/core/proto/metadata_pb2.py @@ -12,10 +12,11 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 +from substrait import plan_pb2 as substrait_dot_plan__pb2 from substrait import type_pb2 as substrait_dot_type__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\x9f\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\")\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xe8\x01\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 
\x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\tB\x0b\n\tdata_info\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmapb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/plan.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xe6\x03\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12.\n\x0clogical_plan\x18\x07 \x01(\x0b\x32\x18.space.proto.LogicalPlan\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 
\x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\"@\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\x12\x15\n\x11MATERIALIZED_VIEW\x10\x02\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xe8\x01\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\tB\x0b\n\tdata_info\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmap\"\x93\x01\n\x0bLogicalPlan\x12%\n\x0clogical_plan\x18\x01 \x01(\x0b\x32\x0f.substrait.Plan\x12\x30\n\x04udfs\x18\x02 \x03(\x0b\x32\".space.proto.LogicalPlan.UdfsEntry\x1a+\n\tUdfsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.metadata_pb2', globals()) @@ -24,24 +25,30 @@ DESCRIPTOR._options = None _STORAGEMETADATA_SNAPSHOTSENTRY._options = None _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_options = 
b'8\001' - _ENTRYPOINT._serialized_start=103 - _ENTRYPOINT._serialized_end=138 - _STORAGEMETADATA._serialized_start=141 - _STORAGEMETADATA._serialized_end=556 - _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_start=442 - _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_end=513 - _STORAGEMETADATA_TYPE._serialized_start=515 - _STORAGEMETADATA_TYPE._serialized_end=556 - _SCHEMA._serialized_start=558 - _SCHEMA._serialized_end=651 - _SNAPSHOT._serialized_start=654 - _SNAPSHOT._serialized_end=886 - _MANIFESTFILES._serialized_start=888 - _MANIFESTFILES._serialized_end=964 - _STORAGESTATISTICS._serialized_start=967 - _STORAGESTATISTICS._serialized_end=1105 - _CHANGELOG._serialized_start=1107 - _CHANGELOG._serialized_end=1208 - _ROWBITMAP._serialized_start=1210 - _ROWBITMAP._serialized_end=1289 + _LOGICALPLAN_UDFSENTRY._options = None + _LOGICALPLAN_UDFSENTRY._serialized_options = b'8\001' + _ENTRYPOINT._serialized_start=125 + _ENTRYPOINT._serialized_end=160 + _STORAGEMETADATA._serialized_start=163 + _STORAGEMETADATA._serialized_end=649 + _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_start=512 + _STORAGEMETADATA_SNAPSHOTSENTRY._serialized_end=583 + _STORAGEMETADATA_TYPE._serialized_start=585 + _STORAGEMETADATA_TYPE._serialized_end=649 + _SCHEMA._serialized_start=651 + _SCHEMA._serialized_end=744 + _SNAPSHOT._serialized_start=747 + _SNAPSHOT._serialized_end=979 + _MANIFESTFILES._serialized_start=981 + _MANIFESTFILES._serialized_end=1057 + _STORAGESTATISTICS._serialized_start=1060 + _STORAGESTATISTICS._serialized_end=1198 + _CHANGELOG._serialized_start=1200 + _CHANGELOG._serialized_end=1301 + _ROWBITMAP._serialized_start=1303 + _ROWBITMAP._serialized_end=1382 + _LOGICALPLAN._serialized_start=1385 + _LOGICALPLAN._serialized_end=1532 + _LOGICALPLAN_UDFSENTRY._serialized_start=1489 + _LOGICALPLAN_UDFSENTRY._serialized_end=1532 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index 
f397354..a730b20 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -22,6 +22,7 @@ import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper import google.protobuf.message import google.protobuf.timestamp_pb2 +import substrait.plan_pb2 import substrait.type_pb2 import sys import typing @@ -61,7 +62,7 @@ class StorageMetadata(google.protobuf.message.Message): """Metadata persisting the current status of a storage, including logical metadata such as schema, and physical metadata persisted as a history of snapshots - NEXT_ID: 7 + NEXT_ID: 8 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -74,14 +75,18 @@ class StorageMetadata(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor TYPE_UNSPECIFIED: StorageMetadata._Type.ValueType # 0 DATASET: StorageMetadata._Type.ValueType # 1 - """The dataset type supports fully managed storage features.""" + """Dataset type supports fully managed storage features.""" + MATERIALIZED_VIEW: StorageMetadata._Type.ValueType # 2 + """Materialized view type supports synchronizing changes from sources.""" class Type(_Type, metaclass=_TypeEnumTypeWrapper): """The storage type.""" TYPE_UNSPECIFIED: StorageMetadata.Type.ValueType # 0 DATASET: StorageMetadata.Type.ValueType # 1 - """The dataset type supports fully managed storage features.""" + """Dataset type supports fully managed storage features.""" + MATERIALIZED_VIEW: StorageMetadata.Type.ValueType # 2 + """Materialized view type supports synchronizing changes from sources.""" @typing_extensions.final class SnapshotsEntry(google.protobuf.message.Message): @@ -107,6 +112,7 @@ class StorageMetadata(google.protobuf.message.Message): SCHEMA_FIELD_NUMBER: builtins.int CURRENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int SNAPSHOTS_FIELD_NUMBER: builtins.int + LOGICAL_PLAN_FIELD_NUMBER: builtins.int @property def create_time(self) -> 
google.protobuf.timestamp_pb2.Timestamp: """Create time of the storage.""" @@ -122,6 +128,9 @@ class StorageMetadata(google.protobuf.message.Message): @property def snapshots(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, global___Snapshot]: """All alive snapshots with snapshot ID as key.""" + @property + def logical_plan(self) -> global___LogicalPlan: + """Store the logical plan for materialized views.""" def __init__( self, *, @@ -131,9 +140,10 @@ class StorageMetadata(google.protobuf.message.Message): schema: global___Schema | None = ..., current_snapshot_id: builtins.int = ..., snapshots: collections.abc.Mapping[builtins.int, global___Snapshot] | None = ..., + logical_plan: global___LogicalPlan | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ... global___StorageMetadata = StorageMetadata @@ -327,3 +337,48 @@ class RowBitmap(google.protobuf.message.Message): def WhichOneof(self, oneof_group: typing_extensions.Literal["bitmap", b"bitmap"]) -> typing_extensions.Literal["roaring_bitmap"] | None: ... 
global___RowBitmap = RowBitmap + +@typing_extensions.final +class LogicalPlan(google.protobuf.message.Message): + """Store the logical plan of a transform. + NEXT_ID: 3 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + @typing_extensions.final + class UdfsEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + + LOGICAL_PLAN_FIELD_NUMBER: builtins.int + UDFS_FIELD_NUMBER: builtins.int + @property + def logical_plan(self) -> substrait.plan_pb2.Plan: + """Stores the logical plan.""" + @property + def udfs(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Registry of user defined functions. + Key is UDF name; value is pickle file path. + """ + def __init__( + self, + *, + logical_plan: substrait.plan_pb2.Plan | None = ..., + udfs: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan", "udfs", b"udfs"]) -> None: ... 
+ +global___LogicalPlan = LogicalPlan diff --git a/python/src/space/core/schema/arrow.py b/python/src/space/core/schema/arrow.py index 1bee269..78728b2 100644 --- a/python/src/space/core/schema/arrow.py +++ b/python/src/space/core/schema/arrow.py @@ -153,6 +153,11 @@ def field_name_to_id_dict(schema: pa.Schema) -> Dict[str, int]: return {f.name: field_id(f) for f in schema} +def field_id_to_name_dict(schema: pa.Schema) -> Dict[int, str]: + """Return a dict with field ID as key and field name as value.""" + return {field_id(f): f.name for f in schema} + + def field_id_to_column_id_dict(schema: pa.Schema) -> Dict[int, int]: """Return a dict with field ID as key and column ID as value.""" field_id_dict = field_name_to_id_dict(schema) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index ee243b3..364c8f9 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -67,6 +67,11 @@ def primary_keys(self) -> List[str]: """Return the storage primary keys.""" return list(self._metadata.schema.primary_keys) + @property + def record_fields(self) -> List[str]: + """Return record field names.""" + return list(self._metadata.schema.record_fields) + @property def logical_schema(self) -> pa.Schema: """Return the user specified schema.""" @@ -89,17 +94,24 @@ def snapshot(self, snapshot_id: Optional[int] = None) -> meta.Snapshot: raise RuntimeError(f"Snapshot {snapshot_id} is not found") + # pylint: disable=too-many-arguments @classmethod def create( - cls, location: str, schema: pa.Schema, primary_keys: List[str], - record_fields: List[str]) -> Storage: # pylint: disable=unused-argument + cls, + location: str, + schema: pa.Schema, + primary_keys: List[str], + record_fields: List[str], + logical_plan: Optional[meta.LogicalPlan] = None + ) -> Storage: # pylint: disable=unused-argument """Create a new empty storage. - + Args: location: the directory path to the storage. schema: the schema of the storage. 
primary_keys: un-enforced primary keys. record_fields: fields stored in row format (ArrayRecord). + logical_plan: logical plan of materialized view. """ # TODO: to verify that location is an empty directory. # TODO: to verify primary key fields and record_fields (and types) are @@ -122,6 +134,10 @@ def create( current_snapshot_id=_INIT_SNAPSHOT_ID, type=meta.StorageMetadata.DATASET) + if logical_plan is not None: + metadata.type = meta.StorageMetadata.MATERIALIZED_VIEW + metadata.logical_plan.CopyFrom(logical_plan) + new_metadata_path = paths.new_metadata_path(paths.metadata_dir(location)) snapshot = meta.Snapshot(snapshot_id=_INIT_SNAPSHOT_ID, create_time=now) diff --git a/python/src/space/core/transform.py b/python/src/space/core/transform.py new file mode 100644 index 0000000..09804a3 --- /dev/null +++ b/python/src/space/core/transform.py @@ -0,0 +1,250 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Classes for transforming datasets and """ + +from __future__ import annotations +from dataclasses import dataclass +from os import path +from typing import Dict, List, Tuple + +import pyarrow as pa +from substrait.algebra_pb2 import Expression +from substrait.algebra_pb2 import FilterRel +from substrait.algebra_pb2 import FunctionArgument +from substrait.algebra_pb2 import ProjectRel +from substrait.algebra_pb2 import Rel +from substrait.extensions.extensions_pb2 import SimpleExtensionDeclaration +from substrait.extensions.extensions_pb2 import SimpleExtensionURI +from substrait.plan_pb2 import Plan +from substrait.type_pb2 import Type + +from space.core.datasets import Dataset +import space.core.proto.metadata_pb2 as meta +from space.core.schema import arrow +from space.core.utils.plans import SIMPLE_UDF_URI +from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn +from space.core.views import View + + +@dataclass +class BaseUdfTransform(View): + """Base class for transforms containing a single user defined function.""" + + # The UDF of this transform. + udf: UserDefinedFn + # The input view to apply the UDF to. + input_: View + # The fields to read from the input view. + # TODO: to push input field selection down to the input view, for reading + # less data from the sources. + input_fields: List[str] + + @property + def primary_keys(self) -> List[str]: + return self.input_.primary_keys + + @property + def sources(self) -> Dict[str, Dataset]: + return self.input_.sources + + def _add_udf(self, builder: LogicalPlanBuilder) -> int: + """Add the UDF to the logical plan. + + Returns: + The function anchor as the function_reference of this UDF. + """ + # TODO: to support user provided UDF name. 
+ new_udf_name = builder.new_udf_name() + builder.add_udf(new_udf_name, self.udf) + + extension_uri_anchor = builder.next_ext_uri_anchor() + fn_anchor = builder.next_function_anchor() + + builder.append_ext_uri( + SimpleExtensionURI(extension_uri_anchor=extension_uri_anchor, + uri=SIMPLE_UDF_URI)) + builder.append_ext( + SimpleExtensionDeclaration( + extension_function=SimpleExtensionDeclaration.ExtensionFunction( + extension_uri_reference=extension_uri_anchor, + function_anchor=fn_anchor, + name=new_udf_name))) + + return fn_anchor + + def _arguments(self) -> List[FunctionArgument]: + """Returns a list of UDF input args. + + Fields are represented by field IDs. + """ + if not self.input_fields: + input_fields = self.input_.schema.names + else: + input_fields = self.input_fields + + field_id_dict = arrow.field_name_to_id_dict(self.input_.schema) + return [_fn_arg(field_id_dict[name]) for name in input_fields] + + +class MapTransform(BaseUdfTransform): + """Map a view by a user defined function.""" + + @property + def schema(self) -> pa.Schema: + return self.udf.output_schema + + @property + def record_fields(self) -> List[str]: + return self.udf.output_record_fields + + def to_relation(self, builder: LogicalPlanBuilder) -> Rel: + input_rel = self.input_.to_relation(builder) + fn_anchor = self._add_udf(builder) + # NOTE: output_type is unset because it is a single field type per + # project expression in Substrait protocol, but a schema is needed here. + # The output types are recorded in view schema. + # TODO: to populate output_type as a Type.Struct. 
+ project_expr = Expression(scalar_function=Expression.ScalarFunction( + function_reference=fn_anchor, arguments=self._arguments())) + return Rel(project=ProjectRel(input=input_rel, expressions=[project_expr])) + + @classmethod + def from_relation(cls, location: str, metadata: meta.StorageMetadata, + rel: Rel, plan: _CompactPlan) -> MapTransform: + """Build a MapTransform from logical plan relation.""" + return MapTransform(*_load_udf(location, metadata, rel.project. + expressions[0], rel.project.input, plan)) + + +@dataclass +class FilterTransform(BaseUdfTransform): + """Filter a view by a user defined function.""" + + @property + def schema(self) -> pa.Schema: + return self.input_.schema + + @property + def record_fields(self) -> List[str]: + return self.input_.record_fields + + def to_relation(self, builder: LogicalPlanBuilder) -> Rel: + input_rel = self.input_.to_relation(builder) + fn_anchor = self._add_udf(builder) + condition_expr = Expression( + scalar_function=Expression.ScalarFunction(function_reference=fn_anchor, + arguments=self._arguments(), + output_type=Type( + bool=Type.Boolean()))) + return Rel(filter=FilterRel(input=input_rel, condition=condition_expr)) + + @classmethod + def from_relation(cls, location: str, metadata: meta.StorageMetadata, + rel: Rel, plan: _CompactPlan) -> FilterTransform: + """Build a FilterTransform from logical plan relation.""" + return FilterTransform(*_load_udf(location, metadata, rel.filter.condition, + rel.filter.input, plan)) + + +@dataclass +class _CompactPlan: + """A helper class storing information from a Substrait plan in a read + friendly format. + """ + + # Key is function_anchor. + ext_fn_dict: Dict[int, SimpleExtensionDeclaration] + # Key is extension_uri_anchor. 
+ ext_uri_dict: Dict[int, SimpleExtensionURI] + + @classmethod + def from_plan(cls, plan: Plan) -> _CompactPlan: + """Build a _CompactPlan from a plan.""" + ext_fn_dict = {} + for ext in plan.extensions: + ext_fn_dict[ext.extension_function.function_anchor] = ext + + ext_uri_dict = {} + for uri in plan.extension_uris: + ext_uri_dict[uri.extension_uri_anchor] = uri + + return _CompactPlan(ext_fn_dict, ext_uri_dict) + + +def _load_udf(location: str, metadata: meta.StorageMetadata, + expression: Expression, input_rel: Rel, + plan: _CompactPlan) -> Tuple[UserDefinedFn, View, List[str]]: + """Load UDF information for building a transform from a relation. + + Returns: + A tuple of: (1) UDF class, (2) the input view, (3) input argument field + names. + """ + scalar_fn = expression.scalar_function + fn_extension = plan.ext_fn_dict[scalar_fn.function_reference] + + # Sanity check. + if plan.ext_uri_dict[fn_extension.extension_function. + extension_uri_reference].uri != SIMPLE_UDF_URI: + raise RuntimeError("Only UDF is supported in logical plan extension URIs") + + # Load the UDF from file. + pickle_path = metadata.logical_plan.udfs[ + fn_extension.extension_function.name] + udf = UserDefinedFn.load(path.join(location, pickle_path)) + + # Build the input view and input argument field names. 
+ input_ = load_view_(location, metadata, input_rel, plan) + field_name_dict = arrow.field_id_to_name_dict(input_.schema) + input_fields = [ + field_name_dict[arg.value.selection.direct_reference.struct_field.field] + for arg in scalar_fn.arguments + ] + + return udf, input_, input_fields + + +def load_view(location: str, metadata: meta.StorageMetadata, + plan: Plan) -> View: + """Build a view from logical plan relation.""" + rel = plan.relations[0].root.input + return load_view_(location, metadata, rel, _CompactPlan.from_plan(plan)) + + +def load_view_(location: str, metadata: meta.StorageMetadata, rel: Rel, + plan: _CompactPlan) -> View: + """Build a view from logical plan relation.""" + if rel.HasField("read"): + return Dataset.load(rel.read.named_table.names[0]) + elif rel.HasField("project"): + return MapTransform.from_relation(location, metadata, rel, plan) + elif rel.HasField("filter"): + return FilterTransform.from_relation(location, metadata, rel, plan) + + raise RuntimeError(f"Substrait relation not supported: {rel}") + + +def _fn_arg(field_id: int) -> FunctionArgument: + """Return a Substrait function argument for a field ID. + + NOTE: StructField.field in Substrait was for the position of a field in the + field name list in depth first order. Its meaning is replaced by field ID + here. It does not affect any functionality of fetching a field from an + integer. To revisit the design in future. + """ + return FunctionArgument(value=Expression(selection=Expression.FieldReference( + direct_reference=Expression.ReferenceSegment( + struct_field=Expression.ReferenceSegment.StructField( + field=field_id))))) diff --git a/python/src/space/core/utils/paths.py b/python/src/space/core/utils/paths.py index 45f15ee..9bba268 100644 --- a/python/src/space/core/utils/paths.py +++ b/python/src/space/core/utils/paths.py @@ -18,10 +18,13 @@ from space.core.utils.uuids import uuid_ +# Folders of storage metadata. 
_ENTRY_POINT_FILE = "entrypoint.txtpb" _DATA_DIR = "data" _METADATA_DIR = "metadata" _CHANGE_DATA_DIR = "changes" +# Folder of user defined functions for materialized views. +UDF_DIR = 'udfs' def new_index_file_path(data_dir_: str): diff --git a/python/src/space/core/utils/plans.py b/python/src/space/core/utils/plans.py new file mode 100644 index 0000000..1a2a1c3 --- /dev/null +++ b/python/src/space/core/utils/plans.py @@ -0,0 +1,121 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Plans for view/dataset transforms.""" + +from __future__ import annotations +from dataclasses import dataclass +from typing import Callable, Dict, List + +import cloudpickle # type: ignore[import-untyped] +import pyarrow as pa +from space.core.utils.uuids import random_id + +from substrait.algebra_pb2 import Rel, RelRoot +from substrait.extensions.extensions_pb2 import SimpleExtensionDeclaration +from substrait.extensions.extensions_pb2 import SimpleExtensionURI +from substrait.plan_pb2 import Plan, PlanRel + +# Substrait URI representing user defined functions. +# When constructing a materialized view from logical plan, the UDF is loaded +# from a pickle file path in the storage metadata's UDF registry. +SIMPLE_UDF_URI = "urn:space:substrait_simple_extension_function" + + +@dataclass +class UserDefinedFn: + """A user defined function in the logical plan. + + The class object is persisted in the storage metadata's UDF registry. 
+  """
+  # A callable provided by users. The requirement on signature varies depending
+  # on the transform type.
+  fn: Callable
+  # The output schema after applying fn on the input view.
+  output_schema: pa.Schema
+  # The record fields in the output schema.
+  output_record_fields: List[str]
+  # If reading the input view by batches, number of rows per batch.
+  batch_size: int = -1
+
+  # TODO: file operations need to be through the FileSystem interface.
+
+  @classmethod
+  def load(cls, file_path: str) -> UserDefinedFn:
+    """Load a UDF from a file."""
+    with open(file_path, "rb") as f:
+      udf = cloudpickle.load(f)
+
+    return udf
+
+  def dump(self, file_path: str) -> None:
+    """Dump UDF into a file."""
+    with open(file_path, 'wb') as f:
+      cloudpickle.dump(self, f)
+
+
+class LogicalPlanBuilder:
+  """A builder of logical plan in the Substrait format."""
+
+  def __init__(self):
+    self._plan = Plan()
+    self._udfs: Dict[str, UserDefinedFn] = {}
+
+    self._extension_uri_anchor = 1
+    self._function_anchor = 1
+
+  def next_ext_uri_anchor(self) -> int:
+    """Return the next extension URI anchor."""
+    result = self._extension_uri_anchor
+    self._extension_uri_anchor += 1
+    return result
+
+  def next_function_anchor(self) -> int:
+    """Return the next function anchor."""
+    result = self._function_anchor
+    self._function_anchor += 1
+    return result
+
+  def append_ext_uri(self, uri: SimpleExtensionURI) -> None:
+    """Append an extension URI in the plan."""
+    self._plan.extension_uris.append(uri)
+
+  def append_ext(self, ext: SimpleExtensionDeclaration) -> None:
+    """Append an extension in the plan."""
+    self._plan.extensions.append(ext)
+
+  def build(self, relation: Rel) -> Plan:
+    """Build the plan."""
+    self._plan.relations.append(PlanRel(root=RelRoot(input=relation)))
+    return self._plan
+
+  def add_udf(self, name: str, fn: UserDefinedFn) -> None:
+    """Add a new user defined function to the plan."""
+    self._udfs[name] = fn
+
+  def new_udf_name(self) -> str:
+    """Return a random UDF name, unique in the plan scope."""
+    retry_count = 0
+    while retry_count < 10:  # At most 10 attempts before giving up.
+      retry_count += 1
+      name = f"udf_{random_id()}"
+      if name not in self._udfs:
+        return name
+
+    raise RuntimeError("Failed to generate an unused UDF name")
+
+  @property
+  def udfs(self) -> Dict[str, UserDefinedFn]:
+    """Return user defined functions in the plan."""
+    return self._udfs
diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py
new file mode 100644
index 0000000..3cdcdde
--- /dev/null
+++ b/python/src/space/core/views.py
@@ -0,0 +1,175 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Views (materialized views) are transforms applied to datasets."""
+
+from __future__ import annotations
+from abc import ABC, abstractmethod
+from os import path
+from typing import Callable, Dict, List, Optional, TYPE_CHECKING
+
+import pyarrow as pa
+from substrait.algebra_pb2 import Rel
+
+from space.core.fs.factory import create_fs
+import space.core.proto.metadata_pb2 as meta
+from space.core.schema import FieldIdManager
+from space.core.storage import Storage
+from space.core.utils.paths import UDF_DIR, metadata_dir
+from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn
+
+if TYPE_CHECKING:
+  from space.core.datasets import Dataset
+
+
+class View(ABC):
+  """A view is a dataset, or a transform applied to a dataset, or a transform
+  applied to another view.
+  """
+
+  @property
+  @abstractmethod
+  def schema(self) -> pa.Schema:
+    """Return the view schema."""
+
+  @property
+  @abstractmethod
+  def primary_keys(self) -> List[str]:
+    """Return the primary keys."""
+
+  @property
+  @abstractmethod
+  def record_fields(self) -> List[str]:
+    """Return the record field names."""
+
+  @property
+  @abstractmethod
+  def sources(self) -> Dict[str, Dataset]:
+    """Return the datasets in its upstream views.
+
+    Key is dataset location as the identifier.
+    """
+
+  @abstractmethod
+  def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
+    """Obtain the logical plan relation of the view.
+
+    The relation describes a dataset or a transform in the Substrait format.
+    """
+
+  def materialize(self, location: str) -> MaterializedView:
+    """Materialize a view to files in the Space storage format.
+
+    Args:
+      location: the folder location of the materialized view files.
+    """
+    plan_builder = LogicalPlanBuilder()
+    rel = self.to_relation(plan_builder)
+    logical_plan = meta.LogicalPlan(logical_plan=plan_builder.build(rel))
+    return MaterializedView.create(location, self, logical_plan,
+                                   plan_builder.udfs)
+
+  # pylint: disable=too-many-arguments
+  def map_batches(self,
+                  fn: Callable,
+                  input_fields: List[str],
+                  output_schema: pa.Schema,
+                  output_record_fields: List[str],
+                  batch_size: int = -1) -> View:
+    """Transform batches of data by a user defined function.
+
+    Args:
+      fn: a user defined function on batches.
+      input_fields: the fields to read from the input view.
+      output_schema: the output schema.
+      batch_size: the number of rows per batch.
+    """
+    # Assign field IDs to the output schema.
+    field_id_mgr = FieldIdManager(next_field_id=0)
+    output_schema = field_id_mgr.assign_field_ids(output_schema)
+
+    # pylint: disable=cyclic-import,import-outside-toplevel
+    from space.core.transform import MapTransform
+    return MapTransform(
+        UserDefinedFn(fn, output_schema, output_record_fields, batch_size),
+        self, input_fields)
+
+  def filter(self,
+             fn: Callable,
+             input_fields: Optional[List[str]] = None) -> View:
+    """Filter rows by the provided user defined function.
+
+    Args:
+      fn: a user defined function on batches.
+      input_fields: the fields to read from the input view.
+    """
+    if input_fields is None:
+      input_fields = []
+
+    # pylint: disable=cyclic-import,import-outside-toplevel
+    from space.core.transform import FilterTransform
+    return FilterTransform(UserDefinedFn(fn, self.schema, self.record_fields),
+                           self, input_fields)
+
+
+class MaterializedView:
+  """A view materialized as a Space storage.
+
+  When the source datasets are modified, refreshing the materialized view
+  keeps the view up-to-date by reading the changes in the sources, processing
+  the changes by the transforms in logical plan, and writing the results into
+  materialized view storage.
+  """
+
+  def __init__(self, storage: Storage, view: View):
+    self._storage = storage
+    self._view = view
+
+  @property
+  def storage(self) -> Storage:
+    """Return storage of the materialized view."""
+    return self._storage
+
+  @property
+  def view(self) -> Optional[View]:
+    """Return view of the materialized view."""
+    return self._view
+
+  @classmethod
+  def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
+             udfs: Dict[str, UserDefinedFn]) -> MaterializedView:
+    """Create a new materialized view."""
+    udf_dir = path.join(metadata_dir(location), UDF_DIR)
+    create_fs(location).create_dir(udf_dir)
+
+    for name, udf in udfs.items():
+      full_path = path.join(udf_dir, f"{name}.pkl")
+      udf.dump(full_path)
+      # NOTE(review): path is stored relative to `location`, presumably so the
+      # storage tree stays relocatable — confirm with Storage readers.
+      logical_plan.udfs[name] = path.relpath(full_path, location)
+
+    storage = Storage.create(location, view.schema, view.primary_keys,
+                             view.record_fields, logical_plan)
+    return MaterializedView(storage, view)
+
+  @classmethod
+  def load(cls, location: str) -> MaterializedView:
+    """Load a materialized view from files."""
+    storage = Storage.load(location)
+    metadata = storage.metadata
+    plan = metadata.logical_plan.logical_plan
+
+    # pylint: disable=cyclic-import,import-outside-toplevel
+    from space.core.transform import load_view
+    view = load_view(storage.location, metadata, plan)
+    return MaterializedView(storage, view)
diff --git a/python/src/space/tf/data_sources.py b/python/src/space/tf/data_sources.py
index b716680..bd3f1b1 100644
--- a/python/src/space/tf/data_sources.py
+++ b/python/src/space/tf/data_sources.py
@@ -23,7 +23,7 @@
 from tensorflow_datasets.core.utils import shard_utils  # type: ignore[import-untyped]
 from tensorflow_datasets.core.utils.lazy_imports_utils import array_record_data_source as ards  # type: ignore[import-untyped]
 
-from space import Dataset
+from space.core.datasets import Dataset
 from space.core.schema import constants