diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index c2024d3b..55fd2760 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -5,6 +5,7 @@ import hashlib import os import re +import secrets import string import time from importlib.metadata import distributions @@ -168,6 +169,21 @@ def autogenerate_job_name(self): return job_name + def add_unique_suffix_to_flink_jobs(self, per_recipe_unique_job_name): + """FlinkOperatorBakery job names always need to be unique + + to accommodate reruns and race conditions (two users running same recipe) + """ + if self.bakery_class == FlinkOperatorBakery: + unique_suffix = "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(5) + ) + # character length limitations for k8s is already handled downstream + # in FlinkOperatorBakery.get_pipeline_options + return self.job_name + "-" + unique_suffix + else: + return per_recipe_unique_job_name + def start(self): """ Start the baking process @@ -252,6 +268,11 @@ def start(self): else: per_recipe_unique_job_name = None + # no-op here if self.bakery_class != FlinkOperatorBakery + per_recipe_unique_job_name = self.add_unique_suffix_to_flink_jobs( + per_recipe_unique_job_name + ) + requirements_path = feedstock.feedstock_dir / "requirements.txt" if requirements_path.exists(): extra_options["requirements_file"] = str(requirements_path) diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 429d2974..42d66cfc 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -11,6 +11,8 @@ import xarray as xr from packaging.version import parse as parse_version +from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery +from pangeo_forge_runner.bakery.local import LocalDirectBakery from pangeo_forge_runner.commands.bake import Bake TEST_DATA_DIR = Path(__file__).parent.parent / "test-data" @@ -121,6 +123,32 @@ def recipes_version_ref(request): ) +@pytest.mark.parametrize( + ("job_name", "bakery_class", "expected_job_startswith"), + ( + ["recipe", FlinkOperatorBakery, "recipe-"], + ["recipe", LocalDirectBakery, "recipe"], + [None, LocalDirectBakery, None], + ), +) +def test_add_unique_suffix_to_flink_jobs( + job_name, bakery_class, expected_job_startswith +): + bake = Bake() + bake.job_name = job_name + bake.bakery_class = bakery_class + + if bakery_class == FlinkOperatorBakery: + actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) + assert actual_job_name.startswith(expected_job_startswith) + pattern = r"^[a-zA-Z]+-[0-9a-zA-Z]{5}$" + assert bool(re.search(pattern, actual_job_name)) + else: + actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) + assert job_name == actual_job_name + assert actual_job_name == expected_job_startswith + + @pytest.mark.parametrize( ("recipe_id", "expected_error", "custom_job_name", "no_input_cache"), (