diff --git a/docs/tutorial/flink.md b/docs/tutorial/flink.md index 9d7cc4cf..1b644497 100644 --- a/docs/tutorial/flink.md +++ b/docs/tutorial/flink.md @@ -74,7 +74,7 @@ we must answer: ### Where Data is Discovered Recipe input is defined in the recipe itself. Most recipes will define a `pangeo_forge_recipes.patterns.FilePattern` that provides the pipeline with input file locations. -The example below taken from the [integration test recipe](https://github.com/pforgetest/gpcp-from-gcs-feedstock/blob/main/feedstock/recipe.py) +The example below is taken from [this example recipe](https://github.com/pforgetest/gpcp-from-gcs-feedstock/blob/main/feedstock/recipe.py) ```python import apache_beam as beam diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index 942a2b2a..c2024d3b 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -11,7 +11,7 @@ from pathlib import Path import escapism -from apache_beam import Pipeline +from apache_beam import Pipeline, PTransform from traitlets import Bool, Type, Unicode, validate from .. import Feedstock @@ -266,10 +266,13 @@ def start(self): # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline # for pipeline options - we have traitlets doing that for us. pipeline = Pipeline(options=pipeline_options, argv=[]) + # Chain our recipe to the pipeline. This mutates the `pipeline` object! - # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam' - # method that returns a transform. - pipeline | recipe + # We expect `recipe` to be 1) a beam PTransform or 2) a string that leverages the + # `dict_object:` syntax; see `tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml` + # as an example + if isinstance(recipe, PTransform): + pipeline | recipe # Some bakeries are blocking - if Beam is configured to use them, calling # pipeline.run() blocks. Some are not. 
We handle that here, and provide diff --git a/tests/integration/flink/test_flink_integration.py b/tests/integration/flink/test_flink_integration.py index 6f1d9260..dcf2e22a 100644 --- a/tests/integration/flink/test_flink_integration.py +++ b/tests/integration/flink/test_flink_integration.py @@ -3,10 +3,13 @@ import tempfile import time from importlib.metadata import version +from pathlib import Path import xarray as xr from packaging.version import parse as parse_version +TEST_DATA_DIR = Path(__file__).parent.parent.parent / "test-data" + def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion): fsspec_args = { @@ -17,7 +20,7 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion): pfr_version = parse_version(version("pangeo-forge-recipes")) if pfr_version >= parse_version("0.10"): - recipe_version_ref = str(pfr_version) + recipe_version_ref = "0.10.x" bucket = "s3://gpcp-out" config = { @@ -59,11 +62,9 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion): "pangeo-forge-runner", "bake", "--repo", - "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git", - "--ref", - # in the test feedstock, tags are named for - # the recipe version used to write the recipe module - recipe_version_ref, + str(TEST_DATA_DIR / "gpcp-from-gcs"), + "--feedstock-subdir", + f"feedstock-{recipe_version_ref}", "-f", f.name, ] diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py index 2e56c285..aa8baaeb 100644 --- a/tests/integration/test_dataflow_integration.py +++ b/tests/integration/test_dataflow_integration.py @@ -1,18 +1,25 @@ import json import subprocess +import sys import tempfile import time from importlib.metadata import version +from pathlib import Path import pytest import xarray as xr from packaging.version import parse as parse_version +TEST_DATA_DIR = Path(__file__).parent.parent / "test-data" + def test_dataflow_integration(): + python_version = ( 
+ f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + ) pfr_version = parse_version(version("pangeo-forge-recipes")) if pfr_version >= parse_version("0.10"): - recipe_version_ref = str(pfr_version) + recipe_version_ref = "0.10.x" else: raise ValueError( f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer." @@ -22,6 +29,7 @@ def test_dataflow_integration(): "Bake": { "prune": True, "bakery_class": "pangeo_forge_runner.bakery.dataflow.DataflowBakery", + "job_name": f"gpcp-from-gcs-py{python_version.replace('.','')}-v{''.join([str(i) for i in pfr_version.release])}", }, "DataflowBakery": {"temp_gcs_location": bucket + "/temp"}, "TargetStorage": { @@ -41,11 +49,9 @@ def test_dataflow_integration(): "pangeo-forge-runner", "bake", "--repo", - "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git", - "--ref", - # in the test feedstock, tags are named for the recipes version - # which was used to write the recipe module - recipe_version_ref, + str(TEST_DATA_DIR / "gpcp-from-gcs"), + "--feedstock-subdir", + f"feedstock-{recipe_version_ref}", "--json", "-f", f.name, @@ -70,6 +76,15 @@ def test_dataflow_integration(): # okay, time to start checking if the job is done show_job = f"gcloud dataflow jobs show {job_id} --format=json".split() + show_job_errors = [ + "gcloud", + "logging", + "read", + f'resource.type="dataflow_step" AND resource.labels.job_id="{job_id}" AND severity>=ERROR', + "--limit", + "500", + "--format=json", + ] while True: elapsed = time.time() - start print(f"Time {elapsed = }") @@ -93,6 +108,10 @@ def test_dataflow_integration(): # still running, let's give it another 30s then check again time.sleep(30) else: + # try to get some output to the stdout so we don't have to log into the GCP console + state_proc = subprocess.run(show_job_errors, capture_output=True) + assert state_proc.returncode == 0 + print(json.loads(state_proc.stdout)) # consider any other state a failure pytest.fail(f"{state = } is 
neither 'Done' nor 'Running'") diff --git a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml new file mode 100644 index 00000000..e99761e2 --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/meta.yaml @@ -0,0 +1,3 @@ +title: "Test meta.yaml with list of recipes" +recipes: + - dict_object: 'recipe:recipes' diff --git a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/recipe.py b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/recipe.py new file mode 100644 index 00000000..63f27dac --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/recipe.py @@ -0,0 +1 @@ +recipes = {"test_1": "test_1", "test_2": "test_2"} diff --git a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/requirements.txt b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/requirements.txt new file mode 100644 index 00000000..54cf4171 --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x-dictobj/requirements.txt @@ -0,0 +1 @@ +pangeo-forge-recipes>=0.10.0,<0.11 diff --git a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/meta.yaml b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/meta.yaml new file mode 100644 index 00000000..8dc6847c --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/meta.yaml @@ -0,0 +1,31 @@ +title: "Global Precipitation Climatology Project" +description: > + Global Precipitation Climatology Project (GPCP) Daily Version 1.3 gridded, merged + satellite/gauge precipitation Climate data Record (CDR) from 1996 to present. 
+pangeo_forge_version: "0.9.0" +pangeo_notebook_version: "2022.06.02" +recipes: + - id: gpcp-from-gcs + object: "recipe:recipe" +provenance: + providers: + - name: "NOAA NCEI" + description: "National Oceanographic & Atmospheric Administration National Centers for Environmental Information" + roles: + - host + - licensor + url: https://www.ncei.noaa.gov/products/global-precipitation-climatology-project + - name: "University of Maryland" + description: > + University of Maryland College Park Earth System Science Interdisciplinary Center + (ESSIC) and Cooperative Institute for Climate and Satellites (CICS). + roles: + - producer + url: http://gpcp.umd.edu/ + license: "No constraints on data access or use." +maintainers: + - name: "Charles Stern" + orcid: "0000-0002-4078-0852" + github: cisaacstern +bakery: + id: "pangeo-ldeo-nsf-earthcube" diff --git a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/recipe.py b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/recipe.py new file mode 100644 index 00000000..48baa13e --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/recipe.py @@ -0,0 +1,35 @@ +import apache_beam as beam +import pandas as pd +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +from pangeo_forge_recipes.transforms import ( + OpenURLWithFSSpec, + OpenWithXarray, + StoreToZarr, +) + +dates = [ + d.to_pydatetime().strftime("%Y%m%d") + for d in pd.date_range("1996-10-01", "1999-02-01", freq="D") +] + + +def make_url(time): + url_base = "https://storage.googleapis.com/pforge-test-data" + return f"{url_base}/gpcp/v01r03_daily_d{time}.nc" + + +concat_dim = ConcatDim("time", dates, nitems_per_file=1) +pattern = FilePattern(make_url, concat_dim) + + +recipe = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec() + | OpenWithXarray( + file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"} + ) + | StoreToZarr( + store_name="gpcp", + combine_dims=pattern.combine_dim_keys, + ) +) diff --git 
a/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/requirements.txt b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/requirements.txt new file mode 100644 index 00000000..ef09cace --- /dev/null +++ b/tests/test-data/gpcp-from-gcs/feedstock-0.10.x/requirements.txt @@ -0,0 +1,3 @@ +gcsfs +pangeo-forge-recipes>=0.10.0,<0.11 +s3fs diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index f10fcb41..429d2974 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -5,6 +5,7 @@ import sys import tempfile from importlib.metadata import distributions, version +from pathlib import Path import pytest import xarray as xr @@ -12,6 +13,8 @@ from pangeo_forge_runner.commands.bake import Bake +TEST_DATA_DIR = Path(__file__).parent.parent / "test-data" + @pytest.fixture def recipes_uninstalled(): @@ -141,10 +144,7 @@ def test_gpcp_bake( no_input_cache, recipes_version_ref, ): - if recipes_version_ref == "0.9.x-dictobj" or ( - recipes_version_ref == "0.10.x-dictobj" and recipe_id - ): - # TODO: clarify fixturing story to avoid this hackiness + if recipes_version_ref == "0.10.x-dictobj" and recipe_id: pytest.skip( "We only test dictobjs for recipes >0.10.0, and without recipe_id's" ) @@ -190,11 +190,9 @@ def test_gpcp_bake( "pangeo-forge-runner", "bake", "--repo", - "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git", - "--ref", - # in the test feedstock, tags are named for the recipes version - # which was used to write the recipe module - recipes_version_ref, + str(TEST_DATA_DIR / "gpcp-from-gcs"), + "--feedstock-subdir", + f"feedstock-{recipes_version_ref}", "--json", "-f", f.name, @@ -216,7 +214,7 @@ def test_gpcp_bake( if custom_job_name: assert job_name.startswith(custom_job_name) else: - assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-") + assert job_name.startswith("local-gpcp-2dfrom-2dgcs-feedstock-") if "dictobj" in recipes_version_ref: assert job_name.endswith( @@ -240,6 +238,11 @@ def test_gpcp_bake( else: zarr_store_full_paths = 
[config["TargetStorage"]["root_path"]] + # dictobj runs do not generate any datasets b/c they are not recipes + # so we've asserted what we can already, just move on + if recipes_version_ref.endswith("dictobj"): + return + # Open the generated datasets with xarray! for path in zarr_store_full_paths: print(f"Opening dataset for {path = }")