[Draft] Removes Kerchunk recipe pipeline tools #786

Draft · wants to merge 3 commits into base: main
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
     rev: 24.4.2
     hooks:
       - id: black
-        args: ["--line-length", "100"]
+        args: ["--line-length", "120"]
 
   - repo: https://github.com/PyCQA/flake8
     rev: 7.1.0
1 change: 0 additions & 1 deletion ci/requirements-upstream-dev.txt
@@ -1,4 +1,3 @@
 git+https://github.com/fsspec/filesystem_spec.git
 git+https://github.com/pydata/xarray.git
-git+https://github.com/fsspec/kerchunk.git
 git+https://github.com/zarr-developers/zarr-python.git
7 changes: 0 additions & 7 deletions docs/composition/examples/index.md
@@ -10,10 +10,3 @@ The following example recipes are organized according to the {doc}`../styles` th
 noaa-oisst
 gpcp-from-gcs
 ```
-
-**Examples of [](../styles.md#open-with-kerchunk-write-to-virtual-zarr):**
-
-```{note}
-Examples of this recipe style currently exist in development form, and will be cited here as soon as they
-are integration tested, which is pending <https://github.com/pangeo-forge/pangeo-forge-recipes/issues/608>.
-```
34 changes: 0 additions & 34 deletions docs/composition/styles.md
@@ -55,37 +55,3 @@ selecting this option, it is therefore up to you, the user, to ensure that the i
 {doc}`file pattern <file_patterns>` for the appending recipe are limited to those which you want to
 append.
 ```
-
-
-## Open with Kerchunk, write to virtual Zarr
-
-The standard Zarr recipe creates a copy of the original dataset in the Zarr format; this
-[kerchunk](https://fsspec.github.io/kerchunk/)-based reference recipe style does not copy the
-data and instead creates a Kerchunk mapping, which allows archival formats (including NetCDF, GRIB2, etc.)
-to be read _as if_ they were Zarr datasets. More details about how Kerchunk works can be found in the
-[kerchunk docs](https://fsspec.github.io/kerchunk/detail.html) and
-[this blog post](https://medium.com/pangeo/fake-it-until-you-make-it-reading-goes-netcdf4-data-on-aws-s3-as-zarr-for-rapid-data-access-61e33f8fe685).
-
-```{note}
-Examples of this recipe style currently exist in development form, and will be cited here as soon as they
-are integration tested, which is pending <https://github.com/pangeo-forge/pangeo-forge-recipes/issues/608>.
-```
-
-### Is this style right for my dataset?
-
-For archival data stored on high-throughput storage devices, and for which
-preprocessing is not required, reference recipes are an ideal and storage-efficient option.
-When choosing whether to create a reference recipe, it is important to consider questions such as:
-
-#### Where are the archival (i.e. source) files for this dataset currently stored?
-
-If the original data are not already in the cloud (or some other high-bandwidth storage device,
-such as an on-prem data center), the performance benefits of using a reference recipe may be limited,
-because network speeds to access the original data will constrain I/O throughput.
-
-#### Does this dataset require preprocessing?
-
-With reference recipes, modification of the underlying data is not possible. For example, the
-chunking schema of a dataset cannot be modified with Kerchunk, so you are limited to the chunk schema of the
-archival data. If you need to optimize your dataset's chunking schema for space or time, the standard Zarr
-recipe is the only option. While you cannot modify chunking in a reference recipe, changes to the metadata
-(attributes, encoding, etc.) can be applied.
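
For context on what is being removed: the deleted section above describes reference recipes built from the `OpenWithKerchunk` and `WriteCombinedReference` transforms, which this PR also deletes. Below is a minimal sketch of that style composed against the pre-removal API; the URL pattern, keys, store name, and dimension names are hypothetical, not taken from a real feedstock.

```python
import apache_beam as beam

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenWithKerchunk, WriteCombinedReference

# Hypothetical daily source files; a real recipe would enumerate actual archive URLs.
dates = ["20200101", "20200102", "20200103"]


def make_url(time):
    return f"https://example.com/archive/{time}.nc"  # hypothetical archival NetCDF


pattern = FilePattern(make_url, ConcatDim(name="time", keys=dates))

recipe = (
    beam.Create(pattern.items())
    # Index byte ranges within each source file instead of copying its data.
    | OpenWithKerchunk(file_type=pattern.file_type)
    # Combine the per-file references and write a single virtual Zarr store.
    | WriteCombinedReference(
        store_name="example-reference",  # hypothetical target name
        concat_dims=["time"],
        identical_dims=["lat", "lon"],  # hypothetical: dims identical across files
    )
)
```

Because the output is only a reference mapping, the archival files remain the backing store, which is why the deleted docs stress keeping them on high-bandwidth storage.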
3 changes: 0 additions & 3 deletions docs/composition/transforms.md
@@ -42,7 +42,6 @@ you'll need to open it somehow. Pangeo Forge currently provides the following op
 - {class}`pangeo_forge_recipes.transforms.OpenURLWithFSSpec`
   - ⚙️ `cache` - <a href="#configurable-kwargs">Deploy-time configurable keyword argument</a>
 - {class}`pangeo_forge_recipes.transforms.OpenWithXarray`
-- {class}`pangeo_forge_recipes.transforms.OpenWithKerchunk`
 
 ## Preprocessors
 
@@ -58,8 +57,6 @@ for this purpose and included in your recipe.
 
 - {class}`pangeo_forge_recipes.transforms.StoreToZarr`
   - ⚙️ `target_root` - <a href="#configurable-kwargs">Deploy-time configurable keyword argument</a>
-- {class}`pangeo_forge_recipes.transforms.WriteCombinedReference`
-  - ⚙️ `target_root` - <a href="#configurable-kwargs">Deploy-time configurable keyword argument</a>
 
 ## What's next
 
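
After this change, the standard Zarr path is the only documented composition. A sketch mirroring the `gpcp_from_gcs` example elsewhere in this PR, with placeholder URLs, keys, and chunk sizes:

```python
import apache_beam as beam

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

dates = ["20200101", "20200102"]  # placeholder keys


def make_url(time):
    return f"https://example.com/archive/{time}.nc"  # placeholder source


pattern = FilePattern(make_url, ConcatDim(name="time", keys=dates))

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()  # `cache` is deploy-time configurable
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        store_name="example.zarr",
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"time": 10},  # placeholder chunking
    )
)
```

`pattern.combine_dim_keys` forwards the `ConcatDim` metadata so `StoreToZarr` knows which dimension to grow as pieces arrive.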
9 changes: 2 additions & 7 deletions examples/feedstock/gpcp_from_gcs.py
@@ -11,10 +11,7 @@
     StoreToZarr,
 )
 
-dates = [
-    d.to_pydatetime().strftime("%Y%m%d")
-    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
-]
+dates = [d.to_pydatetime().strftime("%Y%m%d") for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")]
 
 
 def make_url(time):
@@ -32,9 +29,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
     import xarray as xr
 
     ds = xr.open_dataset(store, engine="zarr", chunks={})
-    assert ds.title == (
-        "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
-    )
+    assert ds.title == ("Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3")
     # Making sure that the native chunking is different from the dynamic chunking
     assert ds.chunks["time"][0] == 1
 
9 changes: 2 additions & 7 deletions examples/feedstock/gpcp_from_gcs_dynamic_chunks.py
@@ -8,10 +8,7 @@
 from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
 from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
 
-dates = [
-    d.to_pydatetime().strftime("%Y%m%d")
-    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
-]
+dates = [d.to_pydatetime().strftime("%Y%m%d") for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")]
 
 
 def make_url(time):
@@ -29,9 +26,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
     import xarray as xr
 
     ds = xr.open_dataset(store, engine="zarr", chunks={})
-    assert ds.title == (
-        "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
-    )
+    assert ds.title == ("Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3")
 
     assert ds.chunks["time"][0] == 2
     return store
57 changes: 0 additions & 57 deletions examples/feedstock/hrrr_kerchunk_concat_step.py

This file was deleted.

73 changes: 0 additions & 73 deletions examples/feedstock/hrrr_kerchunk_concat_valid_time.py

This file was deleted.

2 changes: 0 additions & 2 deletions examples/feedstock/meta.yaml
@@ -9,5 +9,3 @@ recipes:
     object: "noaa_oisst:recipe"
   - id: "terraclimate"
     object: "terraclimate:recipe"
-  - id: "hrrr-kerchunk-concat-step"
-    object: "hrrr_kerchunk_concat_step:recipe"
8 changes: 2 additions & 6 deletions examples/feedstock/terraclimate.py
@@ -51,9 +51,7 @@ def make_filename(variable, time):
 )
 
 
-pattern = FilePattern(
-    make_filename, ConcatDim(name="time", keys=years), MergeDim(name="variable", keys=variables)
-)
+pattern = FilePattern(make_filename, ConcatDim(name="time", keys=years), MergeDim(name="variable", keys=variables))
 
 
 class Munge(beam.PTransform):
@@ -109,9 +107,7 @@ def _preproc(self, item: Indexed[xr.Dataset]) -> Indexed[xr.Dataset]:
         "using a water balance model and plant extractable soil water capacity derived "
         "from Wang-Erlandsson et al (2016)."
     ),
-    "title": (
-        "TerraClimate: monthly climate and climatic water balance for global land surfaces"
-    ),
+    "title": ("TerraClimate: monthly climate and climatic water balance for global land surfaces"),
     "summary": (
         "This archive contains a dataset of high-spatial resolution (1/24th degree, ~4-km) "
         "monthly climate and climatic water balance for global terrestrial surfaces from "
17 changes: 4 additions & 13 deletions pangeo_forge_recipes/aggregation.py
@@ -41,9 +41,7 @@ def _combine_xarray_schemas(
     s1: Optional[XarraySchema], s2: Optional[XarraySchema], concat_dim: Optional[str] = None
 ) -> XarraySchema:
     if s1 is None and s2 is None:
-        raise ValueError(
-            "Encountered two empty XarraySchemas during combine: one must be non-empty"
-        )
+        raise ValueError("Encountered two empty XarraySchemas during combine: one must be non-empty")
     if s1 is None:
         assert s2 is not None
         return s2
@@ -65,9 +63,7 @@ def _combine_xarray_schemas(
     }
 
 
-def _combine_dims(
-    d1: Dict[str, int], d2: Dict[str, int], concat_dim: Optional[str]
-) -> Dict[str, int]:
+def _combine_dims(d1: Dict[str, int], d2: Dict[str, int], concat_dim: Optional[str]) -> Dict[str, int]:
     if not d1:
         return d2
     all_dims = set(d1) | set(d2)
@@ -233,14 +229,9 @@ def schema_to_template_ds(
 
     target_chunks = determine_target_chunks(schema, specified_chunks)
 
-    data_vars = {
-        name: _to_variable(template, target_chunks)
-        for name, template in schema["data_vars"].items()
-    }
+    data_vars = {name: _to_variable(template, target_chunks) for name, template in schema["data_vars"].items()}
 
-    coords = {
-        name: _to_variable(template, target_chunks) for name, template in schema["coords"].items()
-    }
+    coords = {name: _to_variable(template, target_chunks) for name, template in schema["coords"].items()}
     dataset_attrs = schema["attrs"]
 
     if attrs and isinstance(attrs, dict):
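
The reformatted `_combine_dims` above encodes the core concatenation rule: sizes add along the concat dimension and must agree everywhere else. A standalone sketch of that rule (not the library function itself):

```python
from typing import Dict, Optional


def combine_dims_sketch(d1: Dict[str, int], d2: Dict[str, int], concat_dim: Optional[str]) -> Dict[str, int]:
    if not d1:
        return d2
    combined = {}
    for dim in set(d1) | set(d2):
        if dim == concat_dim:
            # Lengths accumulate along the concat axis (both inputs must carry it).
            combined[dim] = d1[dim] + d2[dim]
        else:
            s1, s2 = d1.get(dim), d2.get(dim)
            if s1 is not None and s2 is not None and s1 != s2:
                raise ValueError(f"Dimension sizes conflict for {dim!r}: {s1} != {s2}")
            combined[dim] = s1 if s1 is not None else s2
    return combined


# Two daily files, 365 + 366 time steps, on the same spatial grid:
assert combine_dims_sketch({"time": 365, "lat": 720}, {"time": 366, "lat": 720}, "time") == {"time": 731, "lat": 720}
```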
21 changes: 5 additions & 16 deletions pangeo_forge_recipes/chunk_grid.py
@@ -49,10 +49,7 @@ def from_uniform_grid(cls, chunksize_and_dimsize: Dict[str, Tuple[int, int]]):
                 raise ValueError("chunksize must be greater than 0")
             if chunksize > dimsize:
                 # TODO: make sure this path is more thoroughly tested
-                warnings.warn(
-                    f"chunksize ({chunksize}) > dimsize ({dimsize}). "
-                    f"Decreasing chunksize to {dimsize}"
-                )
+                warnings.warn(f"chunksize ({chunksize}) > dimsize ({dimsize}). " f"Decreasing chunksize to {dimsize}")
                 chunksize = dimsize
             chunks = (dimsize // chunksize) * (chunksize,)
             if dimsize % chunksize > 0:
@@ -83,8 +80,7 @@ def consolidate(self, factors: Dict[str, int]) -> ChunkGrid:
         # doesn't seem like the kosher way to do this but /shrug
         new = self.__class__({})
         new._chunk_axes = {
-            name: ca.consolidate(factors[name]) if name in factors else ca
-            for name, ca in self._chunk_axes.items()
+            name: ca.consolidate(factors[name]) if name in factors else ca for name, ca in self._chunk_axes.items()
         }
         return new
 
@@ -95,27 +91,20 @@ def subset(self, factors: Dict[str, int]) -> ChunkGrid:
         # doesn't seem like the kosher way to do this but /shrug
         new = self.__class__({})
         new._chunk_axes = {
-            name: ca.subset(factors[name]) if name in factors else ca
-            for name, ca in self._chunk_axes.items()
+            name: ca.subset(factors[name]) if name in factors else ca for name, ca in self._chunk_axes.items()
         }
         return new
 
     def chunk_index_to_array_slice(self, chunk_index: Dict[str, int]) -> Dict[str, slice]:
         """Convert a single index from chunk space to a slice in array space
         for each specified dimension."""
 
-        return {
-            name: self._chunk_axes[name].chunk_index_to_array_slice(idx)
-            for name, idx in chunk_index.items()
-        }
+        return {name: self._chunk_axes[name].chunk_index_to_array_slice(idx) for name, idx in chunk_index.items()}
 
     def array_index_to_chunk_index(self, array_index: Dict[str, int]) -> Dict[str, int]:
         """Figure out which chunk a single array-space index comes from
         for each specified dimension."""
-        return {
-            name: self._chunk_axes[name].array_index_to_chunk_index(idx)
-            for name, idx in array_index.items()
-        }
+        return {name: self._chunk_axes[name].array_index_to_chunk_index(idx) for name, idx in array_index.items()}
 
     def array_slice_to_chunk_slice(self, array_slices: Dict[str, slice]) -> Dict[str, slice]:
         """Find all chunks that intersect with a given array-space slice
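
The comprehensions consolidated above all delegate per-dimension index math to the chunk axes. For a uniform grid, that math reduces to simple arithmetic, sketched here outside the `ChunkGrid`/`ChunkAxis` classes:

```python
# Standalone sketch of the uniform-grid arithmetic behind chunk_index_to_array_slice
# and array_index_to_chunk_index (not the library classes themselves).
def chunk_index_to_array_slice(chunk_index: int, chunksize: int, dimsize: int) -> slice:
    # Chunk i covers [i * chunksize, min((i + 1) * chunksize, dimsize)).
    start = chunk_index * chunksize
    return slice(start, min(start + chunksize, dimsize))


def array_index_to_chunk_index(array_index: int, chunksize: int) -> int:
    # Integer division maps an array position back to its containing chunk.
    return array_index // chunksize


# A dimension of length 10 with chunksize 4 has chunks (4, 4, 2):
assert chunk_index_to_array_slice(2, chunksize=4, dimsize=10) == slice(8, 10)
assert array_index_to_chunk_index(9, chunksize=4) == 2
```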
8 changes: 2 additions & 6 deletions pangeo_forge_recipes/combiners.py
@@ -29,15 +29,11 @@ def create_accumulator(self) -> SchemaAccumulator:
         concat_dim = self.dimension.name if self.dimension.operation == CombineOp.CONCAT else None
         return (None, concat_dim)
 
-    def add_input(
-        self, accumulator: SchemaAccumulator, item: Indexed[XarraySchema]
-    ) -> SchemaAccumulator:
+    def add_input(self, accumulator: SchemaAccumulator, item: Indexed[XarraySchema]) -> SchemaAccumulator:
         acc_schema, acc_concat_dim = accumulator
         next_index, next_schema = item
         if acc_concat_dim:
-            assert (
-                acc_concat_dim not in next_schema["chunks"]
-            ), "Concat dim should be unchunked for new input"
+            assert acc_concat_dim not in next_schema["chunks"], "Concat dim should be unchunked for new input"
             position = self.get_position(next_index)
             # Copy to avoid side effects (just python things)
             next_schema = copy.deepcopy(next_schema)
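
The reformatted `add_input` above is one leg of Beam's combiner protocol (`create_accumulator` / `add_input` / `merge_accumulators` / `extract_output`). A toy combiner showing the same fold shape, summing per-file lengths the way the schema combiner grows its concat dimension (names here are illustrative only, not the library's):

```python
import apache_beam as beam


class ConcatLengthFn(beam.CombineFn):
    def create_accumulator(self):
        return 0  # no inputs seen yet

    def add_input(self, total, element):
        return total + element  # fold one per-file length into the accumulator

    def merge_accumulators(self, accumulators):
        return sum(accumulators)  # combine partial results from parallel workers

    def extract_output(self, total):
        return total


with beam.Pipeline() as p:
    (
        p
        | beam.Create([365, 366, 365])  # per-file lengths along the concat dim
        | beam.CombineGlobally(ConcatLengthFn())
        | beam.Map(print)  # -> 1096
    )
```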