From 6d654664246f574820ed495e083add2d48b6a0f8 Mon Sep 17 00:00:00 2001
From: Deepak Chandan
Date: Wed, 17 Jan 2024 16:50:21 -0500
Subject: [PATCH 1/4] fixing a bug in thredds loader that limited crawling
 ability

---
 Makefile                        |  5 ++++-
 STACpopulator/input.py          | 25 ++++++++++++++++---------
 STACpopulator/populator_base.py | 10 +++++++---
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/Makefile b/Makefile
index 910588c..6414ad3 100644
--- a/Makefile
+++ b/Makefile
@@ -11,10 +11,13 @@ DOCKER_TAG := ghcr.io/crim-ca/stac-populator:$(APP_VERSION)
 IMP_DIR := $(APP_NAME)/implementations
 STAC_HOST ?= http://localhost:8880/stac
 # CATALOG = https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html
-CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
+# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
 # CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NOAA-GFDL/catalog.html
 # CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/AS-RCEC/catalog.html
+# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NUIST/catalog.html
+CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/MIROC/catalog.html
+
 PYESSV_ARCHIVE_DIR ?= ~/.esdoc/pyessv-archive
 PYESSV_ARCHIVE_REF ?= https://github.com/ES-DOC/pyessv-archive

diff --git a/STACpopulator/input.py b/STACpopulator/input.py
index 0004d81..43ef8bb 100644
--- a/STACpopulator/input.py
+++ b/STACpopulator/input.py
@@ -60,6 +60,7 @@ class THREDDSCatalog(TDSCatalog):
     Because of how :class:`TDSCatalog` automatically loads and parses right away from ``__init__`` call, we need to
     hack around how the ``session`` attribute gets defined.
""" + def __init__(self, catalog_url: str, session: Optional[Session] = None) -> None: self._session = session super().__init__(catalog_url) @@ -91,7 +92,8 @@ def __init__( :type depth: int, optional """ super().__init__() - self._depth = depth if depth is not None else 1000 + self._max_depth = depth if depth is not None else 1000 + self._depth = 0 self.thredds_catalog_URL = self.validate_catalog_url(thredds_catalog_url) @@ -134,18 +136,23 @@ def reset(self): """Reset the generator.""" self.catalog_head = self.catalog - def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]: + def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: """Return a generator walking a THREDDS data catalog for datasets.""" + + if self._depth > self._max_depth: + return + if self.catalog_head.datasets.items(): for item_name, ds in self.catalog_head.datasets.items(): attrs = self.extract_metadata(ds) - yield item_name, attrs - - if self._depth > 0: - for name, ref in self.catalog_head.catalog_refs.items(): - self.catalog_head = ref.follow() - self._depth -= 1 - yield from self + yield item_name, ds.url_path, attrs + # yield item_name, ds.url_path, [] + + for name, ref in self.catalog_head.catalog_refs.items(): + self.catalog_head = ref.follow() + self._depth -= 1 + yield from self + self._depth += 1 def __getitem__(self, dataset): return self.catalog.datasets[dataset] diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 762c5b9..745c635 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -17,7 +17,6 @@ from STACpopulator.models import AnyGeometry from STACpopulator.stac_utils import get_logger, load_config, url_validate - LOGGER = get_logger(__name__) @@ -144,9 +143,12 @@ def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) def ingest(self) -> None: + counter = 0 LOGGER.info("Data ingestion") - for item_name, item_data in self._ingest_pipeline: - LOGGER.info(f"Creating STAC representation for {item_name}") + for item_name, item_loc, item_data in self._ingest_pipeline: + LOGGER.info(f"New data item: {item_name}") + if item_loc: + LOGGER.info(f"Data location: {item_loc}") stac_item = self.create_stac_item(item_name, item_data) if stac_item: post_stac_item( @@ -157,3 +159,5 @@ def ingest(self) -> None: update=self.update, session=self._session, ) + counter += 1 + LOGGER.info(f"Processed {counter} data items") From ed82212eecfd2f7d3fc5a082cd89fcacf7c98f6f Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Wed, 17 Jan 2024 17:02:09 -0500 Subject: [PATCH 2/4] fix issue --- STACpopulator/input.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/STACpopulator/input.py b/STACpopulator/input.py index 43ef8bb..097c681 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -146,7 +146,6 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: for item_name, ds in self.catalog_head.datasets.items(): attrs = self.extract_metadata(ds) yield item_name, ds.url_path, attrs - # yield item_name, ds.url_path, [] for name, ref in self.catalog_head.catalog_refs.items(): self.catalog_head = ref.follow() @@ -208,7 +207,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]: if self.prune: # stop recursive search if requested del dirs[:] col_path = os.path.join(root, self._collection_name) - yield col_path, self._load_json(col_path) + yield col_path, "", 
             # if a collection is found deeper when not expected for items parsing
             # drop the nested directories to avoid over-crawling nested collections
             elif not self._collection_mode and not is_root and self._collection_name in files:
@@ -218,7 +217,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
             for name in files:
                 if not self._collection_mode and self._is_item(name):
                     item_path = os.path.join(root, name)
-                    yield item_path, self._load_json(item_path)
+                    yield item_path, "", self._load_json(item_path)
 
     def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
         name = os.path.split(path)[-1]

From d07d5e6e02d5c6d19a48738eed3250c88c34842a Mon Sep 17 00:00:00 2001
From: Deepak Chandan
Date: Wed, 17 Jan 2024 17:07:39 -0500
Subject: [PATCH 3/4] fixes to pass tests

---
 .../implementations/DirectoryLoader/crawl_directory.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py
index 7a2651e..ee11f52 100644
--- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py
+++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py
@@ -1,6 +1,6 @@
 import argparse
 import os.path
-from typing import NoReturn, Optional, MutableMapping, Any
+from typing import Any, MutableMapping, NoReturn, Optional
 
 from requests.sessions import Session
 
@@ -45,8 +45,9 @@ def make_parser() -> argparse.ArgumentParser:
     parser.add_argument("directory", type=str, help="Path to a directory structure with STAC Collections and Items.")
     parser.add_argument("--update", action="store_true", help="Update collection and its items.")
     parser.add_argument(
-        "--prune", action="store_true",
-        help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure."
+ "--prune", + action="store_true", + help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure.", ) add_request_options(parser) return parser @@ -57,7 +58,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: with Session() as session: apply_request_options(session, ns) - for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): + for collection_path, _, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): collection_dir = os.path.dirname(collection_path) loader = STACDirectoryLoader(collection_dir, "item", prune=ns.prune) populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session) From e58e5d2869d8414d1f9eeb18163e63ab67500d2e Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Fri, 19 Jan 2024 12:49:34 -0500 Subject: [PATCH 4/4] PR 44 fixes --- .../DirectoryLoader/crawl_directory.py | 2 +- STACpopulator/input.py | 18 ++++++++++++++---- STACpopulator/populator_base.py | 5 +++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py index ee11f52..5336d08 100644 --- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py +++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py @@ -58,7 +58,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: with Session() as session: apply_request_options(session, ns) - for collection_path, _, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): + for _, collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): collection_dir = os.path.dirname(collection_path) loader = STACDirectoryLoader(collection_dir, "item", prune=ns.prune) populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session) diff --git a/STACpopulator/input.py b/STACpopulator/input.py index 097c681..a9525e8 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -137,7 +137,11 @@ def reset(self): self.catalog_head = self.catalog def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: - """Return a generator walking a THREDDS data catalog for datasets.""" + """Return a generator walking a THREDDS data catalog for datasets. + + :yield: Returns three quantities: name of the item, location of the item, and its attributes + :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]] + """ if self._depth > self._max_depth: return @@ -198,7 +202,13 @@ def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool = self._collection_mode = mode == "collection" self._collection_name = "collection.json" - def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]: + def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: + """Return a generator that walks through a directory structure looking for sTAC Collections or Items. 
+
+        :yield: the name of the item, the location of the item, and its attributes
+        :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
+        """
+
         is_root = True
         for root, dirs, files in self.iter:
             # since there can ever be only one 'collection' file name in a same directory
@@ -207,7 +217,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
             if self.prune:  # stop recursive search if requested
                 del dirs[:]
             col_path = os.path.join(root, self._collection_name)
-            yield col_path, "", self._load_json(col_path)
+            yield self._collection_name, col_path, self._load_json(col_path)
             # if a collection is found deeper when not expected for items parsing
             # drop the nested directories to avoid over-crawling nested collections
             elif not self._collection_mode and not is_root and self._collection_name in files:
@@ -217,7 +227,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
             for name in files:
                 if not self._collection_mode and self._is_item(name):
                     item_path = os.path.join(root, name)
-                    yield item_path, "", self._load_json(item_path)
+                    yield name, item_path, self._load_json(item_path)
 
     def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
         name = os.path.split(path)[-1]

diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py
index 745c635..e3da33e 100644
--- a/STACpopulator/populator_base.py
+++ b/STACpopulator/populator_base.py
@@ -147,8 +147,7 @@ def ingest(self) -> None:
         LOGGER.info("Data ingestion")
         for item_name, item_loc, item_data in self._ingest_pipeline:
             LOGGER.info(f"New data item: {item_name}")
-            if item_loc:
-                LOGGER.info(f"Data location: {item_loc}")
+            LOGGER.info(f"Data location: {item_loc}")
             stac_item = self.create_stac_item(item_name, item_data)
             if stac_item:
                 post_stac_item(
@@ -161,3 +160,5 @@ def ingest(self) -> None:
                     update=self.update,
                     session=self._session,
                 )
                 counter += 1
+            else:
+                LOGGER.error("Failed to create STAC representation")
         LOGGER.info(f"Processed {counter} data items")
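
The combined effect of the series: every loader now yields (name, location,
attributes) 3-tuples, and the THREDDS crawler tracks its recursion depth
explicitly instead of consuming a shared countdown across siblings. The
self-contained sketch below mirrors that pattern in miniature; it is not part
of the patches, and TreeLoader and the nested-dict "catalog" layout are
invented for illustration (the real loaders wrap TDSCatalog and os.walk):

    from typing import Any, Iterator, MutableMapping, Tuple


    class TreeLoader:
        """Illustrative stand-in for THREDDSLoader over a nested mapping."""

        def __init__(self, root: MutableMapping[str, Any], depth: int = 1000) -> None:
            self._max_depth = depth  # maximum recursion depth, as in PATCH 1/4
            self._depth = 0          # current depth, incremented on each descent
            self.head = root

        def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
            if self._depth > self._max_depth:  # stop once past the depth limit
                return
            for name, (location, attrs) in self.head.get("datasets", {}).items():
                yield name, location, attrs
            for ref in self.head.get("refs", {}).values():
                parent, self.head = self.head, ref  # follow the catalog reference
                self._depth += 1
                yield from self                     # recurse one level deeper
                self._depth -= 1
                self.head = parent                  # restore after the subtree


    catalog = {
        "datasets": {"a.nc": ("/data/a.nc", {"id": "a"})},
        "refs": {
            "sub": {"datasets": {"b.nc": ("/data/sub/b.nc", {"id": "b"})}, "refs": {}},
        },
    }

    for name, location, attrs in TreeLoader(catalog, depth=1):
        print(name, location, attrs)  # consumers unpack 3-tuples, as ingest() does

With depth=1 the sketch yields both a.nc and b.nc; with depth=0 it yields only
the root-level a.nc, which is the behavior the depth guard is meant to provide.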