Skip to content

Commit

Permalink
Use lazy import for all Ray imports (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou Fang authored Jan 23, 2024
1 parent 34de328 commit 9df737f
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 39 deletions.
13 changes: 7 additions & 6 deletions python/src/space/core/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
from space.core.runners import LocalRunner
from space.core.storage import Storage
from space.core.transform.plans import LogicalPlanBuilder
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.lazy_imports_utils import ray, ray_runners
from space.core.views import View
from space.ray.options import RayOptions
from space.ray.runners import RayReadWriterRunner


class Dataset(View):
Expand Down Expand Up @@ -120,8 +119,10 @@ def ray_dataset(
"""Return a Ray dataset for a Space dataset."""
return self._storage.ray_dataset(read_options)

def ray(self,
file_options: Optional[FileOptions] = None,
ray_options: Optional[RayOptions] = None) -> RayReadWriterRunner:
def ray(
self,
file_options: Optional[FileOptions] = None,
ray_options: Optional[RayOptions] = None
) -> ray_runners.RayReadWriterRunner:
"""Get a Ray runner."""
return RayReadWriterRunner(self, file_options, ray_options)
return ray_runners.RayReadWriterRunner(self, file_options, ray_options)
5 changes: 2 additions & 3 deletions python/src/space/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@
from space.core.schema import substrait as substrait_schema
from space.core.schema.utils import validate_logical_schema
from space.core.utils import errors, paths
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.lazy_imports_utils import ray, ray_data_sources
from space.core.utils.protos import proto_now
from space.core.utils.uuids import uuid_
from space.ray.data_sources import SpaceDataSource

Version: TypeAlias = Union[str, int]

Expand Down Expand Up @@ -367,7 +366,7 @@ def versions(self) -> pa.Table:

def ray_dataset(self, read_options: ReadOptions) -> ray.Dataset:
"""Return a Ray dataset for a Space storage."""
return ray.data.read_datasource(SpaceDataSource(),
return ray.data.read_datasource(ray_data_sources.SpaceDataSource(),
storage=self,
read_options=read_options)

Expand Down
3 changes: 2 additions & 1 deletion python/src/space/core/utils/lazy_imports_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,5 @@ def array_record_error_callback(**kwargs):

with lazy_imports():
import ray # type: ignore[import-untyped] # pylint: disable=unused-import
from ray.data.datasource import datasource as ray_datasource # pylint: disable=unused-import
from space.ray import data_sources as ray_data_sources # pylint: disable=unused-import,cyclic-import
from space.ray import runners as ray_runners # pylint: disable=unused-import,cyclic-import
12 changes: 6 additions & 6 deletions python/src/space/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
from space.core.storage import Storage
from space.core.transform.plans import LogicalPlanBuilder, UserDefinedFn
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray # pylint: disable=unused-import
from space.core.utils.lazy_imports_utils import ray, ray_runners # pylint: disable=unused-import
from space.core.utils.paths import UDF_DIR, metadata_dir
from space.core.runners import LocalRunner
from space.core.schema.utils import validate_logical_schema
from space.ray.runners import RayMaterializedViewRunner, RayReadOnlyRunner

if TYPE_CHECKING:
from space.core.datasets import Dataset
Expand Down Expand Up @@ -96,9 +95,9 @@ def ray_dataset(self, read_options: ReadOptions,
join_options: JoinOptions) -> ray.Dataset:
"""Return a Ray dataset for a Space view."""

def ray(self) -> RayReadOnlyRunner:
def ray(self) -> ray_runners.RayReadOnlyRunner:
"""Return a Ray runner for the view."""
return RayReadOnlyRunner(self)
return ray_runners.RayReadOnlyRunner(self)

def materialize(self, location: str) -> MaterializedView:
"""Materialize a view to files in the Space storage format.
Expand Down Expand Up @@ -262,9 +261,10 @@ def view(self) -> View:

def ray(
self,
file_options: Optional[FileOptions] = None) -> RayMaterializedViewRunner:
file_options: Optional[FileOptions] = None
) -> ray_runners.RayMaterializedViewRunner:
"""Return a Ray runner for the materialized view."""
return RayMaterializedViewRunner(self, file_options)
return ray_runners.RayMaterializedViewRunner(self, file_options)

def local(self) -> LocalRunner:
"""Get a runner that runs operations locally.
Expand Down
34 changes: 18 additions & 16 deletions python/src/space/ray/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,41 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, TYPE_CHECKING

from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask
from ray.data.datasource.datasource import WriteResult
from ray.types import ObjectRef

from space.core.ops.read import FileSetReadOp
from space.core.options import ReadOptions
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as rt
from space.core.utils.lazy_imports_utils import ray, ray_datasource

if TYPE_CHECKING:
from space.core.storage import Storage


class SpaceDataSource(ray_datasource.Datasource):
class SpaceDataSource(Datasource):
"""A Ray data source for a Space dataset."""

# pylint: disable=arguments-differ,too-many-arguments
def create_reader( # type: ignore[override]
self, storage: Storage,
read_options: ReadOptions) -> ray_datasource.Reader:
self, storage: Storage, read_options: ReadOptions) -> Reader:
return _SpaceDataSourceReader(storage, read_options)

def do_write(
self, blocks: List[ray.types.ObjectRef[ray.data.block.Block]],
metadata: List[ray.data.block.BlockMetadata],
ray_remote_args: Optional[Dict[str, Any]],
location: str) -> List[ray.types.ObjectRef[ray_datasource.WriteResult]]:
def do_write(self, blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
ray_remote_args: Optional[Dict[str, Any]],
location: str) -> List[ObjectRef[WriteResult]]:
"""Write a Ray dataset into Space datasets."""
raise NotImplementedError("Write from a Ray dataset is not supported")

def on_write_complete( # type: ignore[override]
self, write_results: List[ray_datasource.WriteResult]) -> None:
self, write_results: List[WriteResult]) -> None:
raise NotImplementedError("Write from a Ray dataset is not supported")


class _SpaceDataSourceReader(ray_datasource.Reader):
class _SpaceDataSourceReader(Reader):

def __init__(self, storage: Storage, read_options: ReadOptions):
self._storage = storage
Expand All @@ -64,8 +66,8 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
# Note: The `parallelism` which is supposed to indicate how many `ReadTask` to
# return will have no effect here, since we map each query into a `ReadTask`.
# TODO: to properly handle the error that returned list is empty.
def get_read_tasks(self, parallelism: int) -> List[ray_datasource.ReadTask]:
read_tasks: List[ray_datasource.ReadTask] = []
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
read_tasks: List[ReadTask] = []
file_set = self._storage.data_files(self._read_options.filter_,
self._read_options.snapshot_id)

Expand All @@ -75,7 +77,7 @@ def get_read_tasks(self, parallelism: int) -> List[ray_datasource.ReadTask]:
# The metadata about the block that we know prior to actually executing
# the read task.
# TODO: to populate the storage values.
block_metadata = ray.data.block.BlockMetadata(
block_metadata = BlockMetadata(
num_rows=1,
size_bytes=1,
schema=None,
Expand All @@ -88,7 +90,7 @@ def _read_fn(location=self._storage.location,
file_set=task_file_set):
return _read_file_set(location, metadata, file_set, self._read_options)

read_tasks.append(ray_datasource.ReadTask(_read_fn, block_metadata))
read_tasks.append(ReadTask(_read_fn, block_metadata))

return read_tasks

Expand All @@ -97,5 +99,5 @@ def _read_fn(location=self._storage.location,
# whether row group granularity is needed.
def _read_file_set(location: str, metadata: meta.StorageMetadata,
file_set: rt.FileSet,
read_options: ReadOptions) -> List[ray.data.block.Block]:
read_options: ReadOptions) -> List[Block]:
return list(FileSetReadOp(location, metadata, file_set, read_options))
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
from typing import List, Optional

import pyarrow as pa
import ray

from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.append import BaseAppendOp, LocalAppendOp
from space.core.ops.base import InputData, InputIteratorFn
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.utils.lazy_imports_utils import ray
from space.ray.options import RayOptions


Expand Down
6 changes: 3 additions & 3 deletions python/src/space/ray/ops/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
from typing import Optional

import pyarrow.compute as pc
import ray

from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.delete import BaseDeleteOp, FileSetDeleteOp
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.storage import Storage
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.paths import StoragePathsMixin


Expand Down Expand Up @@ -63,8 +63,8 @@ def delete(self) -> Optional[rt.Patch]:


@ray.remote
def _delete(location: str, metadata: meta.StorageMetadata,
file_set: rt.FileSet, filter_: pc.Expression,
def _delete(location: str, metadata: meta.StorageMetadata, file_set: rt.FileSet,
filter_: pc.Expression,
file_options: FileOptions) -> Optional[rt.Patch]:
return FileSetDeleteOp(location, metadata, file_set, filter_,
file_options).delete()
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import pyarrow as pa
import pyarrow.compute as pc
import ray

from space.ray.ops.append import RayAppendOp
from space.core.ops.insert import InsertOptions, LocalInsertOp
Expand All @@ -27,7 +28,6 @@
import space.core.proto.runtime_pb2 as rt
from space.core.storage import Storage
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray
from space.ray.options import RayOptions


Expand Down
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import pyarrow as pa
import pyarrow.compute as pc
import ray

from space.core.options import JoinOptions, Range, ReadOptions
from space.core.schema import arrow
Expand All @@ -28,7 +29,6 @@
row_id_field_name)
import space.core.transform.utils as transform_utils
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray
from space.ray.ops.utils import singleton_storage

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/ray/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import pyarrow as pa
import pyarrow.compute as pc
import ray

from space.core.jobs import JobResult
from space.core.loaders.array_record import ArrayRecordIndexFn
Expand All @@ -36,7 +37,6 @@
from space.core.options import JoinOptions, ReadOptions
import space.core.proto.runtime_pb2 as rt
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray
from space.ray.ops.append import RayAppendOp
from space.ray.ops.delete import RayDeleteOp
from space.ray.ops.insert import RayInsertOp
Expand Down

0 comments on commit 9df737f

Please sign in to comment.