Skip to content

Commit

Permalink
Merge pull request #139 from pangeo-forge/dict-obj-unique-names-2
Browse files Browse the repository at this point in the history
Reimplement unique job names from #84
  • Loading branch information
cisaacstern authored Nov 20, 2023
2 parents 90479ab + 16a821a commit 6cba6bc
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 35 deletions.
22 changes: 20 additions & 2 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Command to run a pangeo-forge recipe
"""
import hashlib
import os
import re
import string
Expand Down Expand Up @@ -244,6 +245,20 @@ def start(self):
extra_options = {}

for name, recipe in recipes.items():
if len(recipes) > 1:
recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5]
per_recipe_unique_job_name = (
self.job_name[: 62 - 6] + "-" + recipe_name_hash
)
self.log.info(
f"Deploying > 1 recipe. Modifying base {self.job_name = } for recipe "
f"{name = } with {recipe_name_hash = }. Submitting job with modified "
f"{per_recipe_unique_job_name = }. Note: job names must be <= 63 chars. "
"If job_name was > 57 chars, it was truncated to accomodate modification."
)
else:
per_recipe_unique_job_name = None

# if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt
# file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes
if isinstance(recipe, PTransform):
Expand All @@ -256,7 +271,7 @@ def start(self):
)

pipeline_options = bakery.get_pipeline_options(
job_name=self.job_name,
job_name=(per_recipe_unique_job_name or self.job_name),
# FIXME: Bring this in from meta.yaml?
container_image=self.container_image,
extra_options=extra_options,
Expand Down Expand Up @@ -301,7 +316,10 @@ def start(self):
# Some bakeries are blocking - if Beam is configured to use them, calling
# pipeline.run() blocks. Some are not. We handle that here, and provide
# appropriate feedback to the user too.
extra = {"recipe": name, "job_name": self.job_name}
extra = {
"recipe": name,
"job_name": (per_recipe_unique_job_name or self.job_name),
}
if bakery.blocking:
self.log.info(
f"Running job for recipe {name}\n",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def some_callable(some_argument):
pass


recipes = {"a": some_callable(), "b": some_callable()}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Parameters to be passed to RecipeRewriter constructor
params = dict(
prune=False, callable_args_injections={"some_callable": {"some_argument": 42}}
)
16 changes: 16 additions & 0 deletions tests/rewriter-tests/callable-args-injection-dictobj/rewritten.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def some_callable(some_argument):
pass


recipes = {
"a": some_callable(
some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get( # noqa
"some_argument"
)
),
"b": some_callable(
some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get( # noqa
"some_argument"
)
),
}
100 changes: 67 additions & 33 deletions tests/unit/test_bake.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
import json
import re
import subprocess
Expand Down Expand Up @@ -101,6 +102,20 @@ def test_container_name_validation(container_image, raises):
assert bake.container_image == container_image


@pytest.fixture(params=["recipe_object", "dict_object"])
def recipes_version_ref(request):
pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
recipes_version_ref = "0.10.x"
else:
recipes_version_ref = "0.9.x"
return (
recipes_version_ref
if not request.param == "dict_object"
else f"{recipes_version_ref}-dictobj"
)


@pytest.mark.parametrize(
("recipe_id", "expected_error", "custom_job_name"),
(
Expand All @@ -114,7 +129,17 @@ def test_container_name_validation(container_image, raises):
[None, None, "special-name-for-job"],
),
)
def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
def test_gpcp_bake(
minio, recipe_id, expected_error, custom_job_name, recipes_version_ref
):
if recipes_version_ref == "0.9.x-dictobj" or (
recipes_version_ref == "0.10.x-dictobj" and recipe_id
):
# TODO: clarify fixturing story to avoid this hackiness
pytest.skip(
"We only test dictobjs for recipes >0.10.0, and without recipe_id's"
)

fsspec_args = {
"key": minio["username"],
"secret": minio["password"],
Expand Down Expand Up @@ -148,12 +173,6 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
if custom_job_name:
config["Bake"].update({"job_name": custom_job_name})

pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
recipe_version_ref = "0.10.x"
else:
recipe_version_ref = "0.9.x"

with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
json.dump(config, f)
f.flush()
Expand All @@ -165,7 +184,7 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipe_version_ref,
recipes_version_ref,
"--json",
"-f",
f.name,
Expand All @@ -180,40 +199,55 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
else:
assert proc.returncode == 0

for line in stdout:
if "Running job for recipe gpcp" in line:
job_name = json.loads(line)["job_name"]
job_name_logs = [
json.loads(line) for line in stdout if "Running job for recipe " in line
]
job_names = {line["recipe"]: line["job_name"] for line in job_name_logs}
for recipe_name, job_name in job_names.items():
if custom_job_name:
assert job_name.startswith(custom_job_name)
else:
assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")

if custom_job_name:
assert job_name == custom_job_name
else:
assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")
if "dictobj" in recipes_version_ref:
assert job_name.endswith(
hashlib.sha256(recipe_name.encode()).hexdigest()[:5]
)

# In pangeo-forge-recipes>=0.10.0, the actual zarr store is produced in a
# *subpath* of target_storage.rootpath, rather than in the
# root path itself. This is a compatibility break vs the previous
# versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495
# has more information

if pfr_version >= parse_version("0.10"):
zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/"
if recipes_version_ref == "0.10.x":
zarr_store_full_paths = [config["TargetStorage"]["root_path"] + "gpcp/"]
elif recipes_version_ref == "0.10.x-dictobj":
zarr_store_root_path = config["TargetStorage"]["root_path"]
zarr_store_full_paths = [
zarr_store_root_path + store_name
for store_name in ["gpcp-dict-key-0", "gpcp-dict-key-1"]
]
else:
zarr_store_path = config["TargetStorage"]["root_path"]
# Open the generated dataset with xarray!
gpcp = xr.open_dataset(
# We specify a store_name of "gpcp" in the test recipe
zarr_store_path,
backend_kwargs={"storage_options": fsspec_args},
engine="zarr",
)

assert (
gpcp.title
== "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3"
)
# --prune prunes to two time steps by default, so we expect 2 items here
assert len(gpcp.precip) == 2
print(gpcp)
zarr_store_full_paths = [config["TargetStorage"]["root_path"]]

# Open the generated datasets with xarray!
for path in zarr_store_full_paths:
print(f"Opening dataset for {path = }")
ds = xr.open_dataset(
# We specify a store_name of "gpcp" in the test recipe
path,
backend_kwargs={"storage_options": fsspec_args},
engine="zarr",
)

assert (
ds.title
== "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3"
)
# --prune prunes to two time steps by default, so we expect 2 items here
assert len(ds.precip) == 2
print(ds)

# `mc` isn't the best way, but we want to display all the files in our minio
with tempfile.TemporaryDirectory() as mcd:
Expand Down

0 comments on commit 6cba6bc

Please sign in to comment.