12 generalize downloader #13

Open · wants to merge 14 commits into base: main
7 changes: 7 additions & 0 deletions climateset/download/abstract_downloader.py
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class AbstractDownloader(ABC):
@abstractmethod
def download(self):
pass
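
For illustration, a minimal sketch of a concrete subclass satisfying this interface; the `ExampleDownloader` class and its attributes are hypothetical and not part of this PR:

from climateset.download.abstract_downloader import AbstractDownloader


class ExampleDownloader(AbstractDownloader):
    """Hypothetical subclass used only to illustrate the abstract contract."""

    def __init__(self, urls: list[str]):
        self.urls = urls

    def download(self):
        # A real implementation would fetch each URL and write the payload to disk.
        for url in self.urls:
            print(f"Would download {url}")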
186 changes: 186 additions & 0 deletions climateset/download/abstract_downloader_config.py
@@ -0,0 +1,186 @@
import logging
from abc import ABC
from pathlib import Path
from typing import Union

import yaml

from climateset import CONFIGS, RAW_DATA
from climateset.download.constants.esgf import ESGF_PROJECTS, ESGF_PROJECTS_CONSTANTS
from climateset.utils import create_logger

LOGGER = create_logger(__name__)


class AbstractDownloaderConfig(ABC):
def __init__(
self,
project: str,
data_dir: Union[str, Path] = RAW_DATA,
experiments: list[str] = None,
variables: list[str] = None,
overwrite: bool = False,
logger: logging.Logger = LOGGER,
):
self.logger = logger

self.project = ""
uppercase_project = project.upper()
for p in ESGF_PROJECTS:
if p.upper() == uppercase_project:
self.project = p

if self.project not in ESGF_PROJECTS:
self.logger.error(f"Project {self.project} has not been implemented in the Downloader yet.")
raise ValueError(
f"Project {self.project} is not recognized. Consider adding a constant class in download/constants and "
f"the esgf.py file."
)

if isinstance(data_dir, str):
data_dir = Path(data_dir)
self.data_dir = data_dir

self.experiments = experiments
self.variables = variables
self.overwrite = overwrite

# init shared constants
self.proj_constants = ESGF_PROJECTS_CONSTANTS[self.project]
self.node_link = self.proj_constants.NODE_LINK
self.avail_variables = self.proj_constants.VAR_SOURCE_LOOKUP
self.avail_experiments = self.proj_constants.SUPPORTED_EXPERIMENTS

def generate_config_file(self, config_file_name: str, config_path: Union[str, Path] = CONFIGS) -> None:
if isinstance(config_path, str):
config_path = Path(config_path)
if not config_file_name.endswith(".yaml"):
config_file_name = f"{config_file_name}.yaml"

config_full_path = config_path / config_file_name
data = {self.project: {}}
for key, value in self.__dict__.items():
if key not in ["project", "logger"] and not callable(value):
data[self.project][key] = value
with open(config_full_path, "w") as config_file:
yaml.dump(data, config_file, indent=2)
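
# Illustrative only: with project="CMIP6" and otherwise hypothetical settings, generate_config_file()
# serializes every non-callable attribute except `project` and `logger`, keyed by the project name,
# so the resulting YAML is shaped roughly like:
#
#   CMIP6:
#     data_dir: /path/to/raw_data        # hypothetical path
#     experiments: [historical, ssp126]  # hypothetical experiments
#     variables: [tas, pr]               # hypothetical variables
#     overwrite: false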


class Input4mipsDownloaderConfig(AbstractDownloaderConfig):
def __init__(
self,
project: str,
data_dir: str = RAW_DATA,
experiments: list[str] = None,
variables: list[str] = None,
download_biomassburning: bool = True, # get biomassburning data for input4mips
download_metafiles: bool = True, # get input4mips meta files
use_plain_emission_vars: bool = True,
overwrite: bool = False,
logger: logging.Logger = LOGGER,
):
super().__init__(project, data_dir, experiments, variables, overwrite, logger)

self.download_metafiles: bool = download_metafiles # TODO infer automatically from vars
self.download_biomass_burning: bool = download_biomassburning # TODO infer automatically from vars
self.use_plain_emission_vars: bool = use_plain_emission_vars

self.emissions_endings = self.proj_constants.EMISSIONS_ENDINGS
self.meta_endings_prc = self.proj_constants.META_ENDINGS_PRC
self.meta_endings_share = self.proj_constants.META_ENDINGS_SHAR
self.mip_era = self.proj_constants.MIP_ERA
self.target_mip = self.proj_constants.TARGET_MIP

# Attributes that are retrieved or set within this class, for:
## (all)
self.vars: list[str] = variables
## (climate model inputs)
self.biomass_vars: list[str] = []
self.meta_vars_percentage: list[str] = []
self.meta_vars_share: list[str] = []

self._handle_emission_variables(
variables=variables,
)

def _handle_emission_variables(self, variables: list[str]):
self.vars = []
self._generate_raw_emission_vars(variables=variables)
self._generate_plain_emission_vars()
self.logger.info(f"Emission variables to download: {self.vars}")
if self.download_biomass_burning:
self.logger.info(f"Biomass burning vars to download: {self.biomass_vars}")
if self.download_metafiles:
self.logger.info(
f"Meta emission vars to download:\n\t{self.meta_vars_percentage}\n\t{self.meta_vars_share}"
)

def _generate_raw_emission_vars(self, variables: list[str]):
if variables is None:
# variables = ["tas", "pr", "SO2_em_anthro", "BC_em_anthro"]
raise ValueError("No variables have been given to the downloader. Variables must be given for downloader.")
variables = [v.replace(" ", "_").replace("-", "_") for v in variables]
self.logger.info(f"Cleaned variables : {variables}")
for v in variables:
self.vars.append(v)

def _generate_plain_emission_vars(self):
if self.use_plain_emission_vars:
# plain vars are biomass vars
self.biomass_vars = self.vars
self.meta_vars_percentage = [
biomass_var + ending
for biomass_var in self.biomass_vars
if biomass_var != "CO2"
for ending in self.meta_endings_prc
]
self.meta_vars_share = [
biomass_var + ending
for biomass_var in self.biomass_vars
if biomass_var != "CO2"
for ending in self.meta_endings_share
]

self.vars = [
variable + emission_ending for variable in self.vars for emission_ending in self.emissions_endings
]
# be careful with CO2
if "CO2_em_openburning" in self.vars:
self.vars.remove("CO2_em_openburning")
else:
# get plain input4mips vars = biomass vars for historical
self.biomass_vars = list({v.split("_")[0] for v in self.vars})
# remove biomass vars from normal vars list
for b in self.biomass_vars:
try:
self.vars.remove(b)
except Exception as error:
self.logger.warning(f"Caught the following exception but continuing : {error}")

self.meta_vars_percentage = [
biomass_var + ending
for biomass_var in self.biomass_vars
if biomass_var != "CO2"
for ending in self.meta_endings_prc
]
self.meta_vars_share = [
biomass_var + ending
for biomass_var in self.biomass_vars
if biomass_var != "CO2"
for ending in self.meta_endings_share
]
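
# Illustrative only: assuming hypothetical endings EMISSIONS_ENDINGS = ["_em_anthro", "_em_openburning"]
# and META_ENDINGS_PRC = ["_percentage"], use_plain_emission_vars=True would expand vars=["BC", "CO2"] into
# ["BC_em_anthro", "BC_em_openburning", "CO2_em_anthro"] (CO2_em_openburning is dropped), with
# meta_vars_percentage=["BC_percentage"]; the actual endings come from the project constants.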


class CMIP6DownloaderConfig(AbstractDownloaderConfig):
def __init__(
self,
project: str,
data_dir: str = RAW_DATA,
experiments: list[str] = None,
variables: list[str] = None,
overwrite: bool = False,
logger: logging.Logger = LOGGER,
):
super().__init__(project, data_dir, experiments, variables, overwrite, logger)

self.avail_models = self.proj_constants.MODEL_SOURCES
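
For context, a hedged usage sketch of the config classes above; the experiment and variable names are placeholders, and it is assumed that "CMIP6" is one of the entries in ESGF_PROJECTS:

from climateset.download.abstract_downloader_config import CMIP6DownloaderConfig

# Placeholder arguments; real values depend on what the ESGF constants support.
config = CMIP6DownloaderConfig(
    project="CMIP6",
    experiments=["historical"],
    variables=["tas", "pr"],
)
# Writes the config's non-callable attributes under the "CMIP6" key into the default CONFIGS
# directory as cmip6_example.yaml.
config.generate_config_file(config_file_name="cmip6_example")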
134 changes: 134 additions & 0 deletions climateset/download/cmip6_downloader.py
@@ -0,0 +1,134 @@
from pyesgf.search import SearchConnection

from climateset.download.abstract_downloader import AbstractDownloader
from climateset.download.utils import (
_handle_base_search_constraints,
download_model_variable,
get_upload_version,
)
from climateset.utils import create_logger

LOGGER = create_logger(__name__)


class CMIP6Downloader(AbstractDownloader):
def __init__(self):
self.logger = LOGGER

def download(self):
"""
Function handling the download of all variables that are associated with a model's output.

Searches for all files associated with the respective variables and experiments that the downloader
was initialized with.

A search connection is established and the search is iteratively constrained to meet all specifications.
Data is downloaded and stored in a separate file for each year. The default format is netCDF4.

Resulting hierarchy:

`CMIPx/model_id/ensemble_member/experiment/variable/nominal_resolution/frequency/year.nc`

If the constraints cannot be met, the default behaviour is for the downloader to select the first other
available value.
"""

for variable in self.model_vars:
self.logger.info(f"Downloading data for variable: {variable}")
for experiment in self.experiments:
if experiment in self.SUPPORTED_EXPERIMENTS:
self.logger.info(f"Downloading data for experiment: {experiment}")
self.download_from_model_single_var(project=self.project, variable=variable, experiment=experiment)
else:
self.logger.info(
f"Chosen experiment {experiment} not supported. All supported experiments: "
f"{self.SUPPORTED_EXPERIMENTS}. Skipping."
)

def download_from_model_single_var( # noqa: C901
self,
variable: str,
experiment: str,
project: str = "CMIP6",
default_frequency: str = "mon",
preferred_version: str = "latest",
default_grid_label: str = "gn",
):
"""
Function handling the download of a single variable-experiment pair that is associated with a model's output
(CMIP data).

Args:
variable: variable ID
experiment: experiment ID
project: umbrella project ID, e.g. CMIPx
default_frequency: default frequency to download
preferred_version: data upload version; if 'latest', the newest available version is always selected
default_grid_label: default gridding method in which the data is provided
"""
conn = SearchConnection(url=self.model_node_link, distrib=False)

facets = (
"project,experiment_id,source_id,variable,frequency,variant_label,"
"nominal_resolution,version,grid_label"
)

self.logger.info("Using download_from_model_single_var() function")

ctx = conn.new_context(
project=project,
experiment_id=experiment,
source_id=self.model,
variable=variable,
facets=facets,
)

ctx = _handle_base_search_constraints(ctx, default_frequency, default_grid_label)

variants = list(ctx.facet_counts["variant_label"])

if len(variants) < 1:
self.logger.info(
"No items were found for this request. Please check on the ESGF server whether the combination of "
"your model/scenarios/variables exists."
)
raise ValueError(
f"Downloader did not find any items on ESGF for this request: Project {project}, "
f"Experiment {experiment}, Model {self.model}, Variable {variable}."
)

self.logger.info(f"Available variants : {variants}\n")
self.logger.info(f"Length : {len(variants)}")

# TODO refactor logic of if/else
if not self.ensemble_members:
if self.max_ensemble_members > len(variants):
self.logger.info("Fewer ensemble members available than the maximum number desired. Including all variants.")
ensemble_member_final_list = variants
else:
self.logger.info(
f"More ensemble members available ({len(variants)}) than desired (max {self.max_ensemble_members}). "
f"Choosing only the first {self.max_ensemble_members}."
)
ensemble_member_final_list = variants[: self.max_ensemble_members]
else:
self.logger.info(f"Desired list of ensemble members given: {self.ensemble_members}")
ensemble_member_final_list = list(set(variants) & set(self.ensemble_members))
if len(ensemble_member_final_list) == 0:
self.logger.warning("No overlap between available and desired ensemble members! Skipping.")
return None

for ensemble_member in ensemble_member_final_list:
self.logger.info(f"Ensembles member: {ensemble_member}")
ctx_ensemble = ctx.constrain(variant_label=ensemble_member)

version = get_upload_version(context=ctx, preferred_version=preferred_version)
if version:
ctx_ensemble = ctx_ensemble.constrain(version=version)

results = ctx_ensemble.search()

self.logger.info(f"Result len {len(results)}")

download_model_variable(
model_id=self.model, search_results=results, variable=variable, base_path=self.data_dir
)
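
A hedged usage sketch of the downloader, assuming its model, variable, and experiment attributes are assigned before download() is called (this diff does not yet set them in __init__); every concrete value below is a placeholder:

from climateset.download.cmip6_downloader import CMIP6Downloader

downloader = CMIP6Downloader()
# Hypothetical wiring; the attribute names mirror what download() and
# download_from_model_single_var() read, and the values are placeholders.
downloader.project = "CMIP6"
downloader.model = "NorESM2-LM"
downloader.model_vars = ["tas"]
downloader.experiments = ["historical"]
downloader.SUPPORTED_EXPERIMENTS = ["historical"]
downloader.model_node_link = "https://esgf-node.llnl.gov/esg-search"  # assumed ESGF index node
downloader.ensemble_members = None
downloader.max_ensemble_members = 1
downloader.data_dir = "raw_data"

downloader.download()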