diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py index eda87889a..344bbbf5d 100644 --- a/tiled/adapters/csv.py +++ b/tiled/adapters/csv.py @@ -41,10 +41,9 @@ def read_csv( ) -read_csv.__doc__ = ( - """ +read_csv.__doc__ = """ This wraps dask.dataframe.read_csv. Original docstring: -""" - + dask.dataframe.read_csv.__doc__ +""" + ( + dask.dataframe.read_csv.__doc__ or "" ) diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 9fc01cc28..661f7c414 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -321,7 +321,7 @@ async def __aiter__(self): @property def data_sources(self): - return [DataSource.from_orm(ds) for ds in self.node.data_sources or []] + return [DataSource.from_orm(ds) for ds in (self.node.data_sources or [])] async def asset_by_id(self, asset_id): statement = ( @@ -597,12 +597,14 @@ async def create_node( if data_source.management != Management.external: if structure_family == StructureFamily.container: raise NotImplementedError(structure_family) - data_source.mimetype = DEFAULT_CREATION_MIMETYPE[structure_family] + data_source.mimetype = DEFAULT_CREATION_MIMETYPE[ + data_source.structure_family + ] data_source.parameters = {} data_uri = str(self.context.writable_storage) + "".join( f"/{quote_plus(segment)}" for segment in (self.segments + [key]) ) - init_storage = DEFAULT_INIT_STORAGE[structure_family] + init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family] assets = await ensure_awaitable( init_storage, data_uri, data_source.structure ) @@ -1307,9 +1309,9 @@ def specs_array_to_json(specs): STRUCTURES = { - StructureFamily.container: CatalogContainerAdapter, StructureFamily.array: CatalogArrayAdapter, StructureFamily.awkward: CatalogAwkwardAdapter, - StructureFamily.table: CatalogTableAdapter, + StructureFamily.container: CatalogContainerAdapter, StructureFamily.sparse: CatalogSparseAdapter, + StructureFamily.table: CatalogTableAdapter, } diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py 
index f2ec818c0..eeca4c2ac 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -291,7 +291,6 @@ class DataSource(Timestamped, Base): node_id = Column( Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False ) - structure_family = Column(Enum(StructureFamily), nullable=False) structure_id = Column( Unicode(32), ForeignKey("structures.id", ondelete="CASCADE"), nullable=True ) @@ -301,6 +300,7 @@ class DataSource(Timestamped, Base): parameters = Column(JSONVariant, nullable=True) # This relates to the mutability of the data. management = Column(Enum(Management), nullable=False) + structure_family = Column(Enum(StructureFamily), nullable=False) # many-to-one relationship to Structure structure: Mapped["Structure"] = relationship( diff --git a/tiled/client/constructors.py b/tiled/client/constructors.py index b9b7ee385..8a035c89d 100644 --- a/tiled/client/constructors.py +++ b/tiled/client/constructors.py @@ -150,11 +150,14 @@ def from_context( and (context.http_client.auth is None) ): context.authenticate() + params = {} + if include_data_sources: + params["include_data_sources"] = True content = handle_error( context.http_client.get( item_uri, headers={"Accept": MSGPACK_MIME_TYPE}, - params={"include_data_sources": include_data_sources}, + params=params, ) ).json() else: diff --git a/tiled/client/container.py b/tiled/client/container.py index 2f9d99a0a..55a9dc59e 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -249,16 +249,18 @@ def __getitem__(self, keys, _ignore_inlined_contents=False): # Lookup this key *within the search results* of this Node. 
key, *tail = keys tail = tuple(tail) # list -> tuple + params = { + **_queries_to_params(KeyLookup(key)), + **self._queries_as_params, + **self._sorting_params, + } + if self._include_data_sources: + params["include_data_sources"] = True content = handle_error( self.context.http_client.get( self.item["links"]["search"], headers={"Accept": MSGPACK_MIME_TYPE}, - params={ - "include_data_sources": self._include_data_sources, - **_queries_to_params(KeyLookup(key)), - **self._queries_as_params, - **self._sorting_params, - }, + params=params, ) ).json() self._cached_len = ( @@ -305,13 +307,14 @@ def __getitem__(self, keys, _ignore_inlined_contents=False): self_link = self.item["links"]["self"] if self_link.endswith("/"): self_link = self_link[:-1] + params = {} + if self._include_data_sources: + params["include_data_sources"] = True content = handle_error( self.context.http_client.get( self_link + "".join(f"/{key}" for key in keys[i:]), headers={"Accept": MSGPACK_MIME_TYPE}, - params={ - "include_data_sources": self._include_data_sources - }, + params=params, ) ).json() except ClientError as err: @@ -413,15 +416,17 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False): next_page_url = f"{self.item['links']['search']}?page[offset]={start}" item_counter = itertools.count(start) while next_page_url is not None: + params = { + **self._queries_as_params, + **sorting_params, + } + if self._include_data_sources: + params["include_data_sources"] = True content = handle_error( self.context.http_client.get( next_page_url, headers={"Accept": MSGPACK_MIME_TYPE}, - params={ - "include_data_sources": self._include_data_sources, - **self._queries_as_params, - **sorting_params, - }, + params=params, ) ).json() self._cached_len = ( diff --git a/tiled/client/dataframe.py b/tiled/client/dataframe.py index 794c7fde1..dd83b73a0 100644 --- a/tiled/client/dataframe.py +++ b/tiled/client/dataframe.py @@ -1,5 +1,5 @@ import dask -import dask.dataframe +import 
dask.dataframe.core from ..serialization.table import deserialize_arrow, serialize_arrow from ..utils import APACHE_ARROW_FILE_MIME_TYPE, UNCHANGED @@ -162,7 +162,7 @@ def read(self, columns=None): if columns is not None: meta = meta[columns] - ddf = dask.dataframe.DataFrame( + ddf = dask.dataframe.core.DataFrame( dask_tasks, name=name, meta=meta, diff --git a/tiled/server/core.py b/tiled/server/core.py index 857d547a0..10e0d93d6 100644 --- a/tiled/server/core.py +++ b/tiled/server/core.py @@ -34,6 +34,7 @@ ) from . import schemas from .etag import tokenize +from .links import links_for_node from .utils import record_timing del queries @@ -404,6 +405,7 @@ async def construct_resource( depth=0, ): path_str = "/".join(path_parts) + id_ = path_parts[-1] if path_parts else "" attributes = {"ancestors": path_parts[:-1]} if include_data_sources and hasattr(entry, "data_sources"): attributes["data_sources"] = entry.data_sources @@ -488,15 +490,16 @@ async def construct_resource( for key, direction in entry.sorting ] d = { - "id": path_parts[-1] if path_parts else "", + "id": id_, "attributes": schemas.NodeAttributes(**attributes), } if not omit_links: - d["links"] = { - "self": f"{base_url}/metadata/{path_str}", - "search": f"{base_url}/search/{path_str}", - "full": f"{base_url}/container/full/{path_str}", - } + d["links"] = links_for_node( + entry.structure_family, + entry.structure(), + base_url, + path_str, + ) resource = schemas.Resource[ schemas.NodeAttributes, schemas.ContainerLinks, schemas.ContainerMeta @@ -510,34 +513,16 @@ async def construct_resource( entry.structure_family ] links.update( - { - link: template.format(base_url=base_url, path=path_str) - for link, template in FULL_LINKS[entry.structure_family].items() - } + links_for_node( + entry.structure_family, + entry.structure(), + base_url, + path_str, + ) ) structure = asdict(entry.structure()) if schemas.EntryFields.structure_family in fields: attributes["structure_family"] = entry.structure_family - if 
entry.structure_family == StructureFamily.sparse: - shape = structure.get("shape") - block_template = ",".join(f"{{{index}}}" for index in range(len(shape))) - links[ - "block" - ] = f"{base_url}/array/block/{path_str}?block={block_template}" - elif entry.structure_family == StructureFamily.array: - shape = structure.get("shape") - block_template = ",".join( - f"{{index_{index}}}" for index in range(len(shape)) - ) - links[ - "block" - ] = f"{base_url}/array/block/{path_str}?block={block_template}" - elif entry.structure_family == StructureFamily.table: - links[ - "partition" - ] = f"{base_url}/table/partition/{path_str}?partition={{index}}" - elif entry.structure_family == StructureFamily.awkward: - links["buffers"] = f"{base_url}/awkward/buffers/{path_str}" if schemas.EntryFields.structure in fields: attributes["structure"] = structure else: @@ -719,15 +704,6 @@ class WrongTypeForRoute(Exception): pass -FULL_LINKS = { - StructureFamily.array: {"full": "{base_url}/array/full/{path}"}, - StructureFamily.awkward: {"full": "{base_url}/awkward/full/{path}"}, - StructureFamily.container: {"full": "{base_url}/container/full/{path}"}, - StructureFamily.table: {"full": "{base_url}/table/full/{path}"}, - StructureFamily.sparse: {"full": "{base_url}/array/full/{path}"}, -} - - def asdict(dc): "Compat for converting dataclass or pydantic.BaseModel to dict." 
if dc is None: diff --git a/tiled/server/dependencies.py b/tiled/server/dependencies.py index 7cf89c4f3..404f4a5d7 100644 --- a/tiled/server/dependencies.py +++ b/tiled/server/dependencies.py @@ -48,7 +48,7 @@ def get_root_tree(): ) -def SecureEntry(scopes): +def SecureEntry(scopes, structure_families=None): async def inner( path: str, request: Request, @@ -116,7 +116,19 @@ async def inner( ) except NoEntry: raise HTTPException(status_code=404, detail=f"No such entry: {path_parts}") - return entry + # Fast path for the common successful case + if (structure_families is None) or ( + entry.structure_family in structure_families + ): + return entry + raise HTTPException( + status_code=404, + detail=( + f"The node at {path} has structure family {entry.structure_family} " + "and this endpoint is compatible with structure families " + f"{structure_families}" + ), + ) return Security(inner, scopes=scopes) diff --git a/tiled/server/links.py b/tiled/server/links.py new file mode 100644 index 000000000..76bf2616f --- /dev/null +++ b/tiled/server/links.py @@ -0,0 +1,53 @@ +""" +Generate the 'links' section of the response JSON. + +The links vary by structure family. 
+""" +from ..structures.core import StructureFamily + + +def links_for_node(structure_family, structure, base_url, path_str): + links = {} + links = LINKS_BY_STRUCTURE_FAMILY[structure_family]( + structure_family, structure, base_url, path_str + ) + links["self"] = f"{base_url}/metadata/{path_str}" + return links + + +def links_for_array(structure_family, structure, base_url, path_str): + links = {} + block_template = ",".join(f"{{{index}}}" for index in range(len(structure.shape))) + links["block"] = f"{base_url}/array/block/{path_str}?block={block_template}" + links["full"] = f"{base_url}/array/full/{path_str}" + return links + + +def links_for_awkward(structure_family, structure, base_url, path_str): + links = {} + links["buffers"] = f"{base_url}/awkward/buffers/{path_str}" + links["full"] = f"{base_url}/awkward/full/{path_str}" + return links + + +def links_for_container(structure_family, structure, base_url, path_str): + links = {} + links["full"] = f"{base_url}/container/full/{path_str}" + links["search"] = f"{base_url}/search/{path_str}" + return links + + +def links_for_table(structure_family, structure, base_url, path_str): + links = {} + links["partition"] = f"{base_url}/table/partition/{path_str}?partition={{index}}" + links["full"] = f"{base_url}/table/full/{path_str}" + return links + + +LINKS_BY_STRUCTURE_FAMILY = { + StructureFamily.array: links_for_array, + StructureFamily.awkward: links_for_awkward, + StructureFamily.container: links_for_container, + StructureFamily.sparse: links_for_array,  # sparse and array are the same + StructureFamily.table: links_for_table, +} diff --git a/tiled/server/router.py b/tiled/server/router.py index c715e6772..2dbd4fe63 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -46,6 +46,7 @@ get_validation_registry, slice_, ) +from .links import links_for_node from .settings import get_settings from .utils import filter_for_access, get_base_url, record_timing @@ -348,7 +349,10 @@ async def metadata( ) async 
def array_block( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), block=Depends(block), slice=Depends(slice_), expected_shape=Depends(expected_shape), @@ -360,15 +364,7 @@ async def array_block( """ Fetch a chunk of array-like data. """ - if entry.structure_family == "array": - shape = entry.structure().shape - elif entry.structure_family == "sparse": - shape = entry.structure().shape - else: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /array/block route.", - ) + shape = entry.structure().shape # Check that block dimensionality matches array dimensionality. ndim = len(shape) if len(block) != ndim: @@ -430,7 +426,10 @@ async def array_block( ) async def array_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), slice=Depends(slice_), expected_shape=Depends(expected_shape), format: Optional[str] = None, @@ -442,11 +441,6 @@ async def array_full( Fetch a slice of array-like data. """ structure_family = entry.structure_family - if structure_family not in {"array", "sparse"}: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /array/full route.", - ) # Deferred import because this is not a required dependency of the server # for some use cases. import numpy @@ -454,7 +448,7 @@ async def array_full( try: with record_timing(request.state.metrics, "read"): array = await ensure_awaitable(entry.read, slice) - if structure_family == "array": + if structure_family == StructureFamily.array: array = numpy.asarray(array) # Force dask or PIMS or ... to do I/O. 
except IndexError: raise HTTPException(status_code=400, detail="Block index out of range") @@ -496,7 +490,7 @@ async def array_full( async def get_table_partition( request: Request, partition: int, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry(scopes=["read:data"], structure_families={StructureFamily.table}), column: Optional[List[str]] = Query(None, min_length=1), field: Optional[List[str]] = Query(None, min_length=1, deprecated=True), format: Optional[str] = None, @@ -544,7 +538,7 @@ async def get_table_partition( async def post_table_partition( request: Request, partition: int, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry(scopes=["read:data"], structure_families={StructureFamily.table}), column: Optional[List[str]] = Body(None, min_length=1), format: Optional[str] = None, filename: Optional[str] = None, @@ -579,11 +573,6 @@ async def table_partition( """ Fetch a partition (continuous block of rows) from a DataFrame. """ - if entry.structure_family != StructureFamily.table: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /table/partition route.", - ) try: # The singular/plural mismatch here of "fields" and "field" is # due to the ?field=A&field=B&field=C... encodes in a URL. 
@@ -627,7 +616,7 @@ async def table_partition( ) async def get_table_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry(scopes=["read:data"], structure_families={StructureFamily.table}), column: Optional[List[str]] = Query(None, min_length=1), format: Optional[str] = None, filename: Optional[str] = None, @@ -655,7 +644,7 @@ async def get_table_full( ) async def post_table_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry(scopes=["read:data"], structure_families={StructureFamily.table}), column: Optional[List[str]] = Body(None, min_length=1), format: Optional[str] = None, filename: Optional[str] = None, @@ -688,11 +677,6 @@ async def table_full( """ Fetch the data for the given table. """ - if entry.structure_family != StructureFamily.table: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /table/full route.", - ) try: with record_timing(request.state.metrics, "read"): data = await ensure_awaitable(entry.read, column) @@ -733,7 +717,9 @@ async def table_full( ) async def get_container_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], structure_families={StructureFamily.container} + ), principal: str = Depends(get_current_principal), field: Optional[List[str]] = Query(None, min_length=1), format: Optional[str] = None, @@ -761,7 +747,9 @@ async def get_container_full( ) async def post_container_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], structure_families={StructureFamily.container} + ), principal: str = Depends(get_current_principal), field: Optional[List[str]] = Body(None, min_length=1), format: Optional[str] = None, @@ -794,11 +782,6 @@ async def container_full( """ Fetch the data for the given container. 
""" - if entry.structure_family != StructureFamily.container: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /container/full route.", - ) try: with record_timing(request.state.metrics, "read"): data = await ensure_awaitable(entry.read, fields=field) @@ -838,7 +821,10 @@ async def container_full( ) async def node_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], + structure_families={StructureFamily.table, StructureFamily.container}, + ), principal: str = Depends(get_current_principal), field: Optional[List[str]] = Query(None, min_length=1), format: Optional[str] = None, @@ -901,7 +887,9 @@ async def node_full( ) async def get_awkward_buffers( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], structure_families={StructureFamily.awkward} + ), form_key: Optional[List[str]] = Query(None, min_length=1), format: Optional[str] = None, filename: Optional[str] = None, @@ -937,7 +925,9 @@ async def get_awkward_buffers( async def post_awkward_buffers( request: Request, body: List[str], - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], structure_families={StructureFamily.awkward} + ), format: Optional[str] = None, filename: Optional[str] = None, serialization_registry=Depends(get_serialization_registry), @@ -975,11 +965,6 @@ async def _awkward_buffers( ): structure_family = entry.structure_family structure = entry.structure() - if structure_family != StructureFamily.awkward: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /awkward/buffers route.", - ) with record_timing(request.state.metrics, "read"): # The plural vs. singular mismatch is due to the way query parameters # are given as ?form_key=A&form_key=B&form_key=C. 
@@ -1020,7 +1005,9 @@ async def _awkward_buffers( ) async def awkward_full( request: Request, - entry=SecureEntry(scopes=["read:data"]), + entry=SecureEntry( + scopes=["read:data"], structure_families={StructureFamily.awkward} + ), # slice=Depends(slice_), format: Optional[str] = None, filename: Optional[str] = None, @@ -1031,11 +1018,6 @@ async def awkward_full( Fetch a slice of AwkwardArray data. """ structure_family = entry.structure_family - if structure_family != StructureFamily.awkward: - raise HTTPException( - status_code=404, - detail=f"Cannot read {entry.structure_family} structure with /awkward/full route.", - ) # Deferred import because this is not a required dependency of the server # for some use cases. import awkward @@ -1174,30 +1156,9 @@ async def _create_node( specs=body.specs, data_sources=body.data_sources, ) - links = {} - base_url = get_base_url(request) - path_parts = [segment for segment in path.split("/") if segment] + [key] - path_str = "/".join(path_parts) - links["self"] = f"{base_url}/metadata/{path_str}" - if body.structure_family in {StructureFamily.array, StructureFamily.sparse}: - block_template = ",".join( - f"{{{index}}}" for index in range(len(node.structure().shape)) - ) - links["block"] = f"{base_url}/array/block/{path_str}?block={block_template}" - links["full"] = f"{base_url}/array/full/{path_str}" - elif body.structure_family == StructureFamily.table: - links[ - "partition" - ] = f"{base_url}/table/partition/{path_str}?partition={{index}}" - links["full"] = f"{base_url}/table/full/{path_str}" - elif body.structure_family == StructureFamily.container: - links["full"] = f"{base_url}/container/full/{path_str}" - links["search"] = f"{base_url}/search/{path_str}" - elif body.structure_family == StructureFamily.awkward: - links["buffers"] = f"{base_url}/awkward/buffers/{path_str}" - links["full"] = f"{base_url}/awkward/full/{path_str}" - else: - raise NotImplementedError(body.structure_family) + links = links_for_node( + 
structure_family, structure, get_base_url(request), path + f"/{key}" + ) response_data = { "id": key, "links": links, @@ -1239,7 +1200,10 @@ async def bulk_delete( @router.put("/array/full/{path:path}") async def put_array_full( request: Request, - entry=SecureEntry(scopes=["write:data"]), + entry=SecureEntry( + scopes=["write:data"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), deserialization_registry=Depends(get_deserialization_registry), ): body = await request.body() @@ -1265,7 +1229,10 @@ async def put_array_full( @router.put("/array/block/{path:path}") async def put_array_block( request: Request, - entry=SecureEntry(scopes=["write:data"]), + entry=SecureEntry( + scopes=["write:data"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), deserialization_registry=Depends(get_deserialization_registry), block=Depends(block), ): @@ -1297,7 +1264,9 @@ async def put_array_block( @router.put("/node/full/{path:path}", deprecated=True) async def put_node_full( request: Request, - entry=SecureEntry(scopes=["write:data"]), + entry=SecureEntry( + scopes=["write:data"], structure_families={StructureFamily.table} + ), deserialization_registry=Depends(get_deserialization_registry), ): if not hasattr(entry, "write"): @@ -1334,14 +1303,12 @@ async def put_table_partition( @router.put("/awkward/full/{path:path}") async def put_awkward_full( request: Request, - entry=SecureEntry(scopes=["write:data"]), + entry=SecureEntry( + scopes=["write:data"], structure_families={StructureFamily.awkward} + ), deserialization_registry=Depends(get_deserialization_registry), ): body = await request.body() - if entry.structure_family != StructureFamily.awkward: - raise HTTPException( - status_code=404, detail="This route is not applicable to this node." 
- ) if not hasattr(entry, "write"): raise HTTPException(status_code=405, detail="This node cannot be written to.") media_type = request.headers["content-type"] diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index 5fbe0a1e5..85bf50884 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -137,9 +137,9 @@ class DataSource(pydantic.BaseModel): Union[ ArrayStructure, AwkwardStructure, - TableStructure, NodeStructure, SparseStructure, + TableStructure, ] ] = None mimetype: Optional[str] = None @@ -169,9 +169,9 @@ class NodeAttributes(pydantic.BaseModel): Union[ ArrayStructure, AwkwardStructure, - TableStructure, NodeStructure, SparseStructure, + TableStructure, ] ] sorting: Optional[List[SortingItem]] @@ -218,11 +218,11 @@ class SparseLinks(pydantic.BaseModel): resource_links_type_by_structure_family = { - StructureFamily.container: ContainerLinks, StructureFamily.array: ArrayLinks, StructureFamily.awkward: AwkwardLinks, - StructureFamily.table: DataFrameLinks, + StructureFamily.container: ContainerLinks, StructureFamily.sparse: SparseLinks, + StructureFamily.table: DataFrameLinks, } diff --git a/tiled/structures/core.py b/tiled/structures/core.py index 065e3dd4e..600a47ad6 100644 --- a/tiled/structures/core.py +++ b/tiled/structures/core.py @@ -10,9 +10,9 @@ class StructureFamily(str, enum.Enum): + array = "array" awkward = "awkward" container = "container" - array = "array" sparse = "sparse" table = "table"