Merge pull request #171 from pangeo-forge/npz/feature/internal-integration-fixtures

Move integration testing feedstock to tests directory
ranchodeluxe authored Feb 17, 2024
2 parents 028903c + 59042fc commit 340dc74
Showing 11 changed files with 127 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docs/tutorial/flink.md
@@ -74,7 +74,7 @@ we must answer:
### Where Data is Discovered

Recipe input is defined in the recipe itself. Most recipes will define a `pangeo_forge_recipes.patterns.FilePattern` that provides the pipeline with input file locations.
-The example below is taken from the [integration test recipe](https://github.com/pforgetest/gpcp-from-gcs-feedstock/blob/main/feedstock/recipe.py)
+The example below is taken from [this example recipe](https://github.com/pforgetest/gpcp-from-gcs-feedstock/blob/main/feedstock/recipe.py)

```python
import apache_beam as beam
# ...
```
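The remainder of the docs example is collapsed in this diff. A sketch of what it covers, assembled from the test recipe this commit adds under `tests/test-data/gpcp-from-gcs/feedstock-0.10.x/recipe.py` (the URLs and date range below come from that fixture, not necessarily the docs page):

```python
import apache_beam as beam
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

# One key per day in the range; the pipeline discovers inputs by
# iterating the resulting FilePattern.
dates = [
    d.to_pydatetime().strftime("%Y%m%d")
    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
]

def make_url(time):
    url_base = "https://storage.googleapis.com/pforge-test-data"
    return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"

concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, concat_dim)
```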
11 changes: 7 additions & 4 deletions pangeo_forge_runner/commands/bake.py
@@ -11,7 +11,7 @@
from pathlib import Path

import escapism
-from apache_beam import Pipeline
+from apache_beam import Pipeline, PTransform
from traitlets import Bool, Type, Unicode, validate

from .. import Feedstock
@@ -266,10 +266,13 @@ def start(self):
# Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
# for pipeline options - we have traitlets doing that for us.
pipeline = Pipeline(options=pipeline_options, argv=[])

# Chain our recipe to the pipeline. This mutates the `pipeline` object!
-# We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
-# method that returns a transform.
-pipeline | recipe
+# We expect `recipe` to be 1) a beam PTransform, or 2) a string that leverages a
+# `dict_object:` reference; see `tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml`
+# for an example.
+if isinstance(recipe, PTransform):
+    pipeline | recipe

# Some bakeries are blocking - if Beam is configured to use them, calling
# pipeline.run() blocks. Some are not. We handle that here, and provide
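The `isinstance` guard above leans on Beam's composition semantics: chaining `PTransform`s with `|` outside a pipeline produces a single composite transform, and applying that object to a `Pipeline` mutates the pipeline in place. A minimal sketch with toy data (not the runner's actual recipe):

```python
import apache_beam as beam

# Composing transforms outside a pipeline yields one chained PTransform;
# this is the shape a pangeo-forge "recipe" object takes.
recipe = beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

with beam.Pipeline() as pipeline:
    # Applying the transform mutates `pipeline`, as in bake.py above.
    pipeline | recipe
```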
13 changes: 7 additions & 6 deletions tests/integration/flink/test_flink_integration.py
@@ -3,10 +3,13 @@
import tempfile
import time
from importlib.metadata import version
+from pathlib import Path

import xarray as xr
from packaging.version import parse as parse_version

+TEST_DATA_DIR = Path(__file__).parent.parent.parent / "test-data"


def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
fsspec_args = {
@@ -17,7 +20,7 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):

pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
-recipe_version_ref = str(pfr_version)
+recipe_version_ref = "0.10.x"

bucket = "s3://gpcp-out"
config = {
@@ -59,11 +62,9 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
"pangeo-forge-runner",
"bake",
"--repo",
"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
# in the test feedstock, tags are named for
# the recipe version used to write the recipe module
recipe_version_ref,
str(TEST_DATA_DIR / "gpcp-from-gcs"),
"--feedstock-subdir",
f"feedstock-{recipe_version_ref}",
"-f",
f.name,
]
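The change above swaps the remote `--repo` URL and `--ref` tag for a local path plus `--feedstock-subdir`. Roughly, the invocation the test now builds looks like this (the config file name is hypothetical):

```python
import subprocess

cmd = [
    "pangeo-forge-runner",
    "bake",
    "--repo",
    "tests/test-data/gpcp-from-gcs",  # local fixtures, not a git URL
    "--feedstock-subdir",
    "feedstock-0.10.x",  # selects the pinned recipe fixture
    "-f",
    "local_config.json",  # hypothetical config file
]
proc = subprocess.run(cmd, capture_output=True)
```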
31 changes: 25 additions & 6 deletions tests/integration/test_dataflow_integration.py
@@ -1,18 +1,25 @@
import json
import subprocess
+import sys
import tempfile
import time
from importlib.metadata import version
+from pathlib import Path

import pytest
import xarray as xr
from packaging.version import parse as parse_version

+TEST_DATA_DIR = Path(__file__).parent.parent / "test-data"


def test_dataflow_integration():
+python_version = (
+    f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
+)
pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
-recipe_version_ref = str(pfr_version)
+recipe_version_ref = "0.10.x"
else:
raise ValueError(
f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
@@ -22,6 +29,7 @@ def test_dataflow_integration():
"Bake": {
"prune": True,
"bakery_class": "pangeo_forge_runner.bakery.dataflow.DataflowBakery",
"job_name": f"gpcp-from-gcs-py{python_version.replace('.','')}-v{''.join([str(i) for i in pfr_version.release])}",
},
"DataflowBakery": {"temp_gcs_location": bucket + "/temp"},
"TargetStorage": {
@@ -41,11 +49,9 @@
"pangeo-forge-runner",
"bake",
"--repo",
"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipe_version_ref,
str(TEST_DATA_DIR / "gpcp-from-gcs"),
"--feedstock-subdir",
f"feedstock-{recipe_version_ref}",
"--json",
"-f",
f.name,
@@ -70,6 +76,15 @@

# okay, time to start checking if the job is done
show_job = f"gcloud dataflow jobs show {job_id} --format=json".split()
+show_job_errors = [
+"gcloud",
+"logging",
+"read",
+f'resource.type="dataflow_step" AND resource.labels.job_id="{job_id}" AND severity>=ERROR',
+"--limit",
+"500",
+"--format=json",
+]
while True:
elapsed = time.time() - start
print(f"Time {elapsed = }")
@@ -93,6 +108,10 @@
# still running, let's give it another 30s then check again
time.sleep(30)
else:
+# try to get some output to the stdout so we don't have to log into the GCP console
+state_proc = subprocess.run(show_job_errors, capture_output=True)
+assert state_proc.returncode == 0
+print(json.loads(state_proc.stdout))
# consider any other state a failure
pytest.fail(f"{state = } is neither 'Done' nor 'Running'")

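The body of the polling loop is collapsed above. A hedged reconstruction of the pattern it implements, reusing the `show_job` command from the visible context (the exact JSON field handling is an assumption):

```python
import json
import subprocess
import time

while True:
    # Ask gcloud for the current job state (assumed to be a "state" field).
    state_proc = subprocess.run(show_job, capture_output=True)
    assert state_proc.returncode == 0
    state = json.loads(state_proc.stdout)["state"]
    if state == "Done":
        break  # success
    elif state == "Running":
        time.sleep(30)  # still running, check again in 30s
    else:
        raise RuntimeError(f"{state = } is neither 'Done' nor 'Running'")
```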
3 changes: 3 additions & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml
@@ -0,0 +1,3 @@
title: "Test meta.yaml with list of recipes"
recipes:
- dict_object: 'recipe:recipes'
1 change: 1 addition & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/recipe.py
@@ -0,0 +1 @@
recipes = {"test_1": "test_1", "test_2": "test_2"}
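The `dict_object: 'recipe:recipes'` entry in the `meta.yaml` above references this dictionary with a `module:attribute` string. A hypothetical sketch of how such a reference could be resolved (for illustration only, not the runner's actual implementation):

```python
import importlib

def resolve_dict_object(ref: str):
    # "recipe:recipes" -> import module "recipe", return its "recipes" attribute.
    module_name, attr = ref.split(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# With the feedstock directory on sys.path, this returns
# {"test_1": "test_1", "test_2": "test_2"}.
recipes = resolve_dict_object("recipe:recipes")
```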
1 change: 1 addition & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/requirements.txt
@@ -0,0 +1 @@
pangeo-forge-recipes>=0.10.0,<0.11
31 changes: 31 additions & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x/meta.yaml
@@ -0,0 +1,31 @@
title: "Global Precipitation Climatology Project"
description: >
Global Precipitation Climatology Project (GPCP) Daily Version 1.3 gridded, merged
satellite/gauge precipitation Climate Data Record (CDR) from 1996 to present.
pangeo_forge_version: "0.9.0"
pangeo_notebook_version: "2022.06.02"
recipes:
- id: gpcp-from-gcs
object: "recipe:recipe"
provenance:
providers:
- name: "NOAA NCEI"
description: "National Oceanographic & Atmospheric Administration National Centers for Environmental Information"
roles:
- host
- licensor
url: https://www.ncei.noaa.gov/products/global-precipitation-climatology-project
- name: "University of Maryland"
description: >
University of Maryland College Park Earth System Science Interdisciplinary Center
(ESSIC) and Cooperative Institute for Climate and Satellites (CICS).
roles:
- producer
url: http://gpcp.umd.edu/
license: "No constraints on data access or use."
maintainers:
- name: "Charles Stern"
orcid: "0000-0002-4078-0852"
github: cisaacstern
bakery:
id: "pangeo-ldeo-nsf-earthcube"
35 changes: 35 additions & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x/recipe.py
@@ -0,0 +1,35 @@
import apache_beam as beam
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)

dates = [
d.to_pydatetime().strftime("%Y%m%d")
for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
]


def make_url(time):
url_base = "https://storage.googleapis.com/pforge-test-data"
return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"


concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, concat_dim)


recipe = (
beam.Create(pattern.items())
| OpenURLWithFSSpec()
| OpenWithXarray(
file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"}
)
| StoreToZarr(
store_name="gpcp",
combine_dims=pattern.combine_dim_keys,
)
)
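Here `beam.Create(pattern.items())` seeds the pipeline with the pattern's index/URL pairs. A quick way to see what the pattern yields, assuming `pangeo-forge-recipes` is installed:

```python
# Each item is an (index, url) pair pointing at one daily netCDF file, e.g.
# https://storage.googleapis.com/pforge-test-data/gpcp/v01r03_daily_d19961001.nc
for index, url in list(pattern.items())[:2]:
    print(index, url)
```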
3 changes: 3 additions & 0 deletions tests/test-data/gpcp-from-gcs/feedstock-0.10.x/requirements.txt
@@ -0,0 +1,3 @@
gcsfs
pangeo-forge-recipes>=0.10.0,<0.11
s3fs
23 changes: 13 additions & 10 deletions tests/unit/test_bake.py
@@ -5,13 +5,16 @@
import sys
import tempfile
from importlib.metadata import distributions, version
+from pathlib import Path

import pytest
import xarray as xr
from packaging.version import parse as parse_version

from pangeo_forge_runner.commands.bake import Bake

+TEST_DATA_DIR = Path(__file__).parent.parent / "test-data"


@pytest.fixture
def recipes_uninstalled():
@@ -141,10 +144,7 @@ def test_gpcp_bake(
no_input_cache,
recipes_version_ref,
):
-if recipes_version_ref == "0.9.x-dictobj" or (
-    recipes_version_ref == "0.10.x-dictobj" and recipe_id
-):
-    # TODO: clarify fixturing story to avoid this hackiness
+if recipes_version_ref == "0.10.x-dictobj" and recipe_id:
pytest.skip(
"We only test dictobjs for recipes >0.10.0, and without recipe_id's"
)
@@ -190,11 +190,9 @@ def test_gpcp_bake(
"pangeo-forge-runner",
"bake",
"--repo",
"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipes_version_ref,
str(TEST_DATA_DIR / "gpcp-from-gcs"),
"--feedstock-subdir",
f"feedstock-{recipes_version_ref}",
"--json",
"-f",
f.name,
@@ -216,7 +214,7 @@ def test_gpcp_bake(
if custom_job_name:
assert job_name.startswith(custom_job_name)
else:
-assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")
+assert job_name.startswith("local-gpcp-2dfrom-2dgcs-feedstock-")

if "dictobj" in recipes_version_ref:
assert job_name.endswith(
@@ -240,6 +238,11 @@ def test_gpcp_bake(
else:
zarr_store_full_paths = [config["TargetStorage"]["root_path"]]

+# dictobj runs do not generate any datasets b/c they are not recipes
+# so we've asserted what we can already, just move on
+if recipes_version_ref.endswith("dictobj"):
+    return

# Open the generated datasets with xarray!
for path in zarr_store_full_paths:
print(f"Opening dataset for {path = }")
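The tail of this loop is collapsed; a minimal sketch of the kind of check it performs on each store (the engine choice and assertion are assumptions, not the test's exact code):

```python
import xarray as xr

# `path` is one entry from zarr_store_full_paths in the loop above.
ds = xr.open_dataset(path, engine="zarr")
assert "time" in ds.dims  # basic sanity check on the written dataset
```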
