Skip to content

Commit

Permalink
Add logical plan support
Browse files Browse the repository at this point in the history
  • Loading branch information
coufon committed Dec 28, 2023
1 parent 19dd8fa commit 745ba9a
Show file tree
Hide file tree
Showing 13 changed files with 705 additions and 34 deletions.
1 change: 1 addition & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ requires-python = ">=3.8"
dependencies = [
"absl-py",
"array-record",
"cloudpickle",
"numpy",
"protobuf",
"pyarrow >= 14.0.0",
Expand Down
1 change: 1 addition & 0 deletions python/src/space/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#
"""Space is a storage framework for ML datasets."""

from space.core.views import View
from space.core.datasets import Dataset
from space.core.runners import LocalRunner
from space.core.schema.types import TfFeatures
24 changes: 22 additions & 2 deletions python/src/space/core/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
"""Space dataset is the interface to interact with underlying storage."""

from __future__ import annotations
from typing import List
from typing import Dict, List

import pyarrow as pa
from substrait.algebra_pb2 import ReadRel, Rel

from space.core.runners import LocalRunner
from space.core.serializers.base import DictSerializer
from space.core.storage import Storage
from space.core.utils.plans import LogicalPlanBuilder
from space.core.views import View


class Dataset:
class Dataset(View):
"""Dataset is the interface to interact with Space storage."""

def __init__(self, storage: Storage):
Expand Down Expand Up @@ -54,6 +57,14 @@ def schema(self) -> pa.Schema:
"""Return the dataset schema."""
return self._storage.logical_schema

@property
def primary_keys(self) -> List[str]:
  """Return the primary key field names of the dataset."""
  return self._storage.primary_keys

@property
def record_fields(self) -> List[str]:
  """Return the names of fields stored in row format (ArrayRecord)."""
  return self._storage.record_fields

def serializer(self) -> DictSerializer:
  """Return a serializer (deserializer) for the dataset.

  The serializer is constructed from the dataset's logical schema.
  """
  return DictSerializer(self.schema)
Expand All @@ -71,3 +82,12 @@ def index_files(self) -> List[str]:
def snapshot_ids(self) -> List[int]:
"""A list of all alive snapshot IDs in the dataset."""
return self._storage.snapshot_ids

@property
def sources(self) -> Dict[str, Dataset]:
  """Return the source datasets of this view, keyed by storage location.

  A dataset's only source is itself.
  """
  return {self._storage.location: self}

def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
  """Build a Substrait relation representing this dataset.

  The dataset is represented as a named-table read, using the storage
  location as the table name and the stored metadata schema fields as the
  base schema.

  Args:
    builder: the logical plan builder; unused here — presumably a plain
      dataset read registers nothing on the builder (TODO confirm against
      other View implementations).

  Returns:
    A Substrait ``Rel`` containing a ``ReadRel`` for this dataset.
  """
  location = self._storage.location
  return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]),
                          base_schema=self._storage.metadata.schema.fields))
21 changes: 19 additions & 2 deletions python/src/space/core/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ syntax = "proto3";
package space.proto;

import "google/protobuf/timestamp.proto";
import "substrait/plan.proto";
import "substrait/type.proto";

// Record the current storage metadata path in a static local file.
Expand All @@ -32,7 +33,7 @@ message EntryPoint {
// Metadata persisting the current status of a storage, including logical
// metadata such as schema, and physical metadata persisted as a history of
// snapshots
// NEXT_ID: 7
// NEXT_ID: 8
message StorageMetadata {
// Create time of the storage.
google.protobuf.Timestamp create_time = 1;
Expand All @@ -43,8 +44,10 @@ message StorageMetadata {
// The storage type.
enum Type {
TYPE_UNSPECIFIED = 0;
// The dataset type supports fully managed storage features.
// Dataset type supports fully managed storage features.
DATASET = 1;
// Materialized view type supports synchronizing changes from sources.
MATERIALIZED_VIEW = 2;
}
Type type = 3;

Expand All @@ -56,6 +59,9 @@ message StorageMetadata {

// All alive snapshots with snapshot ID as key.
map<int64, Snapshot> snapshots = 6;

// Store the logical plan for materialized views.
LogicalPlan logical_plan = 7;
}

// The storage logical schema where user provided types are persisted instead
Expand Down Expand Up @@ -148,3 +154,14 @@ message RowBitmap {
bytes roaring_bitmap = 3;
}
}

// Store the logical plan of a transform.
// NEXT_ID: 3
message LogicalPlan {
  // The Substrait logical plan of the transform.
  substrait.Plan logical_plan = 1;

  // Registry of user defined functions.
  // Key is UDF name; value is pickle file path.
  map<string, string> udfs = 2;
}
49 changes: 28 additions & 21 deletions python/src/space/core/proto/metadata_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 60 additions & 5 deletions python/src/space/core/proto/metadata_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import google.protobuf.timestamp_pb2
import substrait.plan_pb2
import substrait.type_pb2
import sys
import typing
Expand Down Expand Up @@ -61,7 +62,7 @@ class StorageMetadata(google.protobuf.message.Message):
"""Metadata persisting the current status of a storage, including logical
metadata such as schema, and physical metadata persisted as a history of
snapshots
NEXT_ID: 7
NEXT_ID: 8
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand All @@ -74,14 +75,18 @@ class StorageMetadata(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
TYPE_UNSPECIFIED: StorageMetadata._Type.ValueType # 0
DATASET: StorageMetadata._Type.ValueType # 1
"""The dataset type supports fully managed storage features."""
"""Dataset type supports fully managed storage features."""
MATERIALIZED_VIEW: StorageMetadata._Type.ValueType # 2
"""Materialized view type supports synchronizing changes from sources."""

class Type(_Type, metaclass=_TypeEnumTypeWrapper):
"""The storage type."""

TYPE_UNSPECIFIED: StorageMetadata.Type.ValueType # 0
DATASET: StorageMetadata.Type.ValueType # 1
"""The dataset type supports fully managed storage features."""
"""Dataset type supports fully managed storage features."""
MATERIALIZED_VIEW: StorageMetadata.Type.ValueType # 2
"""Materialized view type supports synchronizing changes from sources."""

@typing_extensions.final
class SnapshotsEntry(google.protobuf.message.Message):
Expand All @@ -107,6 +112,7 @@ class StorageMetadata(google.protobuf.message.Message):
SCHEMA_FIELD_NUMBER: builtins.int
CURRENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int
SNAPSHOTS_FIELD_NUMBER: builtins.int
LOGICAL_PLAN_FIELD_NUMBER: builtins.int
@property
def create_time(self) -> google.protobuf.timestamp_pb2.Timestamp:
"""Create time of the storage."""
Expand All @@ -122,6 +128,9 @@ class StorageMetadata(google.protobuf.message.Message):
@property
def snapshots(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, global___Snapshot]:
"""All alive snapshots with snapshot ID as key."""
@property
def logical_plan(self) -> global___LogicalPlan:
"""Store the logical plan for materialized views."""
def __init__(
self,
*,
Expand All @@ -131,9 +140,10 @@ class StorageMetadata(google.protobuf.message.Message):
schema: global___Schema | None = ...,
current_snapshot_id: builtins.int = ...,
snapshots: collections.abc.Mapping[builtins.int, global___Snapshot] | None = ...,
logical_plan: global___LogicalPlan | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ...

global___StorageMetadata = StorageMetadata

Expand Down Expand Up @@ -327,3 +337,48 @@ class RowBitmap(google.protobuf.message.Message):
def WhichOneof(self, oneof_group: typing_extensions.Literal["bitmap", b"bitmap"]) -> typing_extensions.Literal["roaring_bitmap"] | None: ...

global___RowBitmap = RowBitmap

@typing_extensions.final
class LogicalPlan(google.protobuf.message.Message):
"""Store the logical plan of a transform.
NEXT_ID: 3
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

@typing_extensions.final
class UdfsEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
value: builtins.str
def __init__(
self,
*,
key: builtins.str = ...,
value: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

LOGICAL_PLAN_FIELD_NUMBER: builtins.int
UDFS_FIELD_NUMBER: builtins.int
@property
def logical_plan(self) -> substrait.plan_pb2.Plan:
"""Stores the logical plan."""
@property
def udfs(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
"""Registry of user defined functions.
Key is UDF name; value is pickle file path.
"""
def __init__(
self,
*,
logical_plan: substrait.plan_pb2.Plan | None = ...,
udfs: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan", "udfs", b"udfs"]) -> None: ...

global___LogicalPlan = LogicalPlan
5 changes: 5 additions & 0 deletions python/src/space/core/schema/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ def field_name_to_id_dict(schema: pa.Schema) -> Dict[str, int]:
return {f.name: field_id(f) for f in schema}


def field_id_to_name_dict(schema: pa.Schema) -> Dict[int, str]:
  """Return a dict mapping each field's ID to its field name."""
  id_to_name: Dict[int, str] = {}
  for field in schema:
    id_to_name[field_id(field)] = field.name
  return id_to_name


def field_id_to_column_id_dict(schema: pa.Schema) -> Dict[int, int]:
"""Return a dict with field ID as key and column ID as value."""
field_id_dict = field_name_to_id_dict(schema)
Expand Down
22 changes: 19 additions & 3 deletions python/src/space/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def primary_keys(self) -> List[str]:
"""Return the storage primary keys."""
return list(self._metadata.schema.primary_keys)

@property
def record_fields(self) -> List[str]:
  """Return record field names.

  Record fields are stored in row format (ArrayRecord).
  """
  return list(self._metadata.schema.record_fields)

@property
def logical_schema(self) -> pa.Schema:
"""Return the user specified schema."""
Expand All @@ -89,17 +94,24 @@ def snapshot(self, snapshot_id: Optional[int] = None) -> meta.Snapshot:

raise RuntimeError(f"Snapshot {snapshot_id} is not found")

# pylint: disable=too-many-arguments
@classmethod
def create(
cls, location: str, schema: pa.Schema, primary_keys: List[str],
record_fields: List[str]) -> Storage: # pylint: disable=unused-argument
cls,
location: str,
schema: pa.Schema,
primary_keys: List[str],
record_fields: List[str],
logical_plan: Optional[meta.LogicalPlan] = None
) -> Storage: # pylint: disable=unused-argument
"""Create a new empty storage.
Args:
location: the directory path to the storage.
schema: the schema of the storage.
primary_keys: un-enforced primary keys.
record_fields: fields stored in row format (ArrayRecord).
logical_plan: logical plan of materialized view.
"""
# TODO: to verify that location is an empty directory.
# TODO: to verify primary key fields and record_fields (and types) are
Expand All @@ -122,6 +134,10 @@ def create(
current_snapshot_id=_INIT_SNAPSHOT_ID,
type=meta.StorageMetadata.DATASET)

if logical_plan is not None:
metadata.type = meta.StorageMetadata.MATERIALIZED_VIEW
metadata.logical_plan.CopyFrom(logical_plan)

new_metadata_path = paths.new_metadata_path(paths.metadata_dir(location))

snapshot = meta.Snapshot(snapshot_id=_INIT_SNAPSHOT_ID, create_time=now)
Expand Down
Loading

0 comments on commit 745ba9a

Please sign in to comment.