Summarizing STAC Collections: Take One #31

Open
anayeaye opened this issue Feb 24, 2022 · 3 comments
Labels: question (Further information is requested)

Comments

anayeaye (Collaborator) commented Feb 24, 2022

The problem

For MVP dashboard visualization, we need to provide some basic information summarizing all Items in a Collection:

  1. Collection-specific time picker requires either the current temporal range of the dataset or, for a dataset without periodically spaced captures, a list of unique dates.
  2. Collection-specific rescale values from aggregated raster band statistics over all items (the average of the means, min of the minimums, and max of the maximums).
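A minimal Python sketch of requirement 1, assuming item datetimes are already loaded as timezone-aware datetime objects. The function name datetime_summary is hypothetical; it only mirrors the periodic vs. non-periodic choice that the SQL below makes with the dashboard:is_periodic flag.

```python
from datetime import datetime, timezone
from typing import List

# Same layout as the SQL pattern 'YYYY-MM-DD"T"HH24:MI:SS"Z"' used in this issue
ISO_Z = "%Y-%m-%dT%H:%M:%SZ"


def datetime_summary(datetimes: List[datetime], is_periodic: bool) -> List[str]:
    """Hypothetical helper: [start, end] for periodic data, else unique dates.

    Assumes every datetime is timezone-aware; each one is converted to UTC
    before formatting so the trailing "Z" is accurate.
    """
    utc = [dt.astimezone(timezone.utc) for dt in datetimes]
    if is_periodic:
        # Periodic captures: the time picker only needs the temporal range
        return [min(utc).strftime(ISO_Z), max(utc).strftime(ISO_Z)]
    # Irregular captures: list every distinct date; this fixed-width format
    # sorts chronologically as plain strings
    return sorted({dt.strftime(ISO_Z) for dt in utc})


dts = [
    datetime(2016, 1, 1, tzinfo=timezone.utc),
    datetime(2016, 3, 1, tzinfo=timezone.utc),
    datetime(2016, 2, 1, tzinfo=timezone.utc),
    datetime(2016, 1, 1, tzinfo=timezone.utc),
]
print(datetime_summary(dts, is_periodic=True))
# ['2016-01-01T00:00:00Z', '2016-03-01T00:00:00Z']
print(datetime_summary(dts, is_periodic=False))
# ['2016-01-01T00:00:00Z', '2016-02-01T00:00:00Z', '2016-03-01T00:00:00Z']
```

The periodic branch keeps the payload small no matter how many items exist; the unique-date list grows with the collection, which is why it is reserved for irregular captures.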

We intend to leverage the pgstac database to provide custom summary information in the future, such as summary statistics for items within a bounding box and datetime range.

Iterative solution

Commit to implementing a use-case-specific version of the core stac-spec summaries collection property. Our implementation is intended to support the dashboard and will supply datetime and raster statistics for a single default map layer asset across the entire collection. Multi-asset spectral collections do not map well to this pattern and need further consideration. This may not be the best solution, but it captures where we are starting (hopefully early enough to change course if there is a better solution).

Insert snapshot summary in collection record

As a first step, we are querying pgstac and updating collection records so that the dashboard can begin relying on the /collections/{collection-id} endpoint to request collection datetime summary and scaling values.

To implement a POC for review, we cut some corners with the following assumptions and deferred work:
Assumptions

  • All collections have a boolean dashboard:is_periodic property
  • Items in a collection have a single default map layer asset (named cog_default) that is a single-band geotiff
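The two assumptions can be checked before running the summary SQL. This is a hypothetical Python sketch (meets_poc_assumptions is not part of pgstac or the dashboard); it uses the length of raster:bands as a stand-in for "single-band GeoTIFF" rather than opening the file.

```python
from typing import Any, Dict


def meets_poc_assumptions(collection: Dict[str, Any], item: Dict[str, Any]) -> bool:
    """Hypothetical check of the two POC assumptions for one collection/item pair."""
    # Assumption 1: a boolean dashboard:is_periodic property on the collection
    if not isinstance(collection.get("dashboard:is_periodic"), bool):
        return False
    # Assumption 2: a cog_default asset with exactly one band; the band count
    # is read from raster:bands as a proxy for "single-band GeoTIFF"
    asset = item.get("assets", {}).get("cog_default")
    if asset is None:
        return False
    return len(asset.get("raster:bands", [])) == 1


collection = {"id": "no2-monthly", "dashboard:is_periodic": True}
item = {"assets": {"cog_default": {"raster:bands": [{"statistics": {"minimum": 0.0}}]}}}
print(meets_poc_assumptions(collection, item))  # True
```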

Deferred work

  • This hack only inserts summaries into collections that do not already have one; jsonb_set is needed to update an existing collection summary
  • This hack does not yet dynamically identify the temporal range of periodic collections; it just uses the temporal domain of the collection, which is not guaranteed to be up to date
  • This requires manual execution (it isn't clean enough to merit automating anyway)

POC SQL

Here's how we got started:

UPDATE collections SET "content" =
    (SELECT jsonb_insert(
        collections."content"::jsonb,
        '{summaries}',
        jsonb_build_object(
            'datetime', (
                    CASE 
                        WHEN (collections."content"->>'dashboard:is_periodic')::boolean 
                        THEN (to_jsonb(array[
                            to_char(min(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
                            to_char(max(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')
                        ]))
                        ELSE jsonb_agg(DISTINCT items."content"->'properties'->>'datetime')
                    END     
            ,
            'cog_default', json_build_object(
                'avg', avg((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'mean')::float),
                'min', min((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'minimum')::float),
                'max', max((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'maximum')::float)
            )
        ) 
    )
    FROM items
    JOIN collections ON (items.collection_id = collections.id)
    WHERE collections.id = 'no2-monthly'
    GROUP BY collections."content")
WHERE collections.id = 'no2-monthly';
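For reviewing the cog_default part of the query, here is a Python mirror of the same aggregation over item JSON (a sketch, not the deployed code; asset_raster_summary is a hypothetical name). Like SQL avg/min/max, each statistic simply skips items where the value is missing or null.

```python
from typing import Any, Dict, Iterable, List, Optional


def asset_raster_summary(
    items: Iterable[Dict[str, Any]], asset_key: str = "cog_default"
) -> Dict[str, Optional[float]]:
    """Python mirror of the cog_default aggregation in the POC SQL."""
    means: List[float] = []
    mins: List[float] = []
    maxs: List[float] = []
    for item in items:
        bands = item.get("assets", {}).get(asset_key, {}).get("raster:bands", [])
        if not bands:
            continue  # no band 0: every JSON path is NULL in SQL, so the item is ignored
        stats = bands[0].get("statistics", {})
        for key, bucket in (("mean", means), ("minimum", mins), ("maximum", maxs)):
            value = stats.get(key)
            if value is not None:  # SQL aggregates ignore NULLs
                bucket.append(float(value))
    return {
        "avg": sum(means) / len(means) if means else None,  # average of the means
        "min": min(mins) if mins else None,  # min of the minimums
        "max": max(maxs) if maxs else None,  # max of the maximums
    }


def _item(mean: float, minimum: float, maximum: float) -> Dict[str, Any]:
    stats = {"mean": mean, "minimum": minimum, "maximum": maximum}
    return {"assets": {"cog_default": {"raster:bands": [{"statistics": stats}]}}}


items = [_item(2, 0, 10), _item(4, -1, 8), {"assets": {}}]
print(asset_raster_summary(items))  # {'avg': 3.0, 'min': -1.0, 'max': 10.0}
```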

Example summaries object:

"summaries": {
    "datetime": ["2016-01-01T00:00:00Z", "2022-01-01T00:00:00Z"],
    "cog_default": {
      "avg": 382798140137793,
      "max": 50064805976866820,
      "min": -6618294421291008
    }
  }
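One way the dashboard could consume this object, assuming it passes the values to a tiler that accepts a rescale=<min>,<max> query parameter (TiTiler accepts that form, for example; confirm against the tiler actually deployed). rescale_from_summaries is a hypothetical helper.

```python
import json
from typing import Any, Dict


def rescale_from_summaries(summaries: Dict[str, Any], asset_key: str = "cog_default") -> str:
    """Hypothetical helper: turn a summaries entry into a "min,max" rescale string."""
    stats = summaries.get(asset_key)
    if not stats or stats.get("min") is None or stats.get("max") is None:
        raise ValueError(f"no usable {asset_key} statistics in summaries")
    return f"{stats['min']},{stats['max']}"


# The example summaries object from above
summaries = json.loads("""{
    "datetime": ["2016-01-01T00:00:00Z", "2022-01-01T00:00:00Z"],
    "cog_default": {"avg": 382798140137793, "max": 50064805976866820, "min": -6618294421291008}
}""")
print(rescale_from_summaries(summaries))  # -6618294421291008,50064805976866820
```

Raising on a null cog_default (rather than emitting "None,None") lets the caller fall back to a default rescale.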

User defined functions to generate summary

To build out this solution, the next step will be to decompose this SQL into smaller functions (some of which may already be defined in pgstac). Here are some possible supporting functions that could be assembled into a function that inserts a new summary or updates an existing one:

  • get_asset_raster_summary(items, assetkey): get aggregate raster statistics over all selected items for a given asset
  • get_temporal_domain(items): get the up-to-date temporal domain over selected items
  • get_unique_datetimes(items): get a list of unique datetimes over selected items

Using selected items rather than collection id as parameters, the same functions could be used to support full collection summaries and selected items summaries (for future API development).
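To illustrate the selected-items idea, here is a minimal Python sketch (not the UDFs themselves) in which get_temporal_domain and get_unique_datetimes take a list of already-selected items. select_items and item_datetime are hypothetical helpers, and item datetimes are assumed to be non-null RFC 3339 strings.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Tuple

Item = Dict[str, Any]


def item_datetime(item: Item) -> datetime:
    # STAC datetimes end in "Z"; swap it for "+00:00" so datetime.fromisoformat
    # also parses them on Python versions before 3.11
    return datetime.fromisoformat(item["properties"]["datetime"].replace("Z", "+00:00"))


def select_items(items: Iterable[Item], keep: Callable[[Item], bool]) -> List[Item]:
    """Hypothetical selection step (stand-in for a bbox/datetime search)."""
    return [item for item in items if keep(item)]


def get_temporal_domain(items: List[Item]) -> Tuple[datetime, datetime]:
    """Up-to-date temporal domain over the selected items (raises on empty input)."""
    dts = [item_datetime(item) for item in items]
    return min(dts), max(dts)


def get_unique_datetimes(items: List[Item]) -> List[datetime]:
    """Sorted unique datetimes over the selected items."""
    return sorted({item_datetime(item) for item in items})


collection_items = [
    {"properties": {"datetime": "2016-01-01T00:00:00Z"}},
    {"properties": {"datetime": "2020-06-01T00:00:00Z"}},
    {"properties": {"datetime": "2022-01-01T00:00:00Z"}},
]
cutoff = datetime.fromisoformat("2020-01-01T00:00:00+00:00")
recent = select_items(collection_items, lambda item: item_datetime(item) >= cutoff)

print(get_temporal_domain(collection_items))  # whole collection
print(get_unique_datetimes(recent))           # only the selected subset
```

The same two functions serve both calls; only the selection step changes, which is the point of parameterizing on items instead of a collection id.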

Triggered summary update in collection record

If the above UDF concept pans out and we have a better idea of ingest patterns, we might decide to add triggers to keep collection summaries up to date.

Dynamic summary of search results and custom APIs

pgstac has a nifty request customization parameter (conf) that can be used to turn context on/off (i.e., whether the number of matched items is returned). We might be able to rig up something similar to enable returning summary statistics for a STAC item search without creating a new endpoint.

{
    "filter": {
        </SNIP>
    },
    "conf":{
        "context":"on/off/auto",
        "summaries":"on/off"
     }
}
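conf.summaries is only a proposal here, not an existing pgstac option. The sketch below assumes it would accept "on"/"off" and default to off when omitted; build_search_body and wants_summaries are hypothetical names, and the filter is an example CQL2-JSON expression.

```python
from typing import Any, Dict


def build_search_body(
    filter_: Dict[str, Any], context: str = "auto", summaries: bool = False
) -> Dict[str, Any]:
    """Hypothetical client helper: attach the proposed conf flags to a search."""
    return {
        "filter": filter_,
        "conf": {"context": context, "summaries": "on" if summaries else "off"},
    }


def wants_summaries(body: Dict[str, Any]) -> bool:
    """Hypothetical server-side check; the flag defaults to off when absent."""
    return body.get("conf", {}).get("summaries", "off") == "on"


# Example CQL2-JSON filter selecting one collection
body = build_search_body(
    {"op": "=", "args": [{"property": "collection"}, "no2-monthly"]},
    summaries=True,
)
print(wants_summaries(body))  # True
```

Defaulting to off keeps existing searches unchanged, so the aggregation cost is only paid when a client explicitly asks for it.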
anayeaye added the question (Further information is requested) label Feb 25, 2022
anayeaye (Collaborator, Author) commented Mar 1, 2022

Slightly improved SQL that handles set vs. insert dynamically and implements a common collection selection:

WITH coll_agg AS (
    SELECT jsonb_build_object(
        'datetime', (
            CASE
                WHEN (c."content"->>'dashboard:is_periodic')::boolean
                THEN (to_jsonb(array[
                    to_char(min(i.datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
                    to_char(max(i.datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')]))
                ELSE jsonb_agg(DISTINCT i."content"->'properties'->>'datetime')
            END
        ),
        'cog_default', jsonb_build_object(
            'avg', avg((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'mean')::float),
            'min', min((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'minimum')::float),
            'max', max((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'maximum')::float)
        )
    ) summaries,
    c.id summary_collection_id
    FROM collections c
    JOIN items i ON i.collection_id = c.id
    -- psql variable, quoted as a literal; run with: psql -v c_id=no2-monthly
    WHERE c.id = :'c_id'
    GROUP BY c."content", c.id
)
UPDATE collections c
SET "content" = (
    CASE
        -- jsonb_insert will not overwrite an existing key, so use jsonb_set when summaries exist
        WHEN c."content"::jsonb ? 'summaries'
        THEN jsonb_set(
            c."content"::jsonb,
            '{summaries}',
            coll_agg.summaries
        )
        ELSE jsonb_insert(
            c."content"::jsonb,
            '{summaries}',
            coll_agg.summaries
        )
    END
)
FROM coll_agg
WHERE coll_agg.summary_collection_id = c.id;
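The set-versus-insert branch can be reasoned about with a small Python model of both functions on a top-level key (a simplified model, not PostgreSQL itself; the helper names are hypothetical). Per the PostgreSQL JSON function docs, jsonb_insert will not replace an existing object key, while jsonb_set adds a missing key when create_if_missing is true, which is its default. If that holds for this case, a single jsonb_set call could likely cover both CASE branches.

```python
import copy
from typing import Any, Dict

Doc = Dict[str, Any]


def jsonb_insert_top(doc: Doc, key: str, value: Any) -> Doc:
    """Models jsonb_insert(doc, '{key}', value) for a top-level object key."""
    if key in doc:
        # PostgreSQL raises an error instead of overwriting an existing key
        raise ValueError(f"cannot replace existing key {key!r}")
    out = copy.deepcopy(doc)
    out[key] = value
    return out


def jsonb_set_top(doc: Doc, key: str, value: Any, create_if_missing: bool = True) -> Doc:
    """Models jsonb_set(doc, '{key}', value, create_if_missing) for a top-level key."""
    out = copy.deepcopy(doc)
    if key in out or create_if_missing:
        out[key] = value
    return out


new = {"datetime": ["2016-01-01T00:00:00Z", "2022-01-01T00:00:00Z"]}
fresh = {"id": "no2-monthly"}
existing = {"id": "no2-monthly", "summaries": {"datetime": []}}

# Missing key: set (with the default create_if_missing) and insert agree
print(jsonb_set_top(fresh, "summaries", new) == jsonb_insert_top(fresh, "summaries", new))  # True
# Existing key: only set succeeds
print(jsonb_set_top(existing, "summaries", new)["summaries"] == new)  # True
```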

anayeaye (Collaborator, Author) commented Mar 3, 2022

Incremental improvements; the next step is to define a custom schema and function.
Changed:

  • concatenate/merge nested summaries into the collection content instead of conditionally setting or inserting them
  • use the items.datetime column exclusively (the last iteration used item.content.properties.datetime); for items with start/end datetimes, the start date is used for the distinct datetimes list
  • remove the average of the means from the cog summary
  • the cog_default summary is null only if the collection's item_assets has no cog_default entry
-- Custom dashboard schema extends pgstac with functions supporting dashboard-specific api metadata
CREATE SCHEMA IF NOT EXISTS dashboard;

-- Add that schema to your path
SET SEARCH_PATH TO dashboard, pgstac, public;

-- This change is ONLY going to stick around for the current session
-- You can change the default search_path for a database role by altering the role
-- ALTER ROLE delta SET SEARCH_PATH TO dashboard, pgstac, public;


-- SQL Functions documentation - https://www.postgresql.org/docs/13/xfunc-sql.html

CREATE OR REPLACE FUNCTION update_default_summary(_collection_id text) RETURNS VOID AS $$
WITH coll_item_cte AS (
    SELECT jsonb_build_object(
        'summaries',
        jsonb_build_object(
            'datetime', (
	            CASE
	            WHEN (collections."content"->>'dashboard:is_periodic')::boolean
	            THEN (to_jsonb(array[
	                to_char(min(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
	                to_char(max(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')]))
	            ELSE jsonb_agg(distinct to_char(datetime at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'))
	            END
	        ),
	        'cog_default', (
	        	CASE
	            WHEN collections."content"->'item_assets' ? 'cog_default'
	            THEN jsonb_build_object(
	                'min', min((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'minimum')::float),
	                'max', max((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'maximum')::float)
	                )
	            ELSE NULL
	            END
			)
		)
    ) summaries,
    collections.id coll_id
    FROM items
    JOIN collections on items.collection_id = collections.id
    WHERE collections.id = _collection_id
    GROUP BY collections."content" , collections.id
)
UPDATE collections SET "content" = "content" || coll_item_cte.summaries
FROM coll_item_cte 
WHERE collections.id = coll_item_cte.coll_id;
$$ LANGUAGE SQL SET SEARCH_PATH TO dashboard, pgstac, public;

CREATE OR REPLACE FUNCTION update_all_default_summaries() RETURNS VOID AS $$
SELECT 
    update_default_summary(id)
FROM collections
WHERE collections."content" ?| array['item_assets', 'dashboard:is_periodic'];
$$ LANGUAGE SQL SET SEARCH_PATH TO dashboard, pgstac, public;
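A Python model of the two jsonb operators these functions rely on may help when reviewing them (hypothetical helper names; a model, not PostgreSQL). ?| matches a collection if any of the listed keys is a top-level key, so a collection with only dashboard:is_periodic qualifies. || on two objects is a shallow merge where the right side wins, so the computed summaries replace any existing summaries object wholesale, including entries the dashboard did not write.

```python
from typing import Any, Dict, Iterable, List

Content = Dict[str, Any]


def has_any_key(content: Content, keys: Iterable[str]) -> bool:
    """Models jsonb `content ?| array[...]`: true if ANY key is a top-level key."""
    return any(key in content for key in keys)


def jsonb_concat(left: Content, right: Content) -> Content:
    """Models jsonb `left || right` for two objects: shallow, right side wins."""
    return {**left, **right}


collections: List[Content] = [
    {"id": "a", "dashboard:is_periodic": True, "summaries": {"platform": ["x"]}},
    {"id": "b"},
]
patch = {"summaries": {"datetime": ["2016-01-01T00:00:00Z"], "cog_default": None}}

selected = [c for c in collections if has_any_key(c, ["item_assets", "dashboard:is_periodic"])]
updated = [jsonb_concat(c, patch) for c in selected]
# The whole summaries object is replaced, so the old "platform" entry is gone
print(updated[0]["summaries"])  # {'datetime': ['2016-01-01T00:00:00Z'], 'cog_default': None}
```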

anayeaye (Collaborator, Author) commented

Resolved by PR #43

Boilerplate Python
This expects the database connection information to be stored in a variable named con_str, so the code that retrieves it from AWS Secrets Manager and populates that variable is also included.

import base64
import json
from typing import Optional

import boto3
import psycopg
from botocore.exceptions import ClientError
from psycopg import sql
from psycopg.conninfo import make_conninfo

def update_collection_summaries(cursor, collection_id: str) -> None:
    """Update summaries object in pgstac for all items in collection"""
    cursor.execute(
        sql.SQL("""
            SELECT update_default_summaries(id)
            FROM collections
            WHERE collections.id = (%s);
            """), (collection_id,)
    )

def get_secret(secret_name: str, profile_name: Optional[str] = None) -> dict:
    """Retrieve secrets from AWS Secrets Manager

    Args:
        secret_name (str): name of aws secrets manager secret containing database connection secrets
        profile_name (str, optional): optional name of aws profile, default is used if not defined

    Returns:
        secrets (dict): decrypted secrets in dict
    """

    # Create a Secrets Manager client
    if profile_name:
        session = boto3.session.Session(profile_name=profile_name)
    else:
        session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager'
    )

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDeniedException':
            raise e
        elif e.response['Error']['Code'] == 'DecryptionFailureException':
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException':
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException':
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        else:
            # Any other error code is also rethrown instead of silently returning None
            raise e
    else:
        # Decrypts secret using the associated KMS key.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            return json.loads(get_secret_value_response['SecretString'])
        else:
            return json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))

# String id of STAC collection to summarize
collection_id = "collection-id"

# Get connection info
secret_name = "name or arn of delta/pgstac (not admin) connection secret in target env"
con_secrets = get_secret(secret_name)

# Connect and execute update summaries
con_str = make_conninfo(
    dbname=con_secrets["dbname"],
    user=con_secrets["username"],
    password=con_secrets["password"],
    host=con_secrets["host"],
    port=con_secrets["port"],
)
with psycopg.connect(con_str, autocommit=True) as conn:
    with conn.cursor() as cur:
        print("Adding default collection summaries")
        update_collection_summaries(cur, collection_id)

cc: @abarciauskas-bgse @leothomas this function is now deployed to delta staging and can be executed using the Python boilerplate code above, or with just a psql connection and SELECT update_default_summaries('nightlights-hd-monthly');
