From b6bef553402cbd32771630689b3a05b6255c712a Mon Sep 17 00:00:00 2001
From: Eugene M
Date: Mon, 12 Aug 2024 16:51:43 -0400
Subject: [PATCH] resolve conflict

FIX: Recursion error when pickling with dill
TST: Initial tests for zarr endpoints
Clean, refactor, and lint
TST: tests for arrays and tables
TST: tests for arrays and tables
ENH: restructure demo examples
ENH: (partial) support for StructDtype
TST: fix tests
ENH: support for datetime types
---
 alembic.ini                     | 116 ++++++++++++
 tiled/_tests/test_zarr.py       | 301 ++++++++++++++++++++++++++++++++
 tiled/adapters/sparse.py        |   2 +-
 tiled/adapters/zarr.py          |  33 ++--
 tiled/client/container.py       |   2 +-
 tiled/examples/generated.py     | 100 ++++++-----
 tiled/server/app.py             |  30 ++--
 tiled/server/dependencies.py    |   1 -
 tiled/server/pydantic_sparse.py |   2 +-
 tiled/server/router.py          |   1 -
 tiled/server/schemas.py         |   4 +
 tiled/server/utils.py           |   2 +-
 tiled/server/zarr.py            | 214 +++++++++++++----------
 tiled/structures/sparse.py      |   1 +
 14 files changed, 631 insertions(+), 178 deletions(-)
 create mode 100644 alembic.ini
 create mode 100644 tiled/_tests/test_zarr.py

diff --git a/alembic.ini b/alembic.ini
new file mode 100644
index 000000000..e7e80abc8
--- /dev/null
+++ b/alembic.ini
@@ -0,0 +1,116 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+# Use forward slashes (/) also on windows to provide an os agnostic path
+script_location = /Users/eugene/code/tiled/tiled/catalog/migrations
+
+# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and time
+# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library.
+# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to catalog/migrations/versions.  When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:catalog/migrations/versions
+
+# version path separator; As mentioned above, this is the character used to split
+# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.
+
+# set to 'true' to search source files recursively
+# in each "version_locations" directory
+# new in Alembic version 1.10
+# recursive_version_locations = false
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+# sqlalchemy.url = driver://user:pass@localhost/dbname
+sqlalchemy.url = sqlite+aiosqlite:////Users/eugene/code/demo_stream_documents/catalog/catalog.db
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts.  See the documentation for further
+# detail and examples
+
+# format using "black" - use the console_scripts runner, against the "black" entrypoint
+# hooks = black
+# black.type = console_scripts
+# black.entrypoint = black
+# black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
+# hooks = ruff
+# ruff.type = exec
+# ruff.executable = %(here)s/.venv/bin/ruff
+# ruff.options = --fix REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/tiled/_tests/test_zarr.py b/tiled/_tests/test_zarr.py
new file mode 100644
index 000000000..c3fa49256
--- /dev/null
+++ b/tiled/_tests/test_zarr.py
@@ -0,0 +1,301 @@
+import contextlib
+import math
+import string
+import threading
+import time
+import warnings
+
+import dask.array
+import numpy
+import pandas.testing
+import pytest
+import uvicorn
+import zarr
+from fsspec.implementations.http import HTTPFileSystem
+from httpx import ASGITransport, AsyncClient
+from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND
+
+from ..adapters.array import ArrayAdapter
+from ..adapters.dataframe import DataFrameAdapter
+from ..adapters.mapping import MapAdapter
+from ..server.app import build_app
+
+rng = numpy.random.default_rng(seed=42)
+array_cases = {
+    "dtype_b": (numpy.arange(10) % 2).astype("b"),
+    "dtype_i": numpy.arange(-10, 10, dtype="i"),
+    "dtype_uint8": numpy.arange(10, dtype="uint8"),
+    "dtype_uint16": numpy.arange(10, dtype="uint16"),
+    "dtype_uint64": numpy.arange(10, dtype="uint64"),
+    "dtype_f": numpy.arange(10, dtype="f"),
+    "dtype_c": (numpy.arange(10) * 1j).astype("c"),
+    "dtype_S": numpy.array([letter * 3 for letter in string.ascii_letters], dtype="S3"),
+    "dtype_U": numpy.array([letter * 3 for letter in string.ascii_letters], dtype="U3"),
+    "dtype_m": numpy.array(['2007-07-13', '2006-01-13', '2010-08-13'], dtype='datetime64') - numpy.datetime64('2008-01-01'),
+    "dtype_M": numpy.array(['2007-07-13', '2006-01-13', '2010-08-13'], dtype='datetime64'),
+    "random_2d": rng.random((10, 10)),
+}
+# TODO bitfield "t", void "v", and object "O" (which is not supported by default)
+scalar_cases = {
+    k: numpy.array(v[0], dtype=v.dtype)
+    for k, v in array_cases.items()
+    if k.startswith("dtype_")
+}
+for v in scalar_cases.values():
+    assert v.shape == ()
+array_tree = MapAdapter({k: ArrayAdapter.from_array(v) for k, v in array_cases.items()})
+scalar_tree = MapAdapter(
+    {k: ArrayAdapter.from_array(v) for k, v in scalar_cases.items()}
+)
+
+cube_cases = {
+    "tiny_cube": rng.random((10, 10, 10)),
+    "tiny_hypercube": rng.random((10, 10, 10, 10, 10)),
+}
+cube_tree = MapAdapter({k: ArrayAdapter.from_array(v) for k, v in cube_cases.items()})
+arr_with_inf = numpy.array([0, 1, numpy.nan, -numpy.inf, numpy.inf])
+inf_tree = MapAdapter(
+    {
+        "example": ArrayAdapter.from_array(
+            arr_with_inf,
+            metadata={"infinity": math.inf, "-infinity": -math.inf, "nan": numpy.nan},
+        )
+    },
+    metadata={"infinity": math.inf, "-infinity": -math.inf, "nan": numpy.nan},
+)
+arr_with_zero_dim = numpy.array([]).reshape((0, 100, 1, 10))
+# Suppress RuntimeWarning: divide by zero encountered in true_divide from dask.array.core.
+with warnings.catch_warnings():
+    warnings.simplefilter("ignore", RuntimeWarning)
+    zero_tree = MapAdapter(
+        {
+            "example": ArrayAdapter.from_array(
+                dask.array.from_array(arr_with_zero_dim, chunks=arr_with_zero_dim.shape)
+            )
+        }
+    )
+df = pandas.DataFrame(
+    {
+        "x": rng.random(size=10, dtype='float64'),
+        "y": rng.integers(10, size=10, dtype='uint'),
+        "z": rng.integers(-10, 10, size=10, dtype='int64'),
+    }
+)
+table_tree = MapAdapter(
+    {
+        # a dataframe divided into three partitions
+        "divided": DataFrameAdapter.from_pandas(df, npartitions=3),
+        # a dataframe with just one partition
+        "single": DataFrameAdapter.from_pandas(df, npartitions=1),
+    }
+)
+
+tree = MapAdapter(
+    {
+        "nested": MapAdapter({"array": array_tree, "cube": cube_tree}),
+        "inf": inf_tree,
+        "scalar": scalar_tree,
+        "zero": zero_tree,
+        "table": table_tree,
+        "random_2d": array_tree["random_2d"],
+    }
+)
+
+
+def traverse_tree(tree, parent='', result=None):
+    result = result if result is not None else {}
+    for key, val in tree.items():
+        if isinstance(val, ArrayAdapter):
+            result.update({f'{parent}/{key}': 'array'})
+        elif isinstance(val, DataFrameAdapter):
+            result.update({f'{parent}/{key}': 'group'})
+            for col, _ in val.items():
+                result.update({f'{parent}/{key}/{col}': 'array'})
+        else:
+            result.update({f'{parent}/{key}': 'group'})
+            traverse_tree(val, parent=f'{parent}/{key}', result=result)
+    return result
+
+
+@pytest.fixture(scope="module")
+def app():
+    app = build_app(tree, authentication={"single_user_api_key": "secret"})
+    return app
+
+
+class ThreadedServer(uvicorn.Server):
+    @contextlib.contextmanager
+    def run_in_thread(self):
+        thread = threading.Thread(target=self.run)
+        thread.start()
+        try:
+            while not self.started:
+                time.sleep(1e-3)
+            self.port = (
+                self.servers[0].sockets[0].getsockname()[1]
+            )  # Actual port number
+            yield
+        finally:
+            self.should_exit = True
+            thread.join()
+
+
+@pytest.fixture(scope="module")
+def server(app):
+    config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="info")
+    server = ThreadedServer(config)
+    with server.run_in_thread():
+        yield server
+
+
+@pytest.fixture(scope="module")
+def fs():
+    headers = {"Authorization": "Apikey secret", "Content-Type": "application/json"}
+    fs = HTTPFileSystem(client_kwargs={"headers": headers})
+    return fs
+
+
+@pytest.mark.parametrize("path", ["/zarr/v2/", "/zarr/v2", "/zarr/v2/nested", "/zarr/v2/table/single"])
+@pytest.mark.asyncio
+async def test_zarr_group_routes(path, app):
+    async with AsyncClient(
+        transport=ASGITransport(app=app),
+        base_url="http://test",
+        headers={"Authorization": "Apikey secret"},
+        follow_redirects=True,
+    ) as client:
+        response = await client.get(path)
+        assert response.status_code == HTTP_200_OK
+
+        response = await client.get(path + '/.zarray')
+        assert response.status_code == 
HTTP_404_NOT_FOUND + + response = await client.get(path + '/.zgroup') + assert response.status_code == HTTP_200_OK + + +@pytest.mark.parametrize("path", ["/zarr/v2/nested/cube/tiny_cube", "/zarr/v2/table/single/x"]) +@pytest.mark.asyncio +async def test_zarr_array_routes(path, app): + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + headers={"Authorization": "Apikey secret"}, + follow_redirects=True, + ) as client: + response = await client.get(path) + assert response.status_code == HTTP_200_OK + + response = await client.get(path + '/.zgroup') + assert response.status_code == HTTP_404_NOT_FOUND + + response = await client.get(path + '/.zarray') + assert response.status_code == HTTP_200_OK + + ndim = len(response.json().get('shape')) + indx = '.'.join( ['0']*max(ndim, 0) ) + response = await client.get(path + f'/{indx}') + assert response.status_code == HTTP_200_OK + +def test_zarr_integration(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/" + grp = zarr.open(fs.get_mapper(url), mode="r") + + assert grp.store.fs == fs + assert set(grp.keys()) == set(tree.keys()) + assert len(set(grp.group_keys())) == 5 + assert len(set(grp.array_keys())) == 1 + + +@pytest.mark.parametrize( + "suffix, path", + [ + ("", "random_2d"), + ("", "nested/array/random_2d"), + ("nested", "array/random_2d"), + ("nested/array", "random_2d"), + ("nested/array/random_2d", ""), + ], +) +@pytest.mark.parametrize("slash", ["", "/"]) +def test_zarr_groups(suffix, path, slash, server, fs): + expected = array_cases["random_2d"] + url = f"http://localhost:{server.port}/zarr/v2/{suffix}{slash}" + arr = zarr.open(fs.get_mapper(url), mode="r") + if path: + arr = arr[path] + assert numpy.array_equal(arr[...], expected) + + +@pytest.mark.parametrize("kind", list(array_cases)) +def test_array_dtypes(kind, server, fs): + expected = array_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/nested/array" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("kind", list(scalar_cases)) +def test_scalar_dtypes(kind, server, fs): + expected = scalar_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/scalar" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("kind", list(cube_cases)) +def test_cube_cases(kind, server, fs): + expected = cube_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/nested/cube" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +def test_infinity(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/inf/example" + actual = zarr.open(fs.get_mapper(url), mode="r")[...] + mask = numpy.isnan(arr_with_inf) + assert numpy.array_equal(actual[~mask], arr_with_inf[~mask]) + assert numpy.isnan(actual[mask]).all() + + +def test_shape_with_zero(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/zero/example" + actual = zarr.open(fs.get_mapper(url), mode="r")[...] + assert numpy.array_equal(actual, arr_with_zero_dim) + + +def test_dataframe_group(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/table" + grp = zarr.open(fs.get_mapper(url), mode="r") + assert set(grp.keys()) == set(table_tree.keys()) + + for key in grp.keys(): + for col in grp[key].keys(): + actual = grp[key][col][...] 
+ expected = df[col] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("key", list(table_tree.keys())) +def test_dataframe_single(key, server, fs): + url = f"http://localhost:{server.port}/zarr/v2/table/{key}" + grp = zarr.open(fs.get_mapper(url), mode="r") + + for col in df.columns: + actual = grp[col][...] + expected = df[col] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("key", list(table_tree.keys())) +def test_dataframe_column(key, server, fs): + for col in df.columns: + url = f"http://localhost:{server.port}/zarr/v2/table/{key}/{col}" + arr = zarr.open(fs.get_mapper(url), mode="r") + actual = arr[...] + expected = df[col] + assert numpy.array_equal(actual, expected) diff --git a/tiled/adapters/sparse.py b/tiled/adapters/sparse.py index 5aa53a90c..c6771f942 100644 --- a/tiled/adapters/sparse.py +++ b/tiled/adapters/sparse.py @@ -6,9 +6,9 @@ import sparse from numpy._typing import NDArray +from ..structures.array import BuiltinDtype from ..structures.core import Spec, StructureFamily from ..structures.sparse import COOStructure -from ..structures.array import BuiltinDtype from .array import slice_and_shape_from_block_and_chunks from .protocols import AccessPolicy from .type_alliases import JSON, NDSlice diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 7a914965c..e8761fecd 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -1,5 +1,4 @@ import builtins -import collections.abc import os import sys from typing import Any, Iterator, List, Optional, Tuple, Union @@ -19,6 +18,11 @@ from .protocols import AccessPolicy from .type_alliases import JSON, NDSlice +if sys.version_info < (3, 9): + from typing_extensions import Mapping as MappingType +else: + from collections.abc import Mapping as MappingType + INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) @@ -27,17 +31,17 @@ def read_zarr( structure: Optional[ArrayStructure] = None, **kwargs: Any, ) -> Union["ZarrGroupAdapter", ArrayAdapter]: - """ + """Create an adapter for zarr Group or Array Parameters ---------- - data_uri : - structure : - kwargs : + data_uri : location of the zarr resource, e.g. 'file://localhost/data/arr1' + structure : specification of the shape, chunks, and data type + kwargs : any kwargs accepted by ZarrGroupAdapter or ZarrArrayAdapter Returns ------- - + Initialized ZarrGroupAdapter or ZarrArrayAdapter. """ filepath = path_from_uri(data_uri) zarr_obj = zarr.open(filepath) # Group or Array @@ -91,14 +95,17 @@ def init_storage(cls, data_uri: str, structure: ArrayStructure) -> List[Asset]: ] def _stencil(self) -> Tuple[slice, ...]: - """ - Trims overflow because Zarr always has equal-sized chunks. + """Trims overflow because Zarr always has equal-sized chunks. 
+ Returns ------- """ return tuple(builtins.slice(0, dim) for dim in self.structure().shape) + def get(self, key: str): + return None + def read( self, slice: NDSlice = ..., @@ -184,16 +191,6 @@ async def write_block( self._array[block_slice] = data -if sys.version_info < (3, 9): - from typing_extensions import Mapping - - MappingType = Mapping -else: - import collections - - MappingType = collections.abc.Mapping - - class ZarrGroupAdapter( MappingType[str, Union["ArrayAdapter", "ZarrGroupAdapter"]], IndexersMixin, diff --git a/tiled/client/container.py b/tiled/client/container.py index 51b466ba4..2434185ee 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -886,8 +886,8 @@ def write_sparse( >>> x.write_block(coords=[[2, 4]], data=[3.1, 2.8], block=(0,)) >>> x.write_block(coords=[[0, 1]], data=[6.7, 1.2], block=(1,)) """ - from ..structures.sparse import COOStructure from ..structures.array import BuiltinDtype + from ..structures.sparse import COOStructure structure = COOStructure( shape=shape, diff --git a/tiled/examples/generated.py b/tiled/examples/generated.py index 4fa76f0c1..2d296fd87 100644 --- a/tiled/examples/generated.py +++ b/tiled/examples/generated.py @@ -17,24 +17,25 @@ from tiled.adapters.xarray import DatasetAdapter print("Generating large example data...", file=sys.stderr) +rng = numpy.random.default_rng(seed=42) data = { - "big_image": numpy.random.random((10_000, 10_000)), - "small_image": numpy.random.random((300, 300)), - "medium_image": numpy.random.random((1000, 1000)), - "tiny_image": numpy.random.random((50, 50)), - "tiny_cube": numpy.random.random((50, 50, 50)), - "tiny_hypercube": numpy.random.random((50, 50, 50, 50, 50)), - "high_entropy": numpy.random.random((100, 100)), - "low_entropy": numpy.ones((100, 100)), - "short_column": numpy.random.random(100), - "tiny_column": numpy.random.random(10), - "long_column": numpy.random.random(100_000), + "big_image": rng.random((10_000, 10_000)), + "small_image": rng.random((300, 300)), + "medium_image": rng.random((1000, 1000)), + "tiny_image": rng.random((50, 50)), + "tiny_cube": rng.random((50, 50, 50)), + "tiny_hypercube": rng.random((50, 50, 50, 50, 50)), + "high_entropy": rng.integers(-10, 10, size=(100, 100)), + "low_entropy": numpy.ones((100, 100), dtype='int32'), + "short_column": rng.integers(10, size=100, dtype=numpy.dtype('uint8')), + "tiny_column": rng.random(10), + "long_column": rng.random(100_000), } -temp = 15 + 8 * numpy.random.randn(2, 2, 3) -precip = 10 * numpy.random.rand(2, 2, 3) +temp = 15 + 8 * rng.normal(size=(2, 2, 3)) +precip = 10 * rng.uniform(size=(2, 2, 3)) lon = [[-99.83, -99.32], [-99.79, -99.23]] lat = [[42.25, 42.21], [42.63, 42.59]] -sparse_arr = numpy.random.random((100, 100)) +sparse_arr = rng.random((100, 100)) sparse_arr[sparse_arr < 0.9] = 0 # fill most of the array with zeros awkward_arr = awkward.Array( [[{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}], [], [{"x": 3.3, "y": [1, 2, 3]}]] @@ -43,17 +44,39 @@ print("Done generating example data.", file=sys.stderr) mapping = { - "nested": MapAdapter( - {"small_image": ArrayAdapter.from_array(data["small_image"]), - "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), - "inner": MapAdapter( - {"small_image": ArrayAdapter.from_array(data["small_image"]), - "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), - }, - metadata = {"animal": "cat", "color": "green"}, + "scalars": MapAdapter( + { + "pi": ArrayAdapter.from_array(3.14159), + "e_arr": ArrayAdapter.from_array(["2.71828"]), + "fsc": 
ArrayAdapter.from_array("1/137"), + "fortytwo": ArrayAdapter.from_array(42), + }, + metadata={"numbers": "constants", "precision": 5}, ), - }, - metadata = {"animal": "cat", "color": "green"}, + "nested": MapAdapter( + { + "images": MapAdapter( + { + "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), + "small_image": ArrayAdapter.from_array(data["small_image"]), + "medium_image": ArrayAdapter.from_array( + data["medium_image"], chunks=((250,) * 4, (100,) * 10) + ), + "big_image": ArrayAdapter.from_array(data["big_image"]), + }, + metadata={"animal": "cat", "color": "green"}, + ), + "cubes": MapAdapter( + { + "tiny_cube": ArrayAdapter.from_array(data["tiny_cube"]), + "tiny_hypercube": ArrayAdapter.from_array(data["tiny_hypercube"]), + }, + metadata={"animal": "dog", "color": "red"}, + ), + "sparse_image": COOAdapter.from_coo(sparse.COO(sparse_arr)), + "awkward_array": AwkwardAdapter.from_array(awkward_arr), + }, + metadata={"animal": "cat", "color": "green"}, ), "tables": MapAdapter( { @@ -64,7 +87,9 @@ "B": 2 * data["short_column"], "C": 3 * data["short_column"], }, - index=pandas.Index(numpy.arange(len(data["short_column"])), name="index"), + index=pandas.Index( + numpy.arange(len(data["short_column"])), name="index" + ), ), npartitions=1, metadata={"animal": "dog", "color": "red"}, @@ -76,7 +101,9 @@ "B": 2 * data["long_column"], "C": 3 * data["long_column"], }, - index=pandas.Index(numpy.arange(len(data["long_column"])), name="index"), + index=pandas.Index( + numpy.arange(len(data["long_column"])), name="index" + ), ), npartitions=5, metadata={"animal": "dog", "color": "green"}, @@ -87,28 +114,19 @@ letter: i * data["tiny_column"] for i, letter in enumerate(string.ascii_uppercase, start=1) }, - index=pandas.Index(numpy.arange(len(data["tiny_column"])), name="index"), + index=pandas.Index( + numpy.arange(len(data["tiny_column"])), name="index" + ), ), npartitions=1, metadata={"animal": "dog", "color": "red"}, ), } - ), - "big_image": ArrayAdapter.from_array(data["big_image"]), - "small_image": ArrayAdapter.from_array(data["small_image"]), - "medium_image": ArrayAdapter.from_array(data["medium_image"], chunks=((250, )*4, (100, )*10)), - "sparse_image": COOAdapter.from_coo(sparse.COO(sparse_arr)), - "awkward_array": AwkwardAdapter.from_array(awkward_arr), - "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), - "tiny_cube": ArrayAdapter.from_array(data["tiny_cube"]), - "tiny_hypercube": ArrayAdapter.from_array(data["tiny_hypercube"]), + ), "structured_data": MapAdapter( { "pets": ArrayAdapter.from_array( - numpy.array( - [("Rex", 9, 81.0), ("Fido", 3, 27.0)], - dtype=[("name", "U10"), ("age", "i4"), ("weight", "f4")], - ) + numpy.array([("Rex", 9, 81.0), ("Fido", 3, 27.0)],dtype=[("name", "U10"), ("age", "i4"), ("weight", "f4")]) ), "xarray_dataset": DatasetAdapter.from_dataset( xarray.Dataset( @@ -126,7 +144,7 @@ }, metadata={"animal": "cat", "color": "green"}, ), - "flat_array": ArrayAdapter.from_array(numpy.random.random(100)), + "flat_array": ArrayAdapter.from_array(rng.random(100)), "low_entropy": ArrayAdapter.from_array(data["low_entropy"]), "high_entropy": ArrayAdapter.from_array(data["high_entropy"]), # Below, an asynchronous task modifies this value over time. 
diff --git a/tiled/server/app.py b/tiled/server/app.py index cb100cab4..4fd0ff4d1 100644 --- a/tiled/server/app.py +++ b/tiled/server/app.py @@ -3,15 +3,16 @@ import contextvars import logging import os +import re import secrets import sys import urllib.parse +import urllib.parse as urlparse import warnings from contextlib import asynccontextmanager from functools import lru_cache, partial from pathlib import Path -from typing import List -import re +from typing import Dict, List import anyio import packaging.version @@ -69,7 +70,7 @@ } CSRF_HEADER_NAME = "x-csrf" CSRF_QUERY_PARAMETER = "csrf" -ZARR_PREFIX = '/zarr/v2' +ZARR_PREFIX = "/zarr/v2" MINIMUM_SUPPORTED_PYTHON_CLIENT_VERSION = packaging.version.parse("0.1.0a104") @@ -432,11 +433,7 @@ async def unhandled_exception_handler( # opporunity to register custom query types before startup. app.get( "/api/v1/search/{path:path}", - response_model=schemas.Response[ - List[schemas.Resource[schemas.NodeAttributes, dict, dict]], - schemas.PaginationLinks, - dict, - ], + response_model=schemas.SearchResponse, )(patch_route_signature(search, query_registry)) app.get( "/api/v1/distinct/{path:path}", @@ -901,15 +898,16 @@ async def resolve_zarr_uris(request: Request, call_next): # safely encoded) if request.url.path.startswith(ZARR_PREFIX) and response.status_code == 404: # Extract the last bit of the path - zarr_path = request.url.path.removeprefix(ZARR_PREFIX).strip('/').split('/') - zarr_block = zarr_path[-1] if len(zarr_path) > 0 else '' - if re.compile(r'^(?:\d+\.)*\d+$').fullmatch(zarr_block): - # Create a query string if the last part is in the zarr block forma, e.g. `m.n.p. ... .q` - request.scope['query_string'] = f"block={zarr_block.replace('.', '%2C')}".encode() - request.scope['path'] = ZARR_PREFIX + '/' + '/'.join(zarr_path[:-1]) - response = await call_next(request) + zarr_path = request.url.path.removeprefix(ZARR_PREFIX).strip("/").split("/") + zarr_block = zarr_path[-1] if len(zarr_path) > 0 else "" + if re.compile(r"^(?:\d+\.)*\d+$").fullmatch(zarr_block): + # Create a query string if the last part is in the zarr block form, e.g. `m.n.p. ... .q` + query = dict(urlparse.parse_qsl(request.url.query)) + query.update({"block": zarr_block.replace(".", ",")}) + request.scope["query_string"] = urlparse.urlencode(query).encode() + request.scope["path"] = ZARR_PREFIX + "/" + "/".join(zarr_path[:-1]) - # TODO: Try compiling a single RE for matching and replacement -- possible speedup? + response = await call_next(request) response.__class__ = PatchedStreamingResponse # tolerate memoryview return response diff --git a/tiled/server/dependencies.py b/tiled/server/dependencies.py index f13c676fc..c756a80d3 100644 --- a/tiled/server/dependencies.py +++ b/tiled/server/dependencies.py @@ -73,7 +73,6 @@ async def inner( """ path_parts = [segment for segment in path.split("/") if segment] entry = root_tree - # If the entry/adapter can take a session state, pass it in. # The entry/adapter may return itself or a different object. 
if hasattr(entry, "with_session_state") and session_state: diff --git a/tiled/server/pydantic_sparse.py b/tiled/server/pydantic_sparse.py index 59883e94a..6c7d35e05 100644 --- a/tiled/server/pydantic_sparse.py +++ b/tiled/server/pydantic_sparse.py @@ -2,8 +2,8 @@ import pydantic -from ..structures.sparse import SparseLayout from ..structures.array import BuiltinDtype, StructDtype +from ..structures.sparse import SparseLayout class COOStructure(pydantic.BaseModel): diff --git a/tiled/server/router.py b/tiled/server/router.py index c469190d7..a81a2fea3 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -456,7 +456,6 @@ async def array_full( """ Fetch a slice of array-like data. """ - breakpoint() structure_family = entry.structure_family # Deferred import because this is not a required dependency of the server # for some use cases. diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index fa7f039e9..3cc757b84 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -567,4 +567,8 @@ class PatchMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]): data_sources: Optional[List[DataSource]] +SearchResponse = Response[ + List[Resource[NodeAttributes, Dict, Dict]], PaginationLinks, Dict +] + NodeStructure.model_rebuild() diff --git a/tiled/server/utils.py b/tiled/server/utils.py index 29233e134..bc7e55d91 100644 --- a/tiled/server/utils.py +++ b/tiled/server/utils.py @@ -42,7 +42,7 @@ def get_base_url(request): return f"{get_root_url(request)}/api/v1" -def get_zarr_url(request, version: Literal['v2', 'v3'] = 'v2'): +def get_zarr_url(request, version: Literal["v2", "v3"] = "v2"): """ Base URL for the Zarr API """ diff --git a/tiled/server/zarr.py b/tiled/server/zarr.py index c6b991320..f62fd5d9b 100644 --- a/tiled/server/zarr.py +++ b/tiled/server/zarr.py @@ -1,5 +1,6 @@ import dataclasses import inspect +import json import os import re import warnings @@ -7,7 +8,6 @@ from functools import partial, wraps from pathlib import Path from typing import Any, List, Optional, Tuple -import json import anyio from fastapi import APIRouter, Body, Depends, HTTPException, Query, Request, Security @@ -31,9 +31,11 @@ from .. import __version__ from ..structures.core import Spec, StructureFamily +from ..structures.array import StructDtype from ..utils import ensure_awaitable, patch_mimetypes, path_from_uri from ..validation_registration import ValidationError -from . import schemas + +# from . import schemas from .authentication import Mode, get_authenticators, get_current_principal from .core import ( DEFAULT_PAGE_SIZE, @@ -66,29 +68,33 @@ from .utils import filter_for_access, get_base_url, record_timing ZARR_BLOCK_SIZE = 10000 -ZARR_BYTE_ORDER = 'C' -ZARR_CODEC_SPEC = {'blocksize': 0, - 'clevel': 5, - 'cname': 'lz4', - 'id': 'blosc', - 'shuffle': 1} +ZARR_BYTE_ORDER = "C" +ZARR_CODEC_SPEC = { + "blocksize": 0, + "clevel": 5, + "cname": "lz4", + "id": "blosc", + "shuffle": 1, +} +ZARR_DATETIME64_PRECISION = 'ns' import numcodecs + zarr_codec = numcodecs.get_codec(ZARR_CODEC_SPEC) router = APIRouter() + def convert_chunks_for_zarr(tiled_chunks: Tuple[Tuple[int]]): """Convert full tiled/dask chunk specification into zarr format - + Zarr only accepts chunks of constant size along each dimension; this function finds a unique representation of (possibly variable-sized chunks) internal to Tiled ArrayAdapter in terms of zarr blocks. + + Zarr chunks must be at least of size 1 (even for zero-dimensional arrays). 
""" -<<<<<<< HEAD - return [min(ZARR_BLOCK_SIZE, max(tc)) for tc in tiled_chunks] -======= - return [min(ZARR_BLOCK_SIZE, max(c)) for c in tiled_chunks] ->>>>>>> 08f255d687118b1983cf1019b375d7d6f948ce2e + return [min(ZARR_BLOCK_SIZE, max(*tc, 1)) for tc in tiled_chunks] + @router.get("{path:path}.zgroup", name="Root .zgroup metadata") @router.get("/{path:path}/.zgroup", name="Zarr .zgroup metadata") @@ -96,123 +102,137 @@ async def get_zarr_group_metadata( request: Request, entry=SecureEntry( scopes=["read:data", "read:metadata"], - structure_families={StructureFamily.table, StructureFamily.container}, + structure_families={StructureFamily.table, StructureFamily.container, StructureFamily.array}, ), ): + # Usual (unstructured) array; should respond to /.zarray instead + if entry.structure_family == StructureFamily.array and not isinstance(entry.structure().data_type, StructDtype): + raise HTTPException(status_code=HTTP_404_NOT_FOUND) + # Structured numpy array, Container, or Table return Response(json.dumps({"zarr_format": 2}), status_code=200) @router.get("/{path:path}/.zarray", name="Zarr .zarray metadata") async def get_zarr_array_metadata( request: Request, - path: str, - column: str = '', - entry=SecureEntry(scopes=["read:data", "read:metadata"], - structure_families={StructureFamily.array, StructureFamily.sparse, StructureFamily.table}), + entry=SecureEntry( + scopes=["read:data", "read:metadata"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), ): -<<<<<<< HEAD - breakpoint() -======= ->>>>>>> 08f255d687118b1983cf1019b375d7d6f948ce2e - if entry.structure_family in {StructureFamily.array, StructureFamily.sparse}: - try: - metadata = entry.metadata() - structure = entry.structure() - zarray_spec = {'chunks': convert_chunks_for_zarr(structure.chunks), - 'compressor': ZARR_CODEC_SPEC, - 'dtype': structure.data_type.to_numpy_str(), - 'fill_value': 0, - 'filters': None, - 'order': ZARR_BYTE_ORDER, - 'shape': list(structure.shape), - 'zarr_format': 2} - except Exception as err: - print(f"Can not create .zarray metadata, {err}") - raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=err.args[0]) - - # elif entry.structure_family == StructureFamily.table: - # try: - # zarray_spec = {} - # metadata = entry.metadata() - # structure = entry.structure() - # # zarray_spec = {'chunks': [100, 1], #convert_chunks_for_zarr(structure.chunks), - # # 'compressor': ZARR_CODEC_SPEC, - # # 'dtype': entry.structure().meta.dtypes[column].str, - # # 'fill_value': 0, - # # 'filters': None, - # # 'order': ZARR_BYTE_ORDER, - # # # 'shape': list(structure.shape), - # # 'zarr_format': 2} - # except Exception as err: - # print(f"Can not create .zarray metadata, {err}") - # raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=err.args[0]) - - else: - # This is normal behaviour; zarr will try to open .zarray and, if 404 is received, it will move on assuming - # that the requested resource is a group (`.../path/.zgroup` would be requested next). - raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Requested resource does not have .zarray") - - return Response(json.dumps(zarray_spec), status_code=200) - - -@router.get("/{path:path}", name="Zarr .zgroup directory structure or a chunk of a zarr array") + # Only StructureFamily.array and StructureFamily.sparse can respond to `/.zarray` querries. 
+    # request .zarray on all other nodes in Tiled (not included in SecureEntry above), in which case the server
+    # will return a 404 error; this is the expected behaviour, which signals zarr to try /.zgroup instead.
+    structure = entry.structure()
+    if isinstance(structure.data_type, StructDtype):
+        # Structured numpy array should be treated as a DataFrame and will respond to /.zgroup instead
+        raise HTTPException(status_code=HTTP_404_NOT_FOUND)
+    try:
+        zarray_spec = {
+            "chunks": convert_chunks_for_zarr(structure.chunks),
+            "compressor": ZARR_CODEC_SPEC,
+            "dtype": structure.data_type.to_numpy_str(),
+            "fill_value": 0,
+            "filters": None,
+            "order": ZARR_BYTE_ORDER,
+            "shape": list(structure.shape),
+            "zarr_format": 2,
+        }
+        return Response(json.dumps(zarray_spec), status_code=200)
+    except Exception as err:
+        print(f"Cannot create .zarray metadata, {err}")
+        raise HTTPException(
+            status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=err.args[0]
+        )
+
+
+@router.get(
+    "/{path:path}", name="Zarr group (directory) structure or a chunk of a zarr array"
+)
 async def get_zarr_array(
     request: Request,
     block: str | None = None,
-    entry=SecureEntry(scopes=["read:data"],
-                      structure_families={StructureFamily.array, StructureFamily.sparse, StructureFamily.table, StructureFamily.container},
+    entry=SecureEntry(
+        scopes=["read:data"],
+        structure_families={
+            StructureFamily.array,
+            StructureFamily.sparse,
+            StructureFamily.table,
+            StructureFamily.container,
+        },
     ),
 ):
-    url = str(request.url).split('?')[0].rstrip('/')  # Remove query params and the trailing slash
+    # Remove query params and the trailing slash from the url
+    url = str(request.url).split("?")[0].rstrip("/")

-    # breakpoint()
     if entry.structure_family == StructureFamily.container:
         # List the contents of a "simulated" zarr directory (excluding .zarray and .zgroup files)
-        body = json.dumps([url + '/' + key for key in entry.keys()])
+        if hasattr(entry, "keys_range"):
+            keys = await entry.keys_range(offset=0, limit=None)
+        else:
+            keys = entry.keys()
+        body = json.dumps([url + "/" + key for key in keys])
+
+        return Response(body, status_code=200, media_type="application/json")

-        return Response(body, status_code=200, media_type='application/json')
-
     elif entry.structure_family == StructureFamily.table:
-        url = str(request.url).split('?')[0].rstrip('/')  # Remove query params and the trailing slash
-        # breakpoint()
-        body = json.dumps([url + '/' + key for key in entry.structure().columns])
+        # List the columns of the table -- they will be accessed separately as arrays
+        body = json.dumps([url + "/" + key for key in entry.structure().columns])

-        # entry.structure().meta.dtypes
+        return Response(body, status_code=200, media_type="application/json")
+
+    elif entry.structure_family == StructureFamily.array and isinstance(entry.structure().data_type, StructDtype):
+        # List the column names of the structured array -- they will be accessed separately
+        body = json.dumps([url + "/" + f.name for f in entry.structure().data_type.fields])

-        return Response(body, status_code=200, media_type='application/json')
+        return Response(body, status_code=200, media_type="application/json")

     elif entry.structure_family in {StructureFamily.array, StructureFamily.sparse}:
+        # Return the actual array values for a single block of a zarr array
         if block is not None:
-            import zarr
             import numpy as np
+            from sparse import SparseArray
+
+            zarr_block_indx = [int(i) for i in block.split(",")]
+            zarr_block_spec = convert_chunks_for_zarr(entry.structure().chunks)
+            if (not (zarr_block_spec == [] and zarr_block_indx == [0])) and (
+                len(zarr_block_spec) != len(zarr_block_indx)
+            ):
+                # Not a scalar and shape doesn't match
+                raise HTTPException(
+                    status_code=HTTP_400_BAD_REQUEST,
+                    detail=f"Requested zarr block index {zarr_block_indx} is inconsistent with the shape of the array, {entry.structure().shape}.",  # noqa
+                )

-            block_indx = [int(i) for i in block.split(',')]
-            zarr_chunks = convert_chunks_for_zarr(entry.structure().chunks)
-            block_slice = tuple([slice(i*c, (i+1)*c) for c, i in zip(zarr_chunks, block_indx)])
-            padding_size = [max(0, sl.stop-sh) for sh, sl in zip(entry.structure().shape, block_slice)]
-
-            # if block == ():
-            #     # Handle special case of numpy scalar
-            #     with record_timing(request.state.metrics, "read"):
-            #         array = await ensure_awaitable(entry.read)
-            # else:
-
-            # breakpoint()
+            # Indices of the array slices in each dimension that correspond to the requested zarr block
+            block_slices = tuple(
+                [
+                    slice(i * c, (i + 1) * c)
+                    for i, c in zip(zarr_block_indx, zarr_block_spec)
+                ]
+            )

             try:
                 with record_timing(request.state.metrics, "read"):
-                    array = await ensure_awaitable(entry.read, slice=block_slice)
-                if sum(padding_size) > 0:
-                    array = np.pad(array, [(0, p) for p in padding_size], mode='constant')
+                    array = await ensure_awaitable(entry.read, slice=block_slices)
             except IndexError:
                 raise HTTPException(
-                    status_code=HTTP_400_BAD_REQUEST, detail="Block index out of range"
+                    status_code=HTTP_400_BAD_REQUEST,
+                    detail=f"Index of zarr block {zarr_block_indx} is out of range.",
                 )
+            if isinstance(array, SparseArray):
+                array = array.todense()

-            # buf = zarr.array(array).store['0.0']  # Define a zarr array as a single block
+            # Pad the last slices with zeros, if needed, so that all zarr blocks have the same shape
+            padding_size = [
+                max(0, sl.stop - sh)
+                for sl, sh in zip(block_slices, entry.structure().shape)
+            ]
+            if sum(padding_size) > 0:
+                array = np.pad(array, [(0, p) for p in padding_size], mode="constant")

-            # breakpoint()
-            array = array.astype(array.dtype, order=ZARR_BYTE_ORDER, copy=False)  # ensure array is contiguous
+            # Ensure the array is contiguous and encode it; equivalent to `buf = zarr.array(array).store['0.0']`
+            array = array.astype(array.dtype, order=ZARR_BYTE_ORDER, copy=False)
             buf = zarr_codec.encode(array)
             if not isinstance(buf, bytes):
                 buf = array.tobytes(order="A")
diff --git a/tiled/structures/sparse.py b/tiled/structures/sparse.py
index 91ec0ee51..5b1e34d96 100644
--- a/tiled/structures/sparse.py
+++ b/tiled/structures/sparse.py
@@ -1,6 +1,7 @@
 import enum
 from dataclasses import dataclass
 from typing import Optional, Tuple, Union
+
 from .array import BuiltinDtype, StructDtype
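
[Editor's note: taken together, these routes let a stock zarr v2 client browse a Tiled server. A minimal sketch of client-side usage, mirroring the tests above; the server address and API key are assumptions for illustration, not part of this patch.]

```python
# Sketch: read Tiled data through the new /zarr/v2 endpoints with a plain
# zarr client over HTTP. Assumes a server at localhost:8000 started with
# single_user_api_key="secret"; both values are illustrative.
import zarr
from fsspec.implementations.http import HTTPFileSystem

fs = HTTPFileSystem(client_kwargs={"headers": {"Authorization": "Apikey secret"}})
root = zarr.open(fs.get_mapper("http://localhost:8000/zarr/v2/"), mode="r")

print(list(root.group_keys()))  # containers, tables, and structured arrays appear as groups
print(list(root.array_keys()))  # plain arrays appear as zarr arrays

arr = root["nested"]["array"]["random_2d"][...]  # fetched block-wise, e.g. /zarr/v2/.../0.0
col = root["table"]["single"]["x"][...]          # one table column, served as an array
```
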