
CMIP6 COGs #191

Closed

leothomas opened this issue Sep 13, 2022 · 4 comments

@leothomas

leothomas commented Sep 13, 2022

I've finally gotten the CMIP6 COGs registered in AWS Open Data! This means I can begin handing off the task of ingesting them into the VEDA data holdings.

The data is hosted in the S3 bucket s3://nex-gddp-cmip6-cog.

The next steps I see for getting the CMIP6 dataset into VEDA are:

  1. Discuss the design of the STAC collections for the CMIP6 dataset: this issue further details the complexity that CMIP6 brings in terms of STAC, but the gist of it is that CMIP6 can be organized at many different levels and sub-levels (i.e. the data can be grouped by any combination of model, variable, and SSP; see the sketch just below). If we create one collection per unique model/variable/SSP combination, that would entail 34 new collections for the CMIP6 daily data, 6 collections for the monthly data, and one more collection for a special data product called CrossingYear.
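To make the combinatorics concrete, here is a rough sketch contrasting one collection per model/variable/SSP combination with one collection per variable. The model/variable/SSP lists are hypothetical placeholders, not the actual lists behind the counts above:

# hypothetical illustration only -- the real model/variable/SSP lists differ
from itertools import product

models = ["ensemble"]                   # hypothetical
variables = ["tasmax", "tasmin", "pr"]  # hypothetical subset
ssps = ["ssp245", "ssp585"]

# option A: one collection per unique model/variable/SSP combination
per_combination = [f"CMIP6_{m}_{v}_{s}" for m, v, s in product(models, variables, ssps)]

# option B: one collection per variable (what the sample code in item 3 assumes)
per_variable = [f"CMIP6_{v}" for v in variables]

print(f"{len(per_combination)} collections vs. {len(per_variable)} collections")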

  2. Discuss an indexing strategy for the PgSTAC database:

A quick description of the technical problem we encountered with PgSTAC < v0.5 when generating partitions for a dataset with a temporal range as large as CMIP6's:

Background info on why we need to stand up a new database in order to integrate the changes necessary to ingest CMIP6:
PgSTAC partitions the items table on commonly searched fields (date, geometry, etc.) to make the database more performant.
Up until v0.5, PgSTAC organized everything into weekly partitions, which was fine, since most datasets had a temporal range of no more than ~20 years (20 * 52 = 1040 partitions). Along comes CMIP6 with a 150-year temporal range, which forces the database to create 150 * 52 = 7800 partitions, which crashes the database.

To solve this, David Bitner did a major refactor of the database to allow partitioning first by collection, and then optionally by time, with the option to partition by year or month.

Once partitions have been created in a database, you can't really delete them, so the solution is to deploy a new database, and then re-ingest everything.

Now that indexing/partitioning strategies are customizable in PgSTAC, we should establish such a strategy. David Bitner suggested that the data has such low temporal density (at most 1 file per day) that we may only want to partition the dataset within the ~30-year period (approx. 2000-2030) during which most of the VEDA datasets overlap, or not partition it at all. A sketch of how per-collection partitioning can be configured follows below.
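As a reference point, here is a minimal sketch of configuring per-collection partitioning. It assumes PgSTAC >= 0.5, where (to my understanding) each row of the collections table carries a partition_trunc column accepting NULL, 'year' or 'month'; the DSN and collection id are placeholders:

import psycopg2

# placeholder DSN, same form as the one used for ingestion below
conn = psycopg2.connect("postgres://[USERNAME]:[PASSWORD]@[HOST]/postgis")
with conn, conn.cursor() as cur:
    # 'year' keeps the partition count manageable for a 150-year dataset;
    # NULL would mean a single partition for the whole collection
    cur.execute(
        "UPDATE collections SET partition_trunc = 'year' WHERE id = %s;",
        ("tasmax",),  # hypothetical collection id (e.g. one per CMIP6 variable)
    )
conn.close()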

  3. Generate + ingest STAC Items: here is some sample code for generating STAC Items for the monthly aggregations subset of the CMIP6 COGs. This will likely need to be adapted based on the outcome of the discussion in item 1:
# python3.8
import concurrent.futures
import datetime
import json
import os
import sys
from typing import Union

import boto3
import pystac
import rio_stac
from pypgstac import pypgstac
# tqdm provides progress bars
from tqdm import tqdm

# GDAL attempts to list the directory when it opens a file, in order to 
# find "sidecar" files. This setting tells GDAL to assume the directory
# is empty when opening a file, saving both on S3 LIST costs and 
# runtime. See: https://github.com/OSGeo/gdal/issues/909
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"

# use the `profile_name: str` param of `Session()` or the default AWS profile
# to ensure correct access
BUCKET = boto3.Session().resource("s3").Bucket("climatedashboard-data")

def create_stac_item(obj: "boto3.resources.factory.s3.ObjectSummary") -> Union[pystac.Item, str]:
    """
    Generates a STAC Item object from a CMIP6 data file in S3
    
    :param obj: The S3 object summary of the file for which to create 
                a STAC Item
    :returns: pystac.Item, or a "FAILED:<key>" string if STAC Item generation failed
    :raises Exception: if unable to extract the variable name, date or SSP from the filename
    """
    
    # parse variable, SSP and date (YYYYMM) from the underscore-delimited filename
    filename = obj.key.split("/")[-1]

    ssp = filename.split("_")[3]
    date = filename.split("_")[-1].replace(".tif", "")
    var = filename.split("_")[0]

    # `_vars` is defined at module level in the __main__ block below
    if ssp not in ["ssp245", "ssp585"]:
        raise Exception("Unable to extract ssp from filename: ", obj.key)
    if not 195001 <= int(date) <= 210012:
        raise Exception("Unable to extract date from filename: ", obj.key)
    if var not in _vars:
        raise Exception("Unable to extract variable from filename: ", obj.key)
    
    try:
        r = rio_stac.stac.create_stac_item(
            source=f"s3://{obj.bucket_name}/{obj.key}",
            collection=var,
            properties={"cmip6:model": "ensemble", "cmip6:ssp": ssp, "cmip6:variable": var},
            input_datetime=datetime.datetime.strptime(date, "%Y%m"),
            # extensions=None  # TODO: official cmip6 extension?
            # collection_url=None  # TODO
            with_proj=True,
            with_raster=True,
        )
    except Exception:
        return f"FAILED:{obj.key}"
    return r
    
if __name__ == "__main__":
    # see below for `cmip6-collections.json` content
    with open("./cmip6-collections.json", "r") as f:
        collections = [json.loads(x) for x in f.readlines()]
    _vars = [c["id"] for c in collections]
    
    # S3 prefix for searching 
    prefix = "cmip6/monthly/CMIP6_ensemble_median"
    # collects ensemble data across all variables
    objs = [i for var in _vars for i in BUCKET.objects.filter(Prefix=f"{prefix}/{var}/") if "historical" not in i.key]
    
    # Executes in ~20 minutes 
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        results = list(
            tqdm(
                executor.map(create_stac_item, objs),
                total=len(objs) # sets total length of progressbar
            )
        ) 
    
    # Verify no failures:
    failed = [x for x in results if isinstance(x, str) and x.startswith("FAILED:")]
    if failed:
        print(f"FAILED: {len(failed)} (of {len(results)}). Aborting...")
        sys.exit(1)
    
    # sort by date (to optimize loading) and dump to file for safekeeping
    items = sorted(
        (x.to_dict() for x in results),
        key=lambda x: x["properties"]["datetime"],
    )
    with open("cmip6-monthly-ensemble-items-sorted.json", "w") as f:
        f.write("\n".join(json.dumps(x) for x in items))

    # if loading from file, uncomment the next 2 lines: 
    # with open("cmip6-monthly-ensemble-items-sorted.json", "r") as f:
    #     items = [json.loads(x) for x in f.readlines()]
    
    # chunk items to load into year long chunks to avoid memory issues
    # since pgstac uses staging tables and transactions to pre-process 
    # and then insert the data
    for i in range(1950, 2101):  # 1950 through 2100 inclusive
        chunk = [x for x in items if x["properties"]["datetime"].startswith(str(i))]
        if not chunk:
            # e.g. historical years are excluded above, so some years have no items
            continue
        print(
            f"Timerange: {chunk[0]['properties']['datetime']} - {chunk[-1]['properties']['datetime']}, Count: {len(chunk)}"
        )
        
        # Write to temp file (since pypgstac only loads from file)
        with open("cmip6-monthly-ensemble-items-temp.json", "w") as f:
            f.write("\n".join([json.dumps(x) for x in chunk]))
    
        pypgstac.load(
            table="items",
            file="cmip6-monthly-ensemble-items-temp.json",
            dsn="postgres://[USERNAME]:[PASSWORD]@[HOST]/postgis",
            method="insert_ignore", # use insert_ignore to avoid overwritting existing items
        )
    
    # remove temp file
    os.remove("cmip6-monthly-ensemble-items-temp.json")
  4. Missing data: I have identified 54 missing files (out of ~1.63M expected files --> 0.0033% loss, not bad!). The missing data likely exists in the raw NetCDF data (in s3://nex-gddp-cmip6). Since each NetCDF contains a year's worth of data and each COG covers a single day, the 54 missing COGs come from only 11 different NetCDFs - meaning only those 11 files need to be reprocessed. See the attached file for the list of missing files:
    missing_files.txt

  5. [OPTIONAL] Set up a validation pipeline: The COG data (in s3://nex-gddp-cmip6-cog) is hosted in the NASA SMCE AWS account. Some reprocessing was necessary, for which I set up an SQS queue + Lambda function in the NASA Impact/UAH AWS account. This same setup could be reused to run rio cogeo validate to ensure that all the expected data is present and correctly formatted (a sketch of such a validation Lambda follows below).
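For illustration, here is a minimal sketch of what that validation Lambda handler could look like. It assumes each SQS message body carries an s3:// COG URL and that IAM and rasterio/GDAL S3 access are configured as in the existing reprocessing setup; the handler and message format are hypothetical:

from rio_cogeo.cogeo import cog_validate

def handler(event, context):
    """Validate each COG referenced by the SQS messages in this batch."""
    failures = []
    for record in event["Records"]:
        url = record["body"]  # assumed to be an s3://... COG URL
        is_valid, errors, warnings = cog_validate(url)
        if not is_valid:
            failures.append({"url": url, "errors": errors})
    if failures:
        # raising makes the failed batch visible again / eligible for a dead-letter queue
        raise RuntimeError(f"{len(failures)} invalid COGs: {failures}")
    return {"validated": len(event["Records"])}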

  6. [HOUSEKEEPING] (a cleanup sketch follows below):
    Delete the CMIP6 data from the NASA Impact/UAH account (in s3://climatedashboard-data/cmip6) to avoid unnecessary storage costs/data duplication.
    Delete the SQS queue and Lambda function used for reprocessing the data in the SMCE account to avoid further unnecessary costs due to empty "receives" by the SQS queue.
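A rough sketch of that cleanup, assuming boto3 sessions with credentials for the respective accounts; the queue URL and function name are placeholders, not the real resource names:

import boto3

# remove the duplicated CMIP6 COG data from the NASA Impact/UAH bucket
s3 = boto3.Session().resource("s3")
s3.Bucket("climatedashboard-data").objects.filter(Prefix="cmip6/").delete()

# remove the reprocessing SQS queue and Lambda function (placeholder names)
sqs = boto3.client("sqs")
sqs.delete_queue(QueueUrl="https://sqs.us-west-2.amazonaws.com/<ACCOUNT_ID>/cmip6-reprocessing-queue")

lam = boto3.client("lambda")
lam.delete_function(FunctionName="cmip6-reprocessing")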

@leothomas leothomas self-assigned this Sep 13, 2022
@leothomas leothomas changed the title Ingest CMIP6 dataset in delta-backend CMIP6 COGs Oct 14, 2022
@leothomas
Author

Lastly, @sharkinsspatial made a really great point: if the main purpose of reprocessing all the NetCDF files into COGs is just to display them in a visual dashboard, we may be approaching the problem wrong. We could make use of tools that provide "tile-like" access to the data without having to actually reprocess it. The idea would be to implement an index file that serves data from the NetCDFs in a Zarr-like fashion, and then use the Zarr-TiTiler extension @vincentsarago has been working on to serve the data directly from the NetCDF files, eliminating the need for the CMIP6 COG dataset entirely (a rough sketch of the indexing idea is below).

This should be discussed further to establish feasibility and a timeline.
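For context, a rough sketch of the "index file" idea using kerchunk (which @vincentsarago asks about below); the NetCDF key is a placeholder and anonymous S3 access is an assumption:

import fsspec
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr

# placeholder key in the raw NetCDF bucket
url = "s3://nex-gddp-cmip6/path/to/file.nc"

# build a Zarr-style reference index for the NetCDF (no data is copied)
with fsspec.open(url, "rb", anon=True) as f:
    refs = SingleHdf5ToZarr(f, url).translate()

# read the original NetCDF through the reference index as if it were a Zarr store
fs = fsspec.filesystem(
    "reference", fo=refs, remote_protocol="s3", remote_options={"anon": True}
)
ds = xr.open_dataset(fs.get_mapper(""), engine="zarr", consolidated=False)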

@vincentsarago
Collaborator

@leothomas are you talking about kerchunk? The issue for visualisation is then that you don't have overviews, which limits you to the raw resolution level. IMO when we need visualization, and if we can, we should create a COG (this might change if Zarr supports overviews in the future).

Zarr-Titiler is not yet ready and I'm pretty afraid of the performance!

@j08lue
Contributor

j08lue commented Dec 1, 2022

While the quest for rendering from Zarr etc is ongoing, we should finish up getting the COGs into VEDA for now and consider replacing the setup when a Zarr-native (or other better) solution is available.

@j08lue
Contributor

j08lue commented May 2, 2023

Superseded by #204

@j08lue j08lue closed this as completed May 2, 2023