From 916a851c03afa9f287181d9b5059d0ee0c2fd7cb Mon Sep 17 00:00:00 2001
From: YuviPanda
Date: Sun, 19 Feb 2023 16:36:26 -0800
Subject: [PATCH 01/12] Fix automatic job name detection for local runs

Without this, the generated job names were not valid Dataflow job names
---
 pangeo_forge_runner/commands/bake.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py
index f48ac946..c32e0944 100644
--- a/pangeo_forge_runner/commands/bake.py
+++ b/pangeo_forge_runner/commands/bake.py
@@ -3,9 +3,11 @@
 """
 import os
 import re
+import string
 import time
 from pathlib import Path

+import escapism
 from apache_beam import Pipeline, PTransform
 from traitlets import Bool, Type, Unicode, validate

@@ -113,8 +115,19 @@ def autogenerate_job_name(self):
         Autogenerate a readable job_name
         """
         # special case local checkouts, as no contentprovider is used
+        safe_chars = string.ascii_lowercase + string.digits
         if os.path.exists(self.repo):
-            return f"local-{os.path.basename(self.repo)}"
+            name = "local-"
+            name += escapism.escape(
+                os.path.basename(os.path.abspath(self.repo)),
+                safe=safe_chars,
+                escape_char="-",
+            )
+            if self.feedstock_subdir != "feedstock":
+                name += "-" + escapism.escape(
+                    self.feedstock_subdir, safe=safe_chars, escape_char="-"
+                )
+            return name.lower()

         # special-case github because it is so common
         if self.repo.startswith("https://github.com/"):

From 3cd642c078e3da985a7e82745cb4c8b41076a47f Mon Sep 17 00:00:00 2001
From: YuviPanda
Date: Fri, 24 Feb 2023 17:57:41 -0800
Subject: [PATCH 02/12] Support setting arbitrary logging config

---
 pangeo_forge_runner/commands/base.py | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/pangeo_forge_runner/commands/base.py b/pangeo_forge_runner/commands/base.py
index 63e8d492..30c29593 100644
--- a/pangeo_forge_runner/commands/base.py
+++ b/pangeo_forge_runner/commands/base.py
@@ -7,7 +7,7 @@
 from pythonjsonlogger import jsonlogger
 from repo2docker import contentproviders
-from traitlets import Bool, Instance, List, Unicode
+from traitlets import Bool, Dict, Instance, List, Unicode
 from traitlets.config import Application

 # Common aliases we want to support in *all* commands

@@ -41,6 +41,21 @@ class BaseCommand(Application):

     log_level = logging.INFO

+    logging_config = Dict(
+        {},
+        config=True,
+        help="""
+        Logging configuration for this Python application.
+
+        When set, this value is passed to logging.config.dictConfig,
+        and can be used to configure how logs *throughout the application*
+        are handled, not just logs from this application alone.
+
+        See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig
+        for more details.
+        """,
+    )
+
     repo = Unicode(
         "",
         config=True,
@@ -199,6 +214,13 @@ def initialize(self, argv=None):

         # Load traitlets config from a config file if present
         self.load_config_file(self.config_file)

+        # Allow arbitrary logging config if set
+        # We do this first up so any custom logging we set up ourselves
+        # is not affected, as by default dictConfig will replace all
+        # existing config.
+        if self.logging_config:
+            logging.config.dictConfig(self.logging_config)
+
         # The application communicates with the outside world via
         # stdout, and we structure this communication via logging.
# So let's setup the default logger to log to stdout, rather From df167859dcd606eef04330c56c71a28dae72f0d2 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Thu, 3 Aug 2023 14:00:48 -0700 Subject: [PATCH 03/12] add flink parallelism args --- pangeo_forge_runner/bakery/flink.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py index adde2108..1d4c032c 100644 --- a/pangeo_forge_runner/bakery/flink.py +++ b/pangeo_forge_runner/bakery/flink.py @@ -10,7 +10,7 @@ import escapism from apache_beam.pipeline import PipelineOptions -from traitlets import Dict, Unicode +from traitlets import Dict, Integer, Unicode from .base import Bakery @@ -128,6 +128,26 @@ class FlinkOperatorBakery(Bakery): """, ) + parallelism = Integer( + -1, + config=True, + help=""" + The degree of parallelism to be used when distributing operations onto workers. + If the parallelism is not set, the configured Flink default is used, + or 1 if none can be found. + """, + ) + + max_parallelism = Integer( + -1, + config=True, + help=""" + The pipeline wide maximum degree of parallelism to be used. + The maximum parallelism specifies the upper limit for dynamic scaling + and the number of key groups used for partitioned state. + """, + ) + def make_flink_deployment(self, name: str, worker_image: str): """ Return YAML for a FlinkDeployment @@ -250,6 +270,8 @@ def get_pipeline_options( save_main_session=True, # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/ pickle_library="cloudpickle", + parallelism=self.parallelism, + max_parallelism=self.max_parallelism, **extra_options, ) return PipelineOptions(**opts) From 87ec8b85f287b2f2edfa87694126395c71471191 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Thu, 3 Aug 2023 15:09:47 -0700 Subject: [PATCH 04/12] upgrade flink operator to 1.5.0 --- .github/workflows/flink.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yaml b/.github/workflows/flink.yaml index 81a04284..cd71dccb 100644 --- a/.github/workflows/flink.yaml +++ b/.github/workflows/flink.yaml @@ -39,7 +39,7 @@ jobs: - name: Setup FlinkOperator run: | - FLINK_OPERATOR_VERSION=1.3.0 + FLINK_OPERATOR_VERSION=1.5.0 helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-${FLINK_OPERATOR_VERSION} helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --wait From 267034e636f4ba4533158d86a1d4c34971ec60c1 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Thu, 3 Aug 2023 16:24:03 -0700 Subject: [PATCH 05/12] set default parallelism to None --- pangeo_forge_runner/bakery/flink.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py index 1d4c032c..cf6ba36b 100644 --- a/pangeo_forge_runner/bakery/flink.py +++ b/pangeo_forge_runner/bakery/flink.py @@ -129,7 +129,8 @@ class FlinkOperatorBakery(Bakery): ) parallelism = Integer( - -1, + None, + allow_none=True, config=True, help=""" The degree of parallelism to be used when distributing operations onto workers. 
@@ -139,7 +140,8 @@ class FlinkOperatorBakery(Bakery):
     )

     max_parallelism = Integer(
-        -1,
+        None,
+        allow_none=True,
         config=True,
         help="""
         The pipeline wide maximum degree of parallelism to be used.

From 231e35df6d49e11166ac65b6333e34b8f4ea72c7 Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Thu, 3 Aug 2023 16:45:48 -0700
Subject: [PATCH 06/12] print stderr if flink proc fails

---
 tests/integration/test_flink.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink.py
index 631c44e3..519c2493 100644
--- a/tests/integration/test_flink.py
+++ b/tests/integration/test_flink.py
@@ -48,9 +48,12 @@ def test_flink_bake(minio):
         "-f",
         f.name,
     ]
-    proc = subprocess.run(cmd)
+    proc = subprocess.run(cmd, capture_output=True)

-    assert proc.returncode == 0
+    if proc.returncode != 0:
+        print(proc.stderr.decode())
+
+    assert proc.returncode == 0  # bail if proc did not succeed

     # We should have some kinda 'has this completed?' check here
     # Instead, I just wait for 3min

From 0f100acb0eac02190b1582700a3b5beb57cbb168 Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Thu, 3 Aug 2023 16:56:36 -0700
Subject: [PATCH 07/12] revert stderr print, capture_output=True gives us the trace.

---
 tests/integration/test_flink.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink.py
index 519c2493..a1c9fab7 100644
--- a/tests/integration/test_flink.py
+++ b/tests/integration/test_flink.py
@@ -50,10 +50,7 @@ def test_flink_bake(minio):
     ]
     proc = subprocess.run(cmd, capture_output=True)

-    if proc.returncode != 0:
-        print(proc.stderr.decode())
-
-    assert proc.returncode == 0  # bail if proc did not succeed
+    assert proc.returncode == 0

     # We should have some kinda 'has this completed?' check here
     # Instead, I just wait for 3min

From 6babb04510ffc727766d7aee1aa45c0458d8588a Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Thu, 3 Aug 2023 16:57:09 -0700
Subject: [PATCH 08/12] use beam-refactor ref for flink test

---
 tests/integration/test_flink.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink.py
index a1c9fab7..1ea61e22 100644
--- a/tests/integration/test_flink.py
+++ b/tests/integration/test_flink.py
@@ -44,7 +44,7 @@ def test_flink_bake(minio):
         "--repo",
         "https://github.com/pangeo-forge/gpcp-feedstock.git",
         "--ref",
-        "2cde04745189665a1f5a05c9eae2a98578de8b7f",
+        "beam-refactor",
         "-f",
         f.name,
     ]

From db67f2232a49087ff3885dc3bc01aa1cace5b4d8 Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Thu, 3 Aug 2023 17:02:32 -0700
Subject: [PATCH 09/12] test flink against pforgetest repo

---
 tests/integration/test_flink.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink.py
index 1ea61e22..65202afd 100644
--- a/tests/integration/test_flink.py
+++ b/tests/integration/test_flink.py
@@ -42,7 +42,7 @@ def test_flink_bake(minio):
         "pangeo-forge-runner",
         "bake",
         "--repo",
-        "https://github.com/pangeo-forge/gpcp-feedstock.git",
+        "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
         "--ref",
         "beam-refactor",
         "-f",

From 611a590371affd94e3241af34ad131e412998747 Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Fri, 4 Aug 2023 15:57:17 -0700
Subject: [PATCH 10/12] don't pass parallelism args if they are None

---
 pangeo_forge_runner/bakery/flink.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index cf6ba36b..140b908b 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -258,6 +258,13 @@ def get_pipeline_options(

         print(f"You can run '{' '.join(cmd)}' to make the Flink Dashboard available!")

+        for k, v in dict(
+            parallelism=self.parallelism,
+            max_parallelism=self.max_parallelism,
+        ).items():
+            if v:  # if None, don't pass these options to Flink
+                extra_options |= {k: v}
+
         # Set flags explicitly to empty so Apache Beam doesn't try to parse the commandline
         # for pipeline options - we have traitlets doing that for us.
         opts = dict(
@@ -272,8 +279,6 @@ def get_pipeline_options(
             save_main_session=True,
             # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/
             pickle_library="cloudpickle",
-            parallelism=self.parallelism,
-            max_parallelism=self.max_parallelism,
             **extra_options,
         )
         return PipelineOptions(**opts)

From 1979e7e92eff4bdacd168a9af362de4d39abdc6a Mon Sep 17 00:00:00 2001
From: Charles Stern <62192187+cisaacstern@users.noreply.github.com>
Date: Tue, 8 Aug 2023 13:38:00 -0700
Subject: [PATCH 11/12] add flink pipeline opts test

---
 tests/unit/test_flink.py | 42 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)
 create mode 100644 tests/unit/test_flink.py

diff --git a/tests/unit/test_flink.py b/tests/unit/test_flink.py
new file mode 100644
index 00000000..a5ec5f5b
--- /dev/null
+++ b/tests/unit/test_flink.py
@@ -0,0 +1,42 @@
+from typing import Optional
+from unittest.mock import patch
+
+import pytest
+
+from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery
+
+
+@pytest.mark.parametrize("parallelism, max_parallelism", [(None, None), (100, 100)])
+def test_pipelineoptions(
+    parallelism: Optional[int],
+    max_parallelism: Optional[int],
+):
+    """
+    Quickly validate some of the PipelineOptions set
+    """
+    fob = FlinkOperatorBakery()
+    fob.parallelism = parallelism
+    fob.max_parallelism = max_parallelism
+
+    # FlinkOperatorBakery.get_pipeline_options calls `kubectl` in a subprocess,
+    # so we patch subprocess here to skip that behavior for this test
+    with patch("pangeo_forge_runner.bakery.flink.subprocess"):
+        po = fob.get_pipeline_options("job", "some-container:some-tag", {})
+        # some flink args, e.g. 'parallelism', are apparently 'unknown_options' from
+        # the perspective of PipelineOptions, so we retain those here for the test.
+        # it doesn't seem like their 'unknown' status prevents them from being passed to
+        # flink in an actual deployment, though.
+        opts = po.get_all_options(retain_unknown_options=True)
+
+    assert opts["flink_version"] == "1.15"
+
+    for optional_arg, value in dict(
+        parallelism=parallelism,
+        max_parallelism=max_parallelism,
+    ).items():
+        # if these args are not passed, we don't want them to appear in
+        # the pipeline opts, so we verify here that is actually happening.
+        if value is None:
+            assert optional_arg not in opts
+        else:
+            assert opts[optional_arg] == value

From 1f4af180c789e296102f27f0510eaecb7dd3e4a6 Mon Sep 17 00:00:00 2001
From: YuviPanda
Date: Fri, 18 Aug 2023 23:21:53 -0700
Subject: [PATCH 12/12] Clean up how we pick beam or no-beam versions of
 recipes to test

- Explicitly find the version we care about, in a way that works for
  local installs too - the current code doesn't cleanly parse the output
  of local .dev installs of pangeo-forge-recipes.
- The fixture made it appear to be a general pattern in use across
  repos, but this tagging ref is actually only true for a specific repo.
  So move the parsing code close to where it is used, rather than in a
  more general place.
---
 tests/conftest.py                              | 15 ---------------
 tests/integration/test_dataflow_integration.py | 15 +++++++++++----
 tests/unit/test_bake.py                        | 17 ++++++++++++-----
 3 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index a67ba59d..66049dfa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -57,18 +57,3 @@ def minio(local_ip):
         proc.wait()

     assert proc.returncode == 0
-
-
-@pytest.fixture
-def recipes_version_ref():
-    # FIXME: recipes version matrix is currently determined by github workflows matrix
-    # in the future, it should be set by pangeo-forge-runner venv feature?
-    pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines()
-    recipes_version = [
-        p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes")
-    ][0]
-    # the recipes_version is a 3-element semantic version of form `0.A.B` where A is either minor
-    # version `9` or `10`. the test feedstock (pforgetest/gpcp-from-gcs-feedstock) has tags for
-    # each of these minor versions, of the format `0.A.x`, so we translate the installed version
-    # of pangeo-forge-recipes to one of the valid tags (either `0.9.x` or `0.10.x`) here.
-    return f"0.{recipes_version.split('.')[1]}.x"
diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py
index 20a4f146..9b5aa54b 100644
--- a/tests/integration/test_dataflow_integration.py
+++ b/tests/integration/test_dataflow_integration.py
@@ -2,12 +2,19 @@
 import subprocess
 import tempfile
 import time
+from importlib.metadata import version

 import pytest
 import xarray as xr
+from packaging.version import parse as parse_version


-def test_dataflow_integration(recipes_version_ref):
+def test_dataflow_integration():
+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipe_version_ref = "0.10.x"
+    else:
+        recipe_version_ref = "0.9.x"
     bucket = "gs://pangeo-forge-runner-ci-testing"
     config = {
         "Bake": {
@@ -40,7 +47,7 @@ def test_dataflow_integration(recipes_version_ref):
         "--ref",
         # in the test feedstock, tags are named for the recipes version
         # which was used to write the recipe module
-        recipes_version_ref,
+        recipe_version_ref,
         "--json",
         "-f",
         f.name,
@@ -93,8 +100,8 @@ def test_dataflow_integration(recipes_version_ref):

     # open the generated dataset with xarray!
     target_path = config["TargetStorage"]["root_path"].format(job_name=job_name)
-    if recipes_version_ref == "0.10.x":
-        # in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
+    if pfr_version >= parse_version("0.10"):
+        # in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
         # is appended to the formatted root path at execution time. for ref `0.10.x`,
         # the value of that kwarg is "gpcp", so we append that here.
target_path += "/gpcp" diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py index 6f3752bf..08defa0f 100644 --- a/tests/unit/test_bake.py +++ b/tests/unit/test_bake.py @@ -2,9 +2,11 @@ import re import subprocess import tempfile +from importlib.metadata import version import pytest import xarray as xr +from packaging.version import parse as parse_version from pangeo_forge_runner.commands.bake import Bake @@ -50,9 +52,7 @@ def test_job_name_validation(job_name, raises): [None, None, "special-name-for-job"], ), ) -def test_gpcp_bake( - minio, recipe_id, expected_error, custom_job_name, recipes_version_ref -): +def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name): fsspec_args = { "key": minio["username"], "secret": minio["password"], @@ -86,6 +86,12 @@ def test_gpcp_bake( if custom_job_name: config["Bake"].update({"job_name": custom_job_name}) + pfr_version = parse_version(version("pangeo-forge-recipes")) + if pfr_version >= parse_version("0.10"): + recipe_version_ref = "0.10.x" + else: + recipe_version_ref = "0.9.x" + with tempfile.NamedTemporaryFile("w", suffix=".json") as f: json.dump(config, f) f.flush() @@ -97,7 +103,7 @@ def test_gpcp_bake( "--ref", # in the test feedstock, tags are named for the recipes version # which was used to write the recipe module - recipes_version_ref, + recipe_version_ref, "--json", "-f", f.name, @@ -126,7 +132,8 @@ def test_gpcp_bake( # root path itself. This is a compatibility break vs the previous # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495 # has more information - if recipes_version_ref == "0.10.x": + + if pfr_version >= parse_version("0.10"): zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/" else: zarr_store_path = config["TargetStorage"]["root_path"]
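
Note: PATCH 12 inlines the same version-detection pattern into both test modules: read the installed pangeo-forge-recipes version via importlib.metadata and map it to the matching test-feedstock tag. A minimal standalone sketch of that pattern follows; the helper name `recipes_version_ref` is illustrative here, not part of the patch:

    from importlib.metadata import version

    from packaging.version import parse as parse_version

    def recipes_version_ref() -> str:
        # Installed version of pangeo-forge-recipes; unlike parsing `pip list`
        # output, this also handles local `.dev` installs cleanly (the stated
        # motivation for PATCH 12).
        pfr_version = parse_version(version("pangeo-forge-recipes"))
        # The test feedstock tags recipe modules by minor version line.
        return "0.10.x" if pfr_version >= parse_version("0.10") else "0.9.x"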