Reimplement unique job names from #84 (#139)

Merged 7 commits on Nov 20, 2023
22 changes: 20 additions & 2 deletions pangeo_forge_runner/commands/bake.py
@@ -1,6 +1,7 @@
"""
Command to run a pangeo-forge recipe
"""
import hashlib
import os
import re
import string
@@ -244,6 +245,20 @@
             extra_options = {}
 
             for name, recipe in recipes.items():
+                if len(recipes) > 1:
+                    recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5]
+                    per_recipe_unique_job_name = (
+                        self.job_name[: 62 - 6] + "-" + recipe_name_hash
+                    )
+                    self.log.info(
+                        f"Deploying > 1 recipe. Modifying base {self.job_name = } for recipe "
+                        f"{name = } with {recipe_name_hash = }. Submitting job with modified "
+                        f"{per_recipe_unique_job_name = }. Note: job names must be <= 63 chars. "
+                        "If job_name was > 57 chars, it was truncated to accomodate modification."
+                    )
+                else:
+                    per_recipe_unique_job_name = None
+
                 # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt
                 # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes
                 if isinstance(recipe, PTransform):
@@ -256,7 +271,7 @@
                     )
 
                 pipeline_options = bakery.get_pipeline_options(
-                    job_name=self.job_name,
+                    job_name=(per_recipe_unique_job_name or self.job_name),
                     # FIXME: Bring this in from meta.yaml?
                     container_image=self.container_image,
                     extra_options=extra_options,
@@ -301,7 +316,10 @@
                 # Some bakeries are blocking - if Beam is configured to use them, calling
                 # pipeline.run() blocks. Some are not. We handle that here, and provide
                 # appropriate feedback to the user too.
-                extra = {"recipe": name, "job_name": self.job_name}
+                extra = {
+                    "recipe": name,
+                    "job_name": (per_recipe_unique_job_name or self.job_name),
+                }
                 if bakery.blocking:
                     self.log.info(
                         f"Running job for recipe {name}\n",
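Aside: with more than one recipe in the module, the hunks above derive a per-recipe job name by truncating the base job_name to 56 characters and appending a hyphen plus the first 5 hex characters of the sha256 of the recipe's name, which keeps the result within the 63-character limit mentioned in the log message. A minimal standalone sketch of that derivation (the base and recipe names below are made-up examples, not values from this PR):

import hashlib


def per_recipe_unique_job_name(base_job_name: str, recipe_name: str) -> str:
    # First 5 hex characters of the recipe name's sha256, as in bake.py above
    recipe_name_hash = hashlib.sha256(recipe_name.encode()).hexdigest()[:5]
    # Truncate the base to 56 characters so "<base>-<hash>" is at most 62 characters
    return base_job_name[: 62 - 6] + "-" + recipe_name_hash


# Hypothetical values, for illustration only
name = per_recipe_unique_job_name("gh-pforgetest-gpcp-from-gcs-" + "x" * 40, "gpcp-dict-key-0")
assert len(name) <= 63 and len(name.rsplit("-", 1)[-1]) == 5
print(name)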
@@ -0,0 +1,5 @@
+def some_callable(some_argument):
+    pass
+
+
+recipes = {"a": some_callable(), "b": some_callable()}
@@ -0,0 +1,4 @@
+# Parameters to be passed to RecipeRewriter constructor
+params = dict(
+    prune=False, callable_args_injections={"some_callable": {"some_argument": 42}}
+)
16 changes: 16 additions & 0 deletions tests/rewriter-tests/callable-args-injection-dictobj/rewritten.py
@@ -0,0 +1,16 @@
+def some_callable(some_argument):
+    pass
+
+
+recipes = {
+    "a": some_callable(
+        some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get(  # noqa
+            "some_argument"
+        )
+    ),
+    "b": some_callable(
+        some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get(  # noqa
+            "some_argument"
+        )
+    ),
+}
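Aside: the rewritten module above expects a _CALLABLE_ARGS_INJECTIONS mapping to be present when the recipe module is executed, and each rewritten call site looks its injected keyword argument up from that mapping. A minimal self-contained sketch of that lookup, with the mapping defined inline purely for illustration (its value mirrors the callable_args_injections parameter shown above; in the runner it is not defined by hand like this):

# Inline stand-in for the injections mapping, for illustration only
_CALLABLE_ARGS_INJECTIONS = {"some_callable": {"some_argument": 42}}


def some_callable(some_argument):
    return some_argument


# Same lookup pattern as the rewritten call sites above
recipes = {
    "a": some_callable(
        some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get(
            "some_argument"
        )
    ),
}
assert recipes["a"] == 42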
100 changes: 67 additions & 33 deletions tests/unit/test_bake.py
@@ -1,3 +1,4 @@
+import hashlib
 import json
 import re
 import subprocess
@@ -101,6 +102,20 @@ def test_container_name_validation(container_image, raises):
         assert bake.container_image == container_image
 
 
+@pytest.fixture(params=["recipe_object", "dict_object"])
+def recipes_version_ref(request):
+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipes_version_ref = "0.10.x"
+    else:
+        recipes_version_ref = "0.9.x"
+    return (
+        recipes_version_ref
+        if not request.param == "dict_object"
+        else f"{recipes_version_ref}-dictobj"
+    )
+
+
 @pytest.mark.parametrize(
     ("recipe_id", "expected_error", "custom_job_name"),
     (
@@ -114,7 +129,17 @@ def test_container_name_validation(container_image, raises):
         [None, None, "special-name-for-job"],
     ),
 )
-def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
+def test_gpcp_bake(
+    minio, recipe_id, expected_error, custom_job_name, recipes_version_ref
+):
+    if recipes_version_ref == "0.9.x-dictobj" or (
+        recipes_version_ref == "0.10.x-dictobj" and recipe_id
+    ):
+        # TODO: clarify fixturing story to avoid this hackiness
+        pytest.skip(
+            "We only test dictobjs for recipes >0.10.0, and without recipe_id's"
+        )
+
     fsspec_args = {
         "key": minio["username"],
         "secret": minio["password"],
@@ -148,12 +173,6 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
     if custom_job_name:
         config["Bake"].update({"job_name": custom_job_name})
 
-    pfr_version = parse_version(version("pangeo-forge-recipes"))
-    if pfr_version >= parse_version("0.10"):
-        recipe_version_ref = "0.10.x"
-    else:
-        recipe_version_ref = "0.9.x"
-
     with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
         json.dump(config, f)
         f.flush()
@@ -165,7 +184,7 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipe_version_ref,
recipes_version_ref,
"--json",
"-f",
f.name,
@@ -180,40 +199,55 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
     else:
         assert proc.returncode == 0
 
-    for line in stdout:
-        if "Running job for recipe gpcp" in line:
-            job_name = json.loads(line)["job_name"]
+    job_name_logs = [
+        json.loads(line) for line in stdout if "Running job for recipe " in line
+    ]
+    job_names = {line["recipe"]: line["job_name"] for line in job_name_logs}
+    for recipe_name, job_name in job_names.items():
+        if custom_job_name:
+            assert job_name.startswith(custom_job_name)
+        else:
+            assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")
 
-    if custom_job_name:
-        assert job_name == custom_job_name
-    else:
-        assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-")
+        if "dictobj" in recipes_version_ref:
+            assert job_name.endswith(
+                hashlib.sha256(recipe_name.encode()).hexdigest()[:5]
+            )
 
     # In pangeo-forge-recipes>=0.10.0, the actual zarr store is produced in a
     # *subpath* of target_storage.rootpath, rather than in the
    # root path itself. This is a compatibility break vs the previous
     # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495
     # has more information
 
-    if pfr_version >= parse_version("0.10"):
-        zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/"
+    if recipes_version_ref == "0.10.x":
+        zarr_store_full_paths = [config["TargetStorage"]["root_path"] + "gpcp/"]
+    elif recipes_version_ref == "0.10.x-dictobj":
+        zarr_store_root_path = config["TargetStorage"]["root_path"]
+        zarr_store_full_paths = [
+            zarr_store_root_path + store_name
+            for store_name in ["gpcp-dict-key-0", "gpcp-dict-key-1"]
+        ]
     else:
-        zarr_store_path = config["TargetStorage"]["root_path"]
-    # Open the generated dataset with xarray!
-    gpcp = xr.open_dataset(
-        # We specify a store_name of "gpcp" in the test recipe
-        zarr_store_path,
-        backend_kwargs={"storage_options": fsspec_args},
-        engine="zarr",
-    )
-
-    assert (
-        gpcp.title
-        == "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3"
-    )
-    # --prune prunes to two time steps by default, so we expect 2 items here
-    assert len(gpcp.precip) == 2
-    print(gpcp)
+        zarr_store_full_paths = [config["TargetStorage"]["root_path"]]
+
+    # Open the generated datasets with xarray!
+    for path in zarr_store_full_paths:
+        print(f"Opening dataset for {path = }")
+        ds = xr.open_dataset(
+            # We specify a store_name of "gpcp" in the test recipe
+            path,
+            backend_kwargs={"storage_options": fsspec_args},
+            engine="zarr",
+        )
+
+        assert (
+            ds.title
+            == "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3"
+        )
+        # --prune prunes to two time steps by default, so we expect 2 items here
+        assert len(ds.precip) == 2
+        print(ds)
 
     # `mc` isn't the best way, but we want to display all the files in our minio
     with tempfile.TemporaryDirectory() as mcd: