diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 0a66064..66b07f5 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -25,10 +25,9 @@ from space.core.runners import LocalRunner from space.core.storage import Storage from space.core.transform.plans import LogicalPlanBuilder -from space.core.utils.lazy_imports_utils import ray +from space.core.utils.lazy_imports_utils import ray, ray_runners from space.core.views import View from space.ray.options import RayOptions -from space.ray.runners import RayReadWriterRunner class Dataset(View): @@ -120,8 +119,10 @@ def ray_dataset( """Return a Ray dataset for a Space dataset.""" return self._storage.ray_dataset(read_options) - def ray(self, - file_options: Optional[FileOptions] = None, - ray_options: Optional[RayOptions] = None) -> RayReadWriterRunner: + def ray( + self, + file_options: Optional[FileOptions] = None, + ray_options: Optional[RayOptions] = None + ) -> ray_runners.RayReadWriterRunner: """Get a Ray runner.""" - return RayReadWriterRunner(self, file_options, ray_options) + return ray_runners.RayReadWriterRunner(self, file_options, ray_options) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 146fa73..74fa39d 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -40,10 +40,9 @@ from space.core.schema import substrait as substrait_schema from space.core.schema.utils import validate_logical_schema from space.core.utils import errors, paths -from space.core.utils.lazy_imports_utils import ray +from space.core.utils.lazy_imports_utils import ray, ray_data_sources from space.core.utils.protos import proto_now from space.core.utils.uuids import uuid_ -from space.ray.data_sources import SpaceDataSource Version: TypeAlias = Union[str, int] @@ -367,7 +366,7 @@ def versions(self) -> pa.Table: def ray_dataset(self, read_options: ReadOptions) -> ray.Dataset: """Return a Ray dataset for a Space storage.""" - return ray.data.read_datasource(SpaceDataSource(), + return ray.data.read_datasource(ray_data_sources.SpaceDataSource(), storage=self, read_options=read_options) diff --git a/python/src/space/core/utils/lazy_imports_utils.py b/python/src/space/core/utils/lazy_imports_utils.py index 9285dd1..a48af56 100644 --- a/python/src/space/core/utils/lazy_imports_utils.py +++ b/python/src/space/core/utils/lazy_imports_utils.py @@ -192,4 +192,5 @@ def array_record_error_callback(**kwargs): with lazy_imports(): import ray # type: ignore[import-untyped] # pylint: disable=unused-import - from ray.data.datasource import datasource as ray_datasource # pylint: disable=unused-import + from space.ray import data_sources as ray_data_sources # pylint: disable=unused-import,cyclic-import + from space.ray import runners as ray_runners # pylint: disable=unused-import,cyclic-import diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py index 973386d..2d0afb6 100644 --- a/python/src/space/core/views.py +++ b/python/src/space/core/views.py @@ -31,11 +31,10 @@ from space.core.storage import Storage from space.core.transform.plans import LogicalPlanBuilder, UserDefinedFn from space.core.utils import errors -from space.core.utils.lazy_imports_utils import ray # pylint: disable=unused-import +from space.core.utils.lazy_imports_utils import ray, ray_runners # pylint: disable=unused-import from space.core.utils.paths import UDF_DIR, metadata_dir from space.core.runners import LocalRunner from space.core.schema.utils import validate_logical_schema -from space.ray.runners import RayMaterializedViewRunner, RayReadOnlyRunner if TYPE_CHECKING: from space.core.datasets import Dataset @@ -96,9 +95,9 @@ def ray_dataset(self, read_options: ReadOptions, join_options: JoinOptions) -> ray.Dataset: """Return a Ray dataset for a Space view.""" - def ray(self) -> RayReadOnlyRunner: + def ray(self) -> ray_runners.RayReadOnlyRunner: """Return a Ray runner for the view.""" - return RayReadOnlyRunner(self) + return ray_runners.RayReadOnlyRunner(self) def materialize(self, location: str) -> MaterializedView: """Materialize a view to files in the Space storage format. @@ -262,9 +261,10 @@ def view(self) -> View: def ray( self, - file_options: Optional[FileOptions] = None) -> RayMaterializedViewRunner: + file_options: Optional[FileOptions] = None + ) -> ray_runners.RayMaterializedViewRunner: """Return a Ray runner for the materialized view.""" - return RayMaterializedViewRunner(self, file_options) + return ray_runners.RayMaterializedViewRunner(self, file_options) def local(self) -> LocalRunner: """Get a runner that runs operations locally. diff --git a/python/src/space/ray/data_sources.py b/python/src/space/ray/data_sources.py index 3396926..a6d6079 100644 --- a/python/src/space/ray/data_sources.py +++ b/python/src/space/ray/data_sources.py @@ -17,39 +17,41 @@ from __future__ import annotations from typing import Any, Dict, List, Optional, TYPE_CHECKING +from ray.data.block import Block, BlockMetadata +from ray.data.datasource.datasource import Datasource, Reader, ReadTask +from ray.data.datasource.datasource import WriteResult +from ray.types import ObjectRef + from space.core.ops.read import FileSetReadOp from space.core.options import ReadOptions import space.core.proto.metadata_pb2 as meta import space.core.proto.runtime_pb2 as rt -from space.core.utils.lazy_imports_utils import ray, ray_datasource if TYPE_CHECKING: from space.core.storage import Storage -class SpaceDataSource(ray_datasource.Datasource): +class SpaceDataSource(Datasource): """A Ray data source for a Space dataset.""" # pylint: disable=arguments-differ,too-many-arguments def create_reader( # type: ignore[override] - self, storage: Storage, - read_options: ReadOptions) -> ray_datasource.Reader: + self, storage: Storage, read_options: ReadOptions) -> Reader: return _SpaceDataSourceReader(storage, read_options) - def do_write( - self, blocks: List[ray.types.ObjectRef[ray.data.block.Block]], - metadata: List[ray.data.block.BlockMetadata], - ray_remote_args: Optional[Dict[str, Any]], - location: str) -> List[ray.types.ObjectRef[ray_datasource.WriteResult]]: + def do_write(self, blocks: List[ObjectRef[Block]], + metadata: List[BlockMetadata], + ray_remote_args: Optional[Dict[str, Any]], + location: str) -> List[ObjectRef[WriteResult]]: """Write a Ray dataset into Space datasets.""" raise NotImplementedError("Write from a Ray dataset is not supported") def on_write_complete( # type: ignore[override] - self, write_results: List[ray_datasource.WriteResult]) -> None: + self, write_results: List[WriteResult]) -> None: raise NotImplementedError("Write from a Ray dataset is not supported") -class _SpaceDataSourceReader(ray_datasource.Reader): +class _SpaceDataSourceReader(Reader): def __init__(self, storage: Storage, read_options: ReadOptions): self._storage = storage @@ -64,8 +66,8 @@ def estimate_inmemory_data_size(self) -> Optional[int]: # Note: The `parallelism` which is supposed to indicate how many `ReadTask` to # return will have no effect here, since we map each query into a `ReadTask`. # TODO: to properly handle the error that returned list is empty. - def get_read_tasks(self, parallelism: int) -> List[ray_datasource.ReadTask]: - read_tasks: List[ray_datasource.ReadTask] = [] + def get_read_tasks(self, parallelism: int) -> List[ReadTask]: + read_tasks: List[ReadTask] = [] file_set = self._storage.data_files(self._read_options.filter_, self._read_options.snapshot_id) @@ -75,7 +77,7 @@ def get_read_tasks(self, parallelism: int) -> List[ray_datasource.ReadTask]: # The metadata about the block that we know prior to actually executing # the read task. # TODO: to populate the storage values. - block_metadata = ray.data.block.BlockMetadata( + block_metadata = BlockMetadata( num_rows=1, size_bytes=1, schema=None, @@ -88,7 +90,7 @@ def _read_fn(location=self._storage.location, file_set=task_file_set): return _read_file_set(location, metadata, file_set, self._read_options) - read_tasks.append(ray_datasource.ReadTask(_read_fn, block_metadata)) + read_tasks.append(ReadTask(_read_fn, block_metadata)) return read_tasks @@ -97,5 +99,5 @@ def _read_fn(location=self._storage.location, # whether row group granularity is needed. def _read_file_set(location: str, metadata: meta.StorageMetadata, file_set: rt.FileSet, - read_options: ReadOptions) -> List[ray.data.block.Block]: + read_options: ReadOptions) -> List[Block]: return list(FileSetReadOp(location, metadata, file_set, read_options)) diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py index 53e20ad..ce87144 100644 --- a/python/src/space/ray/ops/append.py +++ b/python/src/space/ray/ops/append.py @@ -18,6 +18,7 @@ from typing import List, Optional import pyarrow as pa +import ray from space.core.ops import utils from space.core.ops.utils import FileOptions @@ -25,7 +26,6 @@ from space.core.ops.base import InputData, InputIteratorFn from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt -from space.core.utils.lazy_imports_utils import ray from space.ray.options import RayOptions diff --git a/python/src/space/ray/ops/delete.py b/python/src/space/ray/ops/delete.py index 28613b9..7a12316 100644 --- a/python/src/space/ray/ops/delete.py +++ b/python/src/space/ray/ops/delete.py @@ -18,6 +18,7 @@ from typing import Optional import pyarrow.compute as pc +import ray from space.core.ops import utils from space.core.ops.utils import FileOptions @@ -25,7 +26,6 @@ from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as rt from space.core.storage import Storage -from space.core.utils.lazy_imports_utils import ray from space.core.utils.paths import StoragePathsMixin @@ -63,8 +63,8 @@ def delete(self) -> Optional[rt.Patch]: @ray.remote -def _delete(location: str, metadata: meta.StorageMetadata, - file_set: rt.FileSet, filter_: pc.Expression, +def _delete(location: str, metadata: meta.StorageMetadata, file_set: rt.FileSet, + filter_: pc.Expression, file_options: FileOptions) -> Optional[rt.Patch]: return FileSetDeleteOp(location, metadata, file_set, filter_, file_options).delete() diff --git a/python/src/space/ray/ops/insert.py b/python/src/space/ray/ops/insert.py index 5b87809..56c59ff 100644 --- a/python/src/space/ray/ops/insert.py +++ b/python/src/space/ray/ops/insert.py @@ -18,6 +18,7 @@ import pyarrow as pa import pyarrow.compute as pc +import ray from space.ray.ops.append import RayAppendOp from space.core.ops.insert import InsertOptions, LocalInsertOp @@ -27,7 +28,6 @@ import space.core.proto.runtime_pb2 as rt from space.core.storage import Storage from space.core.utils import errors -from space.core.utils.lazy_imports_utils import ray from space.ray.options import RayOptions diff --git a/python/src/space/ray/ops/join.py b/python/src/space/ray/ops/join.py index bbc6076..7769feb 100644 --- a/python/src/space/ray/ops/join.py +++ b/python/src/space/ray/ops/join.py @@ -20,6 +20,7 @@ import pyarrow as pa import pyarrow.compute as pc +import ray from space.core.options import JoinOptions, Range, ReadOptions from space.core.schema import arrow @@ -28,7 +29,6 @@ row_id_field_name) import space.core.transform.utils as transform_utils from space.core.utils import errors -from space.core.utils.lazy_imports_utils import ray from space.ray.ops.utils import singleton_storage if TYPE_CHECKING: diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index f4cd7b5..00df785 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -21,6 +21,7 @@ import pyarrow as pa import pyarrow.compute as pc +import ray from space.core.jobs import JobResult from space.core.loaders.array_record import ArrayRecordIndexFn @@ -36,7 +37,6 @@ from space.core.options import JoinOptions, ReadOptions import space.core.proto.runtime_pb2 as rt from space.core.utils import errors -from space.core.utils.lazy_imports_utils import ray from space.ray.ops.append import RayAppendOp from space.ray.ops.delete import RayDeleteOp from space.ray.ops.insert import RayInsertOp