diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index f9638f9b..47b32383 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -1,6 +1,7 @@ """ Command to run a pangeo-forge recipe """ +import hashlib import os import re import string @@ -244,6 +245,20 @@ def start(self): extra_options = {} for name, recipe in recipes.items(): + if len(recipes) > 1: + recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5] + per_recipe_unique_job_name = ( + self.job_name[: 62 - 6] + "-" + recipe_name_hash + ) + self.log.info( + f"Deploying > 1 recipe. Modifying base {self.job_name = } for recipe " + f"{name = } with {recipe_name_hash = }. Submitting job with modified " + f"{per_recipe_unique_job_name = }. Note: job names must be <= 63 chars. " + "If job_name was > 57 chars, it was truncated to accomodate modification." + ) + else: + per_recipe_unique_job_name = None + # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes if isinstance(recipe, PTransform): @@ -256,7 +271,7 @@ def start(self): ) pipeline_options = bakery.get_pipeline_options( - job_name=self.job_name, + job_name=(per_recipe_unique_job_name or self.job_name), # FIXME: Bring this in from meta.yaml? container_image=self.container_image, extra_options=extra_options, @@ -301,7 +316,10 @@ def start(self): # Some bakeries are blocking - if Beam is configured to use them, calling # pipeline.run() blocks. Some are not. We handle that here, and provide # appropriate feedback to the user too. - extra = {"recipe": name, "job_name": self.job_name} + extra = { + "recipe": name, + "job_name": (per_recipe_unique_job_name or self.job_name), + } if bakery.blocking: self.log.info( f"Running job for recipe {name}\n", diff --git a/tests/rewriter-tests/callable-args-injection-dictobj/original.py b/tests/rewriter-tests/callable-args-injection-dictobj/original.py new file mode 100644 index 00000000..0eea8a93 --- /dev/null +++ b/tests/rewriter-tests/callable-args-injection-dictobj/original.py @@ -0,0 +1,5 @@ +def some_callable(some_argument): + pass + + +recipes = {"a": some_callable(), "b": some_callable()} diff --git a/tests/rewriter-tests/callable-args-injection-dictobj/params.py b/tests/rewriter-tests/callable-args-injection-dictobj/params.py new file mode 100644 index 00000000..8b50eacc --- /dev/null +++ b/tests/rewriter-tests/callable-args-injection-dictobj/params.py @@ -0,0 +1,4 @@ +# Parameters to be passed to RecipeRewriter constructor +params = dict( + prune=False, callable_args_injections={"some_callable": {"some_argument": 42}} +) diff --git a/tests/rewriter-tests/callable-args-injection-dictobj/rewritten.py b/tests/rewriter-tests/callable-args-injection-dictobj/rewritten.py new file mode 100644 index 00000000..8b0f7e5e --- /dev/null +++ b/tests/rewriter-tests/callable-args-injection-dictobj/rewritten.py @@ -0,0 +1,16 @@ +def some_callable(some_argument): + pass + + +recipes = { + "a": some_callable( + some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get( # noqa + "some_argument" + ) + ), + "b": some_callable( + some_argument=_CALLABLE_ARGS_INJECTIONS.get("some_callable", {}).get( # noqa + "some_argument" + ) + ), +} diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 9ea12079..464c6522 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -1,3 +1,4 @@ +import hashlib import json import re import subprocess @@ -101,6 +102,20 @@ def test_container_name_validation(container_image, raises): assert bake.container_image == container_image +@pytest.fixture(params=["recipe_object", "dict_object"]) +def recipes_version_ref(request): + pfr_version = parse_version(version("pangeo-forge-recipes")) + if pfr_version >= parse_version("0.10"): + recipes_version_ref = "0.10.x" + else: + recipes_version_ref = "0.9.x" + return ( + recipes_version_ref + if not request.param == "dict_object" + else f"{recipes_version_ref}-dictobj" + ) + + @pytest.mark.parametrize( ("recipe_id", "expected_error", "custom_job_name"), ( @@ -114,7 +129,17 @@ def test_container_name_validation(container_image, raises): [None, None, "special-name-for-job"], ), ) -def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): +def test_gpcp_bake( + minio, recipe_id, expected_error, custom_job_name, recipes_version_ref +): + if recipes_version_ref == "0.9.x-dictobj" or ( + recipes_version_ref == "0.10.x-dictobj" and recipe_id + ): + # TODO: clarify fixturing story to avoid this hackiness + pytest.skip( + "We only test dictobjs for recipes >0.10.0, and without recipe_id's" + ) + fsspec_args = { "key": minio["username"], "secret": minio["password"], @@ -148,12 +173,6 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): if custom_job_name: config["Bake"].update({"job_name": custom_job_name}) - pfr_version = parse_version(version("pangeo-forge-recipes")) - if pfr_version >= parse_version("0.10"): - recipe_version_ref = "0.10.x" - else: - recipe_version_ref = "0.9.x" - with tempfile.NamedTemporaryFile("w", suffix=".json") as f: json.dump(config, f) f.flush() @@ -165,7 +184,7 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): "--ref", # in the test feedstock, tags are named for the recipes version # which was used to write the recipe module - recipe_version_ref, + recipes_version_ref, "--json", "-f", f.name, @@ -180,14 +199,20 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): else: assert proc.returncode == 0 - for line in stdout: - if "Running job for recipe gpcp" in line: - job_name = json.loads(line)["job_name"] + job_name_logs = [ + json.loads(line) for line in stdout if "Running job for recipe " in line + ] + job_names = {line["recipe"]: line["job_name"] for line in job_name_logs} + for recipe_name, job_name in job_names.items(): + if custom_job_name: + assert job_name.startswith(custom_job_name) + else: + assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-") - if custom_job_name: - assert job_name == custom_job_name - else: - assert job_name.startswith("gh-pforgetest-gpcp-from-gcs-") + if "dictobj" in recipes_version_ref: + assert job_name.endswith( + hashlib.sha256(recipe_name.encode()).hexdigest()[:5] + ) # In pangeo-forge-recipes>=0.10.0, the actual zarr store is produced in a # *subpath* of target_storage.rootpath, rather than in the @@ -195,25 +220,34 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495 # has more information - if pfr_version >= parse_version("0.10"): - zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/" + if recipes_version_ref == "0.10.x": + zarr_store_full_paths = [config["TargetStorage"]["root_path"] + "gpcp/"] + elif recipes_version_ref == "0.10.x-dictobj": + zarr_store_root_path = config["TargetStorage"]["root_path"] + zarr_store_full_paths = [ + zarr_store_root_path + store_name + for store_name in ["gpcp-dict-key-0", "gpcp-dict-key-1"] + ] else: - zarr_store_path = config["TargetStorage"]["root_path"] - # Open the generated dataset with xarray! - gpcp = xr.open_dataset( - # We specify a store_name of "gpcp" in the test recipe - zarr_store_path, - backend_kwargs={"storage_options": fsspec_args}, - engine="zarr", - ) - - assert ( - gpcp.title - == "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3" - ) - # --prune prunes to two time steps by default, so we expect 2 items here - assert len(gpcp.precip) == 2 - print(gpcp) + zarr_store_full_paths = [config["TargetStorage"]["root_path"]] + + # Open the generated datasets with xarray! + for path in zarr_store_full_paths: + print(f"Opening dataset for {path = }") + ds = xr.open_dataset( + # We specify a store_name of "gpcp" in the test recipe + path, + backend_kwargs={"storage_options": fsspec_args}, + engine="zarr", + ) + + assert ( + ds.title + == "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3" + ) + # --prune prunes to two time steps by default, so we expect 2 items here + assert len(ds.precip) == 2 + print(ds) # `mc` isn't the best way, but we want to display all the files in our minio with tempfile.TemporaryDirectory() as mcd: