Add NREL Cambium Archiver #569

Draft · wants to merge 5 commits into base: 561-nrel-standard-scenarios
Changes from all commits
15 changes: 11 additions & 4 deletions src/pudl_archiver/__init__.py
@@ -31,12 +31,19 @@ def all_archivers():

     def all_subclasses(cls):
         """If a subclass has subclasses, include them in the list. Remove intermediaries."""
-        subclasses = set(cls.__subclasses__())
-        for c in subclasses.copy():
+        subclasses = set()
+        queue = set(cls.__subclasses__())
+        rejected = set()
+        while queue:
+            c = queue.pop()
+            if c in rejected:
+                continue
             subsubclasses = set(c.__subclasses__())
             if subsubclasses:
-                subclasses.remove(c)
-                subclasses = subclasses.union(subsubclasses)
+                rejected.add(c)
+                queue = queue.union(subsubclasses)
+            else:
+                subclasses.add(c)
         return subclasses

     return all_subclasses(AbstractDatasetArchiver)
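For intuition, here is a minimal sketch (with hypothetical classes, not part of this PR) of what the rewritten all_subclasses returns: intermediaries with their own subclasses are rejected, and only leaf classes are kept, however deep the hierarchy goes.

class Base: ...
class Intermediate(Base): ...   # has subclasses, so it is rejected
class LeafA(Intermediate): ...  # leaf: included
class LeafB(Intermediate): ...  # leaf: included
class LeafC(Base): ...          # leaf: included

def all_subclasses(cls):
    """If a subclass has subclasses, include them in the list. Remove intermediaries."""
    subclasses = set()
    queue = set(cls.__subclasses__())
    rejected = set()
    while queue:
        c = queue.pop()
        if c in rejected:
            continue
        subsubclasses = set(c.__subclasses__())
        if subsubclasses:
            rejected.add(c)
            queue = queue.union(subsubclasses)
        else:
            subclasses.add(c)
    return subclasses

assert all_subclasses(Base) == {LeafA, LeafB, LeafC}

The rejected set also guards against re-processing a class that is reachable through multiple inheritance paths.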
109 changes: 109 additions & 0 deletions src/pudl_archiver/archivers/nrelcambium.py
@@ -0,0 +1,109 @@
"""Download NREL Cambium Scenarios data."""

import re

from pudl_archiver.archivers.classes import (
    ArchiveAwaitable,
    ResourceInfo,
    _download_file,
)
from pudl_archiver.archivers.nrelss import (
    API_URL_FILE_DOWNLOAD,
    API_URL_PROJECTS_LIST,
    AbstractNrelScenarioArchiver,
)
from pudl_archiver.utils import retry_async


class AbstractNrelCambiumArchiver(AbstractNrelScenarioArchiver):
"""Base class for NREL Cambium archivers."""

    project_year: int
    project_year_pattern = re.compile(r"Cambium (?P<year>\d{4})")
    project_startswith = "Cambium "
    report_section = "long_description"
    file_naming_order = ("scenario", "metric", "time_resolution", "location_type")

    concurrency_limit = 1  # Cambium scenario files are large, so only download one at a time

    async def get_resources(self) -> ArchiveAwaitable:
        """Download NREL Cambium resources.

        Basic flow:

        1. Fetch the list of projects and extract just the one for this archiver.
        2. Pull out metadata: UUID, year, links to any PDF reports, and data file
           IDs. PDF report URLs are not provided in a dedicated field of the
           project response; they are embedded in an HTML value for the project's
           description or citation. Sometimes this field is simply blank, and we
           need to use a hard-coded exception. The data files don't have good
           filenames associated with them, so we construct one.
        3. Download each report and file for the project as separate resources.
        """
        project_records = await self.get_json(API_URL_PROJECTS_LIST)
        scenario_project = [
            p
            for p in project_records
            if p["name"].startswith(f"{self.project_startswith}{self.project_year}")
        ]
        assert len(scenario_project) == 1
        scenario_project = scenario_project.pop()
        (
            project_uuid,
            project_year,
            report_data,
            file_ids,
        ) = await self.collect_project_info(scenario_project)
        assert project_uuid
        for filename, url in report_data:
            yield self.get_report_resource(filename, url)
        for filename, file_id in file_ids:
            yield self.get_file_resource(filename, project_uuid, file_id)

    async def get_report_resource(self, filename, url) -> ResourceInfo:
        """Retrieve and compress a PDF report and return it as ResourceInfo."""
        self.logger.info(f"Downloading report {filename}")
        zip_path = self.download_directory / f"{filename}.zip"
        await self.download_and_zip_file(url, filename, zip_path)
        return ResourceInfo(
            local_path=zip_path,
            partitions={},
        )

    async def get_file_resource(self, filename, uuid, file_id) -> ResourceInfo:
        """Retrieve a data file and return it as ResourceInfo."""
        self.logger.info(f"Downloading file {filename} {file_id} {uuid}")
        download_path = self.download_directory / filename

        await retry_async(
            _download_file,
            [self.session, API_URL_FILE_DOWNLOAD, download_path, True],
            kwargs={"data": {"project_uuid": uuid, "file_ids": file_id}},
            retry_base_s=20,
        )
        return ResourceInfo(
            local_path=download_path,
            partitions={},
        )


class NrelCambium2020Archiver(AbstractNrelCambiumArchiver):
    """NREL Cambium archiver for 2020."""

    name = "nrelcambium2020"
    project_year = 2020


class NrelCambium2021Archiver(AbstractNrelCambiumArchiver):
    """NREL Cambium archiver for 2021."""

    name = "nrelcambium2021"
    project_year = 2021


class NrelCambium2022Archiver(AbstractNrelCambiumArchiver):
    """NREL Cambium archiver for 2022."""

    name = "nrelcambium2022"
    project_year = 2022


class NrelCambium2023Archiver(AbstractNrelCambiumArchiver):
    """NREL Cambium archiver for 2023."""

    name = "nrelcambium2023"
    project_year = 2023
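As a rough sketch of the project-selection step in get_resources (the record shape below is a hypothetical stand-in, not the real projects-list response), the name prefix plus project_year_pattern pin down exactly one project:

import re

# Hypothetical records standing in for the API_URL_PROJECTS_LIST response.
project_records = [
    {"name": "Cambium 2022"},
    {"name": "Standard Scenarios 2022"},
]
project_year_pattern = re.compile(r"Cambium (?P<year>\d{4})")

matches = [p for p in project_records if p["name"].startswith("Cambium 2022")]
assert len(matches) == 1
year = int(project_year_pattern.search(matches[0]["name"]).group("year"))
assert year == 2022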
40 changes: 40 additions & 0 deletions src/pudl_archiver/metadata/nrelcambium.py
@@ -0,0 +1,40 @@
"""NREL Cambium -specific metadata helper."""

from pudl.metadata.constants import CONTRIBUTORS, KEYWORDS, LICENSES


def nrel_cambium_generator(year):
    """Generate metadata dictionaries for NREL Cambium.

    NREL Cambium datasets are too large to group together under a "years" partition, but otherwise share metadata.
    """
    return {
        "title": f"NREL Cambium {year}",
        "path": "https://www.nrel.gov/analysis/cambium.html",
        "description": (
            f"""Cambium datasets contain modeled hourly data for a range of possible futures of the U.S. electricity sector, with metrics designed to be useful for forward-looking analysis and decision support.

Cambium is annually updated and expands on the metrics reported in NREL’s Standard Scenarios—another annually released set of projections of how the U.S. electric sector could evolve across a suite of potential futures.

The {year} Cambium release includes two products:

The full {year} Cambium datasets;
NREL reports describing the scenarios, defining metrics and methods, describing major changes since the last release, and discussing intended uses and limitations of the dataset."""
        ),
        "source_file_dict": {
            "source_format": "CSV",
        },
        "working_partitions": {},
        "contributors": [
            CONTRIBUTORS["catalyst-cooperative"],
        ],
        "keywords": sorted(
            {
                "nrel",
                "cambium",
            }
            | set(KEYWORDS["us_govt"] + KEYWORDS["electricity"])
        ),
        "license_raw": LICENSES["cc-by-4.0"],
        "license_pudl": LICENSES["cc-by-4.0"],
    }
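A quick usage sketch, assuming pudl and its metadata constants are importable as they are in this repo: each call yields a complete, year-specific source dictionary.

from pudl_archiver.metadata.nrelcambium import nrel_cambium_generator

meta = nrel_cambium_generator(2022)
assert meta["title"] == "NREL Cambium 2022"
assert "cambium" in meta["keywords"]
assert meta["working_partitions"] == {}  # no "years" partition; one source per year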
6 changes: 6 additions & 0 deletions src/pudl_archiver/metadata/sources.py
@@ -4,6 +4,8 @@

 from pudl.metadata.constants import CONTRIBUTORS, KEYWORDS, LICENSES

+from pudl_archiver.metadata.nrelcambium import nrel_cambium_generator
+
 # To add a new contributor, follow the following format to add an entry to the
 # ADDL_CONTRIBUTORS dictionary below formatted like this:
 # "name-shorthand": {
@@ -454,4 +456,8 @@
"license_raw": LICENSES["cc-by-4.0"],
"license_pudl": LICENSES["cc-by-4.0"],
},
"nrelcambium2020": nrel_cambium_generator(2020),
"nrelcambium2021": nrel_cambium_generator(2021),
"nrelcambium2022": nrel_cambium_generator(2022),
"nrelcambium2023": nrel_cambium_generator(2023),
}
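Schematically, the generated entries drop into the sources mapping just like the hand-written ones above them; in this sketch, SOURCES is a hypothetical name standing in for the module-level dict this hunk extends.

from pudl_archiver.metadata.nrelcambium import nrel_cambium_generator

# "SOURCES" is a hypothetical stand-in for the module-level dict this hunk extends.
SOURCES = {
    f"nrelcambium{year}": nrel_cambium_generator(year) for year in range(2020, 2024)
}
assert SOURCES["nrelcambium2023"]["title"] == "NREL Cambium 2023"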
11 changes: 10 additions & 1 deletion src/pudl_archiver/utils.py
@@ -8,6 +8,7 @@
 from hashlib import md5
 from io import BytesIO
 from pathlib import Path
+from time import time

 import aiohttp
 from pydantic import AnyUrl, BaseModel
@@ -46,14 +47,22 @@ async def retry_async(
         args = []
     if kwargs is None:
         kwargs = {}
-    for try_count in range(1, retry_count + 1):  # noqa: RET503
+    last_failure_s = time()
+    max_delay_s = retry_base_s * 2**retry_count
+    try_count = 0
+    while try_count < retry_count:  # noqa: RET503
+        try_count += 1
         # try count is 1 indexed for logging clarity
         coro = async_func(*args, **kwargs)
         try:
             return await coro
         except retry_on as e:
             if try_count == retry_count:
                 raise e
+            current_failure_s = time()
+            if (current_failure_s - last_failure_s) > max_delay_s:
+                try_count = 1
+                last_failure_s = current_failure_s
             retry_delay_s = retry_base_s * 2 ** (try_count - 1)
             logger.info(
                 f"Error while executing {coro} (try #{try_count}, retry in {retry_delay_s}s): {type(e)} - {e}"
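To make the new retry behavior concrete, here is a small sketch of the delay schedule it implies, using retry_base_s=20 (the value passed by the Cambium archiver) and an illustrative retry_count=5: delays double per attempt, and a long quiet period between failures resets the counter.

# Illustrative values: retry_base_s=20 matches the Cambium archiver's call;
# retry_count=5 is an assumption, since the default isn't visible in this hunk.
retry_base_s = 20
retry_count = 5
max_delay_s = retry_base_s * 2**retry_count  # 640 s quiet period resets the counter

for try_count in range(1, retry_count):
    retry_delay_s = retry_base_s * 2 ** (try_count - 1)
    print(f"failure #{try_count}: retry in {retry_delay_s} s")  # 20, 40, 80, 160
# Failure #5 (try_count == retry_count) re-raises instead of sleeping; for earlier
# failures, a gap longer than max_delay_s since the previous failure resets
# try_count to 1, so the backoff restarts at 20 s.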