Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generating and uploading STAC records for the CMIP6 dataset #13

Closed
leothomas opened this issue Jan 19, 2022 · 1 comment
Closed

Generating and uploading STAC records for the CMIP6 dataset #13

leothomas opened this issue Jan 19, 2022 · 1 comment

Comments

@leothomas
Copy link
Contributor

leothomas commented Jan 19, 2022

STAC Items for the monthly ensemble averages of the CMIP6 dataset have been generated and uploaded to the staging STAC API. See: https://bgp41fmlrh.execute-api.us-east-1.amazonaws.com/collections/huss/items?limit=10

Code walk through:

# python3.8
import boto3
import json
import datetime
from pypgstac import pypgstac
import rio_stac
# tqdm provides progress bars
from tqdm import tqdm
import concurrent.futures
import os

# GDAL attempts to list the directory when it opens a file, in order to 
# find "sidecar" files. This setting tells GDAL to assume the directory
# is empty when opening a file, saving both on S3 LIST costs and 
# runtime. See: https://github.com/OSGeo/gdal/issues/909
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"

# use `profile_name: str` param `Session()` or default AWS profile to ensure 
# correct access
BUCKET = boto3.Session().resource("s3").Bucket("climatedashboard-data")

def create_stac_item(obj: boto3.resources.factory.s3.ObjectSummary) -> Union[pystac.Item, str]:
    """
    Generates a STAC Item object from a CMIP6 data file in S3
    
    :param obj: The S3 object summary of the file for which to create 
                a STAC Item
    :returns: STAC Item
    :returns: str if STAC Item generation failed
    :raises Exception: if unable to extract variable name, date or SSP from filename
    """
    
    filename=obj.key.split("/")[-1]
    
    ssp = filename.split("_")[3]
    date = filename.split("_")[-1].replace(".tif", "")
    var = filename.split("_")[0]
    
    if not ssp in ["ssp245", "ssp585"]: 
        raise Exception("Unable to extract ssp from filename: ", obj.key)
    if not 195000<int(date)<2102101:
        raise Exception("Unable to extract date from filename: ", obj.key)
    if not var in _vars: 
        raise Exception("Unable to extract date from filename: ", obj.key)
    
    try: 
        r = rio_stac.stac.create_stac_item(
            source = f"s3://{obj.bucket_name}/{obj.key}", 
            collection = var, 
            properties = {"cmip6:model":"ensemble", "cmip6:ssp":ssp, "cmip6:variable": var},
            input_datetime=datetime.datetime.strptime(date, "%Y%m"),
            #extensions=None # TODO: official cmip6 extension? 
            #collection_url=None # TODO
            with_proj=True,
            with_raster=True
        )
    except:
        return f"FAILED:{obj.key}"
    return r
    
if __name__ == "__main__":
    # see below for `cmip6-collections.json` content
    with open("./cmip6-collections.json", "r") as f:
        collections = [json.loads(x) for x in f.readlines()]
    _vars = [c["id"] for c in collections]
    
    # S3 prefix for searching 
    prefix = "cmip6/monthly/CMIP6_ensemble_median"
    # collects ensemble data across all variables
    objs = [i for var in _vars for i in BUCKET.objects.filter(Prefix=f"{prefix}/{var}/") if "historical" not in i.key]
    
    # Executes in ~20 minutes 
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        results = list(
            tqdm(
                executor.map(
                    lambda x: create_stac_item(x), 
                    objs
                ), 
                total=len(objs) # sets total length of progressbar
            )
        ) 
    
    # Verify no failures: 
    failed = [x for x in results if isinstance(x, str) and x.startswith("FAILED:")]
    if len(failed): 
        print(f"FAILED: {len(failed)} (of {len(results)}). Aborting...")
        exit()
    
    # sort by date (to optimize loading) and dump to file for safekeeping
    with open("cmip6-monthly-ensemble-items-sorted.json", "w") as f: 
        f.write("\n".join([json.dumps(x.to_dict()) for x in sorted(success, key=lambda x: x.to_dict()["properties"]["datetime"])]))

    # if loading from file, uncomment the next 2 lines: 
    # with open("cmip6-monthly-ensemble-items-sorted.json", "r") as f:
    #     items = [json.loads(x) for x in f.readlines()]
    
    # chunk items to load into year long chunks to avoid memory issues
    # since pgstac uses staging tables and transactions to pre-process 
    # and then insert the data
    for i in range(1950, 2100):
        chunk = [x for x in items if x["properties"]["datetime"].startswith(str(i))]
        print(
            f"Timerange: {chunk[0]['properties']['datetime']} - {chunk[-1]['properties']['datetime']}, Count: {len(chunk)}"
        )
        
        # Write to temp file (since pypgstac only loads from file)
        with open("cmip6-monthly-ensemble-items-temp.json", "w") as f:
            f.write("\n".join([json.dumps(x) for x in chunk]))
    
        pypgstac.load(
            table="items",
            file="cmip6-monthly-ensemble-items-temp.json",
            dsn="postgres://[USERNAME]:[PASSWORD]@[HOST]/postgis",
            method="insert_ignore", # use insert_ignore to avoid overwritting existing items
        )
    
    # remove temp file
    os.remove("cmip6-monthly-ensemble-items-temp.json")

cmip6-collections.json :

{"id":"hurs","title":"Near-Surface Relative Humidity","description":"Near-Surface Relative Humidity, %","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"huss","title":"Near-Surface Specific Humidity","description":"Near-Surface Specific Humidity dimensionless ratio (kg/kg)","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"pr","title":"Precipitation","description":"Precipitation (mean of the daily precipitation rate) mm/month","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"rlds","title":"Surface Downwelling Longwave Radiation","description":"Surface Downwelling Longwave Radiation W m⁻²","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"rsds","title":"Surface Downwelling Shortwave Radiation","description":"Surface Downwelling Shortwave Radiation W m⁻²","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"sfcWind","title":"Daily-Mean Near-Surface Wind Speed","description":"Daily-Mean Near-Surface Wind Speed m s⁻¹","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"tas","title":"Daily Near-Surface Air Temperature","description":"Daily Near-Surface Air Temperature Degrees Kelvin (convert to C)","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"tasmax","title":"Maximum Daily Near-Surface Air Temperature","description":"Maximum Daily Near-Surface Air Temperature Degrees Kelvin (convert to C)","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}
{"id":"tasmin","title":"Minimum Daily Near-Surface Air Temperature","description":"Surface Downwelling Longwave Radiation W m⁻²","stac_version":"1.0.0","license":"public-domain","links":[],"extent":{"spatial":{"bbox":[[-180,-60,180,90]]},"temporal":{"interval":[["1950-01-01T00:00:00Z","2100-01-01T00:00:00Z"]]}}}

Usage:

TODO

@leothomas
Copy link
Contributor Author

CMIP6 is being tracked here: NASA-IMPACT/veda-data-pipelines#191

This issue can be closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant