Merge pull request #44 from crim-ca/thredds-loader-logic-fix
Fixing a bug in the THREDDS loader that limited its crawling ability
dchandan authored Jan 22, 2024
2 parents 2bc22dc + e58e5d2 commit 10b0716
Showing 4 changed files with 45 additions and 20 deletions.
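The substance of the fix is in STACpopulator/input.py below. The old __iter__ guarded recursion behind "if self._depth > 0" and decremented a single shared counter every time it followed a catalog reference, so earlier branches could exhaust the budget and silently cut the crawl short for the branches that followed. The new version keeps the configured limit in _max_depth, tracks the current position in _depth, and restores the counter when unwinding out of a branch. A minimal, self-contained sketch of the corrected pattern follows; the Tree class and walk function are hypothetical stand-ins, not the repository's THREDDS classes.

from typing import Any, Iterator, Mapping, Tuple


class Tree:
    """Hypothetical stand-in for a THREDDS catalog node."""

    def __init__(self, name: str, datasets: Mapping[str, Any], children: list) -> None:
        self.name = name
        self.datasets = datasets
        self.children = children


def walk(node: Tree, depth: int = 0, max_depth: int = 1000) -> Iterator[Tuple[str, str, Any]]:
    """Yield (name, location, attributes), mirroring the fixed loader contract."""
    if depth > max_depth:  # the bound applies per branch, not as a global budget
        return
    for name, attrs in node.datasets.items():
        yield name, f"{node.name}/{name}", attrs
    for child in node.children:
        # depth is passed down and naturally restored on return,
        # so one branch can never starve its siblings
        yield from walk(child, depth + 1, max_depth)


root = Tree("CMIP6", {}, [Tree("CMIP6/MIROC", {"tas.nc": {"var": "tas"}}, [])])
print(list(walk(root)))  # every branch is visited down to max_depth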
5 changes: 4 additions & 1 deletion Makefile
@@ -11,10 +11,13 @@ DOCKER_TAG := ghcr.io/crim-ca/stac-populator:$(APP_VERSION)
 IMP_DIR := $(APP_NAME)/implementations
 STAC_HOST ?= http://localhost:8880/stac
 # CATALOG = https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html
-CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
+# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
 # CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NOAA-GFDL/catalog.html
 # CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/AS-RCEC/catalog.html
 
+# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NUIST/catalog.html
+CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/MIROC/catalog.html
+
 PYESSV_ARCHIVE_DIR ?= ~/.esdoc/pyessv-archive
 PYESSV_ARCHIVE_REF ?= https://github.com/ES-DOC/pyessv-archive
 
@@ -1,6 +1,6 @@
 import argparse
 import os.path
-from typing import NoReturn, Optional, MutableMapping, Any
+from typing import Any, MutableMapping, NoReturn, Optional
 
 from requests.sessions import Session
 
@@ -45,8 +45,9 @@ def make_parser() -> argparse.ArgumentParser:
     parser.add_argument("directory", type=str, help="Path to a directory structure with STAC Collections and Items.")
     parser.add_argument("--update", action="store_true", help="Update collection and its items.")
     parser.add_argument(
-        "--prune", action="store_true",
-        help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure."
+        "--prune",
+        action="store_true",
+        help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure.",
     )
     add_request_options(parser)
     return parser
@@ -57,7 +58,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
 
     with Session() as session:
         apply_request_options(session, ns)
-        for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
+        for _, collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
             collection_dir = os.path.dirname(collection_path)
             loader = STACDirectoryLoader(collection_dir, "item", prune=ns.prune)
             populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session)
40 changes: 28 additions & 12 deletions STACpopulator/input.py
@@ -60,6 +60,7 @@ class THREDDSCatalog(TDSCatalog):
     Because of how :class:`TDSCatalog` automatically loads and parses right away from ``__init__`` call,
     we need to hack around how the ``session`` attribute gets defined.
     """
+
     def __init__(self, catalog_url: str, session: Optional[Session] = None) -> None:
         self._session = session
         super().__init__(catalog_url)
@@ -91,7 +92,8 @@ def __init__(
         :type depth: int, optional
         """
         super().__init__()
-        self._depth = depth if depth is not None else 1000
+        self._max_depth = depth if depth is not None else 1000
+        self._depth = 0
 
         self.thredds_catalog_URL = self.validate_catalog_url(thredds_catalog_url)
 
@@ -134,18 +136,26 @@ def reset(self):
         """Reset the generator."""
         self.catalog_head = self.catalog
 
-    def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
-        """Return a generator walking a THREDDS data catalog for datasets."""
+    def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
+        """Return a generator walking a THREDDS data catalog for datasets.
+
+        :yield: Returns three quantities: name of the item, location of the item, and its attributes
+        :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
+        """
+
+        if self._depth > self._max_depth:
+            return
+
         if self.catalog_head.datasets.items():
             for item_name, ds in self.catalog_head.datasets.items():
                 attrs = self.extract_metadata(ds)
-                yield item_name, attrs
+                yield item_name, ds.url_path, attrs
 
-        if self._depth > 0:
-            for name, ref in self.catalog_head.catalog_refs.items():
-                self.catalog_head = ref.follow()
-                self._depth -= 1
-                yield from self
+        for name, ref in self.catalog_head.catalog_refs.items():
+            self.catalog_head = ref.follow()
+            self._depth -= 1
+            yield from self
+            self._depth += 1
 
     def __getitem__(self, dataset):
         return self.catalog.datasets[dataset]
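For callers, the visible change in this hunk is the contract: the loader now yields (name, location, attributes) triples instead of (name, attributes) pairs, with the location taken from the dataset's url_path. A hypothetical consumer is sketched below; the class name THREDDSLoader and its constructor arguments are assumptions inferred from the __init__ shown above, not confirmed by this diff.

from STACpopulator.input import THREDDSLoader  # class name assumed

catalog_url = "https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/MIROC/catalog.html"
loader = THREDDSLoader(catalog_url, depth=2)  # depth: optional crawl limit per the docstring above
for item_name, item_loc, attrs in loader:  # unpacks the new three-element yield
    print(item_name, item_loc)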
@@ -192,7 +202,13 @@ def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool =
         self._collection_mode = mode == "collection"
         self._collection_name = "collection.json"
 
-    def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
+    def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
+        """Return a generator that walks through a directory structure looking for STAC Collections or Items.
+
+        :yield: Returns three quantities: name of the item, location of the item, and its attributes
+        :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
+        """
+
         is_root = True
         for root, dirs, files in self.iter:
             # since there can ever be only one 'collection' file name in a same directory
@@ -201,7 +217,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
                 if self.prune:  # stop recursive search if requested
                     del dirs[:]
                 col_path = os.path.join(root, self._collection_name)
-                yield col_path, self._load_json(col_path)
+                yield self._collection_name, col_path, self._load_json(col_path)
             # if a collection is found deeper when not expected for items parsing
             # drop the nested directories to avoid over-crawling nested collections
             elif not self._collection_mode and not is_root and self._collection_name in files:
@@ -211,7 +227,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
             for name in files:
                 if not self._collection_mode and self._is_item(name):
                     item_path = os.path.join(root, name)
-                    yield item_path, self._load_json(item_path)
+                    yield self._collection_name, item_path, self._load_json(item_path)
 
     def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
         name = os.path.split(path)[-1]
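The same three-element contract applies to STACDirectoryLoader, whose pruning relies on the topdown contract of os.walk: deleting entries from dirs in place stops the walk from descending into them. A self-contained sketch of that pattern; the directory path and file layout are illustrative.

import json
import os
from typing import Any, Iterator, Tuple


def walk_collections(path: str, prune: bool = False) -> Iterator[Tuple[str, str, Any]]:
    """Yield (name, path, contents) for every collection.json under path."""
    for root, dirs, files in os.walk(path):
        if "collection.json" in files:
            if prune:
                del dirs[:]  # os.walk(topdown=True) then skips this whole subtree
            col_path = os.path.join(root, "collection.json")
            with open(col_path, encoding="utf-8") as f:
                yield "collection.json", col_path, json.load(f)


for name, location, data in walk_collections("/data/stac", prune=True):
    print(name, location, data.get("id"))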
11 changes: 8 additions & 3 deletions STACpopulator/populator_base.py
@@ -17,7 +17,6 @@
 from STACpopulator.models import AnyGeometry
 from STACpopulator.stac_utils import get_logger, load_config, url_validate
 
-
 LOGGER = get_logger(__name__)
 
 
@@ -144,9 +143,11 @@ def publish_stac_collection(self, collection_data: dict[str, Any]) -> None:
         post_stac_collection(self.stac_host, collection_data, self.update, session=self._session)
 
     def ingest(self) -> None:
+        counter = 0
         LOGGER.info("Data ingestion")
-        for item_name, item_data in self._ingest_pipeline:
-            LOGGER.info(f"Creating STAC representation for {item_name}")
+        for item_name, item_loc, item_data in self._ingest_pipeline:
+            LOGGER.info(f"New data item: {item_name}")
+            LOGGER.info(f"Data location: {item_loc}")
             stac_item = self.create_stac_item(item_name, item_data)
             if stac_item:
                 post_stac_item(
@@ -157,3 +158,7 @@ def ingest(self) -> None:
                     update=self.update,
                     session=self._session,
                 )
+                counter += 1
+                LOGGER.info(f"Processed {counter} data items")
+            else:
+                LOGGER.error("Failed to create STAC representation")