From 86ce8ef1a3cad8b34c710fbf06bcb9bd733149ac Mon Sep 17 00:00:00 2001 From: ranchodeluxe Date: Wed, 29 Nov 2023 07:46:27 -0800 Subject: [PATCH 1/6] flink unique names --- pangeo_forge_runner/commands/bake.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index 34050130..f4ba1a25 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -259,6 +259,15 @@ def start(self): else: per_recipe_unique_job_name = None + # FlinkOperatorBakery job names need to be unique regardless of the number of recipes + if self.bakery_class == FlinkOperatorBakery and not per_recipe_unique_job_name: + recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5] + # character length limitations for k8s is already handled downstream + # in FlinkOperatorBakery.get_pipeline_options + per_recipe_unique_job_name = ( + self.job_name + "-" + recipe_name_hash + ) + # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes if isinstance(recipe, PTransform): From a20fc4a19a4697acb76da8d78868b6f9fd8292f2 Mon Sep 17 00:00:00 2001 From: ranchodeluxe Date: Wed, 29 Nov 2023 07:47:56 -0800 Subject: [PATCH 2/6] moar --- pangeo_forge_runner/commands/bake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index f4ba1a25..09062f14 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -259,7 +259,7 @@ def start(self): else: per_recipe_unique_job_name = None - # FlinkOperatorBakery job names need to be unique regardless of the number of recipes + # FlinkOperatorBakery job names need to be unique regardless of the number of recipes for reruns if self.bakery_class == FlinkOperatorBakery and not per_recipe_unique_job_name: recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5] # character length limitations for k8s is already handled downstream From d125710585c019dfac5ed869f3d6c1900bd217a5 Mon Sep 17 00:00:00 2001 From: ranchodeluxe Date: Wed, 29 Nov 2023 12:38:48 -0800 Subject: [PATCH 3/6] moar --- pangeo_forge_runner/commands/bake.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index 09062f14..aa8e609e 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -4,6 +4,7 @@ import hashlib import os import re +import secrets import string import time from importlib.metadata import distributions @@ -259,13 +260,14 @@ def start(self): else: per_recipe_unique_job_name = None - # FlinkOperatorBakery job names need to be unique regardless of the number of recipes for reruns + # FlinkOperatorBakery job names need to be unique regardless of the number + # of recipes to accommodate reruns and race conditions (two users running same recipe) if self.bakery_class == FlinkOperatorBakery and not per_recipe_unique_job_name: - recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5] + unique_suffix = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(4)) # character length limitations for k8s is already handled downstream # in FlinkOperatorBakery.get_pipeline_options per_recipe_unique_job_name = ( - self.job_name + "-" + recipe_name_hash + self.job_name + "-" + unique_suffix ) # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt From b8978e82ce5362a241e76d1ae7e9c8ab3787ed01 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 29 Nov 2023 20:39:22 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pangeo_forge_runner/commands/bake.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index aa8e609e..9650d4cc 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -262,13 +262,17 @@ def start(self): # FlinkOperatorBakery job names need to be unique regardless of the number # of recipes to accommodate reruns and race conditions (two users running same recipe) - if self.bakery_class == FlinkOperatorBakery and not per_recipe_unique_job_name: - unique_suffix = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(4)) + if ( + self.bakery_class == FlinkOperatorBakery + and not per_recipe_unique_job_name + ): + unique_suffix = "".join( + secrets.choice(string.ascii_letters + string.digits) + for _ in range(4) + ) # character length limitations for k8s is already handled downstream # in FlinkOperatorBakery.get_pipeline_options - per_recipe_unique_job_name = ( - self.job_name + "-" + unique_suffix - ) + per_recipe_unique_job_name = self.job_name + "-" + unique_suffix # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes From 95a848df0346bde97ca6b0aede4315c7f637a2e6 Mon Sep 17 00:00:00 2001 From: ranchodeluxe Date: Thu, 30 Nov 2023 08:06:58 -0800 Subject: [PATCH 5/6] add tests --- pangeo_forge_runner/commands/bake.py | 28 +++++++++++++++++++--------- tests/unit/test_bake.py | 25 +++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index aa8e609e..d3e9acd8 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -172,6 +172,21 @@ def autogenerate_job_name(self): return job_name + def add_unique_suffix_to_flink_jobs(self, per_recipe_unique_job_name): + """FlinkOperatorBakery job names always need to be unique + + to accommodate reruns and race conditions (two users running same recipe) + """ + if self.bakery_class == FlinkOperatorBakery: + unique_suffix = "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(4) + ) + # character length limitations for k8s is already handled downstream + # in FlinkOperatorBakery.get_pipeline_options + return self.job_name + "-" + unique_suffix + else: + return per_recipe_unique_job_name + def start(self): """ Start the baking process @@ -260,15 +275,10 @@ def start(self): else: per_recipe_unique_job_name = None - # FlinkOperatorBakery job names need to be unique regardless of the number - # of recipes to accommodate reruns and race conditions (two users running same recipe) - if self.bakery_class == FlinkOperatorBakery and not per_recipe_unique_job_name: - unique_suffix = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(4)) - # character length limitations for k8s is already handled downstream - # in FlinkOperatorBakery.get_pipeline_options - per_recipe_unique_job_name = ( - self.job_name + "-" + unique_suffix - ) + # no-op here if self.bakery_class != FlinkOperatorBakery + per_recipe_unique_job_name = self.add_unique_suffix_to_flink_jobs( + per_recipe_unique_job_name + ) # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 5281b908..e6ebc9c1 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -10,6 +10,8 @@ import xarray as xr from packaging.version import parse as parse_version +from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery +from pangeo_forge_runner.bakery.local import LocalDirectBakery from pangeo_forge_runner.commands.bake import Bake @@ -116,6 +118,29 @@ def recipes_version_ref(request): ) +@pytest.mark.parametrize( + ("job_name", "bakery_class", "expected_job_startswith"), + ( + ["recipe", FlinkOperatorBakery, "recipe-"], + ["recipe", LocalDirectBakery, "recipe"], + [None, LocalDirectBakery, None], + ), +) +def test_add_unique_suffix_to_flink_jobs( + job_name, bakery_class, expected_job_startswith +): + bake = Bake() + if bakery_class == FlinkOperatorBakery: + actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) + assert actual_job_name.startswith(expected_job_startswith) + pattern = r"^[a-zA-Z]+-[0-9a-zA-Z]{5}$" + assert bool(re.search(pattern, actual_job_name)) + else: + actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) + assert actual_job_name == actual_job_name + assert actual_job_name == expected_job_startswith + + @pytest.mark.parametrize( ("recipe_id", "expected_error", "custom_job_name", "no_input_cache"), ( From 49abe579286c61f8bbb5c4f2dd5c2aef519df033 Mon Sep 17 00:00:00 2001 From: ranchodeluxe Date: Thu, 30 Nov 2023 08:25:39 -0800 Subject: [PATCH 6/6] fix --- pangeo_forge_runner/commands/bake.py | 2 +- tests/unit/test_bake.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index d3e9acd8..c3e30b67 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -179,7 +179,7 @@ def add_unique_suffix_to_flink_jobs(self, per_recipe_unique_job_name): """ if self.bakery_class == FlinkOperatorBakery: unique_suffix = "".join( - secrets.choice(string.ascii_letters + string.digits) for _ in range(4) + secrets.choice(string.ascii_letters + string.digits) for _ in range(5) ) # character length limitations for k8s is already handled downstream # in FlinkOperatorBakery.get_pipeline_options diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index e6ebc9c1..3f31249a 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -130,6 +130,9 @@ def test_add_unique_suffix_to_flink_jobs( job_name, bakery_class, expected_job_startswith ): bake = Bake() + bake.job_name = job_name + bake.bakery_class = bakery_class + if bakery_class == FlinkOperatorBakery: actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) assert actual_job_name.startswith(expected_job_startswith) @@ -137,7 +140,7 @@ def test_add_unique_suffix_to_flink_jobs( assert bool(re.search(pattern, actual_job_name)) else: actual_job_name = bake.add_unique_suffix_to_flink_jobs(job_name) - assert actual_job_name == actual_job_name + assert job_name == actual_job_name assert actual_job_name == expected_job_startswith