diff --git a/.github/workflows/flink.yaml b/.github/workflows/flink.yaml
index 81a04284..cd71dccb 100644
--- a/.github/workflows/flink.yaml
+++ b/.github/workflows/flink.yaml
@@ -39,7 +39,7 @@ jobs:
       - name: Setup FlinkOperator
         run: |
-          FLINK_OPERATOR_VERSION=1.3.0
+          FLINK_OPERATOR_VERSION=1.5.0
           helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-${FLINK_OPERATOR_VERSION}
           helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --wait
diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index adde2108..140b908b 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -10,7 +10,7 @@
 import escapism
 from apache_beam.pipeline import PipelineOptions
-from traitlets import Dict, Unicode
+from traitlets import Dict, Integer, Unicode
 
 from .base import Bakery
@@ -128,6 +128,28 @@ class FlinkOperatorBakery(Bakery):
         """,
     )
 
+    parallelism = Integer(
+        None,
+        allow_none=True,
+        config=True,
+        help="""
+        The degree of parallelism to be used when distributing operations onto workers.
+        If the parallelism is not set, the configured Flink default is used,
+        or 1 if none can be found.
+        """,
+    )
+
+    max_parallelism = Integer(
+        None,
+        allow_none=True,
+        config=True,
+        help="""
+        The pipeline-wide maximum degree of parallelism to be used.
+        The maximum parallelism specifies the upper limit for dynamic scaling
+        and the number of key groups used for partitioned state.
+        """,
+    )
+
     def make_flink_deployment(self, name: str, worker_image: str):
         """
         Return YAML for a FlinkDeployment
@@ -236,6 +258,13 @@ def get_pipeline_options(
 
         print(f"You can run '{' '.join(cmd)}' to make the Flink Dashboard available!")
 
+        for k, v in dict(
+            parallelism=self.parallelism,
+            max_parallelism=self.max_parallelism,
+        ).items():
+            if v:  # if None, don't pass these options to Flink
+                extra_options |= {k: v}
+
         # Set flags explicitly to empty so Apache Beam doesn't try to parse the commandline
         # for pipeline options - we have traitlets doing that for us.
         opts = dict(
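A note on the merge step above: `extra_options |= {k: v}` uses the in-place dict-union operator from PEP 584, so this code path requires Python 3.9 or newer (`extra_options.update({k: v})` is the pre-3.9 equivalent). Also, the `if v:` guard skips any falsy value, not just `None`. A standalone sketch of that behavior:

```python
extra_options = {"flink_version": "1.15"}

# Mirrors the bakery's filtering loop: unset (None) options never reach Flink.
for k, v in {"parallelism": 100, "max_parallelism": None}.items():
    if v:  # falsy values (None, but also 0) are skipped
        extra_options |= {k: v}

assert extra_options == {"flink_version": "1.15", "parallelism": 100}
```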
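For reference, the two new traitlets would be set through the usual traitlets config machinery; the file name and values below are hypothetical, a minimal usage sketch rather than anything from this diff:

```python
# pangeo_forge_runner_config.py -- hypothetical traitlets config file,
# loaded via `pangeo-forge-runner bake -f pangeo_forge_runner_config.py ...`
c.FlinkOperatorBakery.parallelism = 4       # default parallelism for distributed operations
c.FlinkOperatorBakery.max_parallelism = 16  # upper bound for dynamic scaling / key groups
# Leaving either option unset (None) keeps it out of the PipelineOptions
# entirely, so Flink falls back to its own configured defaults.
```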
diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py
index 03e31913..1a09015c 100644
--- a/pangeo_forge_runner/commands/bake.py
+++ b/pangeo_forge_runner/commands/bake.py
@@ -3,9 +3,11 @@
 """
 import os
 import re
+import string
 import time
 from pathlib import Path
 
+import escapism
 from apache_beam import Pipeline, PTransform
 from traitlets import Bool, Type, Unicode, validate
@@ -113,8 +115,19 @@ def autogenerate_job_name(self):
         Autogenerate a readable job_name
         """
         # special case local checkouts, as no contentprovider is used
+        safe_chars = string.ascii_lowercase + string.digits
         if os.path.exists(self.repo):
-            return f"local-{os.path.basename(self.repo)}"
+            name = "local-"
+            name += escapism.escape(
+                os.path.basename(os.path.abspath(self.repo)),
+                safe=safe_chars,
+                escape_char="-",
+            )
+            if self.feedstock_subdir != "feedstock":
+                name += "-" + escapism.escape(
+                    self.feedstock_subdir, safe=safe_chars, escape_char="-"
+                )
+            return name.lower()
 
         # special-case github because it is so common
         if self.repo.startswith("https://github.com/"):
diff --git a/pangeo_forge_runner/commands/base.py b/pangeo_forge_runner/commands/base.py
index 63e8d492..30c29593 100644
--- a/pangeo_forge_runner/commands/base.py
+++ b/pangeo_forge_runner/commands/base.py
@@ -7,7 +7,7 @@
 from pythonjsonlogger import jsonlogger
 from repo2docker import contentproviders
-from traitlets import Bool, Instance, List, Unicode
+from traitlets import Bool, Dict, Instance, List, Unicode
 from traitlets.config import Application
 
 # Common aliases we want to support in *all* commands
@@ -41,6 +41,21 @@ class BaseCommand(Application):
 
     log_level = logging.INFO
 
+    logging_config = Dict(
+        {},
+        config=True,
+        help="""
+        Logging configuration for this Python application.
+
+        When set, this value is passed to logging.config.dictConfig,
+        and can be used to configure how logs *throughout the application*
+        are handled, not just for logs from this application alone.
+
+        See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig
+        for more details.
+        """,
+    )
+
     repo = Unicode(
         "",
         config=True,
@@ -199,6 +214,13 @@ def initialize(self, argv=None):
         # Load traitlets config from a config file if present
         self.load_config_file(self.config_file)
 
+        # Allow arbitrary logging config if set.
+        # We do this first, so any custom logging we set up ourselves
+        # is not affected, as by default dictConfig will replace all
+        # existing config.
+        if self.logging_config:
+            logging.config.dictConfig(self.logging_config)
+
         # The application communicates with the outside world via
         # stdout, and we structure this communication via logging.
         # So let's setup the default logger to log to stdout, rather
diff --git a/tests/conftest.py b/tests/conftest.py
index a67ba59d..66049dfa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -57,18 +57,3 @@ def minio(local_ip):
 
     proc.wait()
     assert proc.returncode == 0
-
-
-@pytest.fixture
-def recipes_version_ref():
-    # FIXME: recipes version matrix is currently determined by github workflows matrix
-    # in the future, it should be set by pangeo-forge-runner venv feature?
-    pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines()
-    recipes_version = [
-        p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes")
-    ][0]
-    # the recipes_version is a 3-element semantic version of form `0.A.B` where A is either minor
-    # version `9` or `10`. the test feedstock (pforgetest/gpcp-from-gcs-feedstock) has tags for
-    # each of these minor versions, of the format `0.A.x`, so we translate the installed version
-    # of pangeo-forge-recipes to one of the valid tags (either `0.9.x` or `0.10.x`) here.
-    return f"0.{recipes_version.split('.')[1]}.x"
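To see why `escapism` is pulled into `autogenerate_job_name`: job names generally must satisfy strict backend naming rules (lowercase alphanumerics and dashes), so anything outside `[a-z0-9]` is hex-escaped and the whole name lowercased. A quick sketch of that behavior, with a hypothetical directory name:

```python
import string

import escapism

safe_chars = string.ascii_lowercase + string.digits

# A hypothetical local checkout directory named "My_Feedstock.v2": each
# character outside [a-z0-9] is replaced by "-" followed by its hex code,
# and the result is then lowercased, matching autogenerate_job_name above.
escaped = escapism.escape("My_Feedstock.v2", safe=safe_chars, escape_char="-")
print(f"local-{escaped}".lower())
```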
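And a possible `logging_config` value for the new `BaseCommand` option, following the standard `logging.config.dictConfig` schema (the handler and file name here are illustrative, not from this diff):

```python
# In a traitlets config file, where `c` is the config object:
c.BaseCommand.logging_config = {
    "version": 1,
    # keep loggers configured elsewhere (including by this application) alive
    "disable_existing_loggers": False,
    "handlers": {
        "file": {
            "class": "logging.FileHandler",
            "filename": "pangeo-forge-runner.log",
        },
    },
    "root": {"level": "DEBUG", "handlers": ["file"]},
}
```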
diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py
index 20a4f146..9b5aa54b 100644
--- a/tests/integration/test_dataflow_integration.py
+++ b/tests/integration/test_dataflow_integration.py
@@ -2,12 +2,19 @@
 import subprocess
 import tempfile
 import time
+from importlib.metadata import version
 
 import pytest
 import xarray as xr
+from packaging.version import parse as parse_version
 
 
-def test_dataflow_integration(recipes_version_ref):
+def test_dataflow_integration():
+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipe_version_ref = "0.10.x"
+    else:
+        recipe_version_ref = "0.9.x"
     bucket = "gs://pangeo-forge-runner-ci-testing"
     config = {
         "Bake": {
@@ -40,7 +47,7 @@
             "--ref",
             # in the test feedstock, tags are named for the recipes version
             # which was used to write the recipe module
-            recipes_version_ref,
+            recipe_version_ref,
             "--json",
             "-f",
             f.name,
@@ -93,8 +100,8 @@
     # open the generated dataset with xarray!
     target_path = config["TargetStorage"]["root_path"].format(job_name=job_name)
-    if recipes_version_ref == "0.10.x":
-        # in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
+    if pfr_version >= parse_version("0.10"):
+        # in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
         # is appended to the formatted root path at execution time. for ref `0.10.x`,
         # the value of that kwarg is "gpcp", so we append that here.
         target_path += "/gpcp"
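The fixture-to-inline refactor in this test boils down to the following pattern, which keys the feedstock tag off the installed library version instead of the CI workflow matrix:

```python
from importlib.metadata import version

from packaging.version import parse as parse_version

# Map the installed pangeo-forge-recipes version to the matching feedstock
# tag ("0.9.x" or "0.10.x") at test runtime, with no fixture needed.
pfr_version = parse_version(version("pangeo-forge-recipes"))
recipe_version_ref = "0.10.x" if pfr_version >= parse_version("0.10") else "0.9.x"
```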
diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink.py
index 631c44e3..65202afd 100644
--- a/tests/integration/test_flink.py
+++ b/tests/integration/test_flink.py
@@ -42,13 +42,13 @@ def test_flink_bake(minio):
             "pangeo-forge-runner",
             "bake",
             "--repo",
-            "https://github.com/pangeo-forge/gpcp-feedstock.git",
+            "https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
             "--ref",
-            "2cde04745189665a1f5a05c9eae2a98578de8b7f",
+            "beam-refactor",
            "-f",
             f.name,
         ]
-        proc = subprocess.run(cmd)
+        proc = subprocess.run(cmd, capture_output=True)
 
         assert proc.returncode == 0
diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py
index 6f3752bf..08defa0f 100644
--- a/tests/unit/test_bake.py
+++ b/tests/unit/test_bake.py
@@ -2,9 +2,11 @@
 import re
 import subprocess
 import tempfile
+from importlib.metadata import version
 
 import pytest
 import xarray as xr
+from packaging.version import parse as parse_version
 
 from pangeo_forge_runner.commands.bake import Bake
@@ -50,9 +52,7 @@ def test_job_name_validation(job_name, raises):
         [None, None, "special-name-for-job"],
     ),
 )
-def test_gpcp_bake(
-    minio, recipe_id, expected_error, custom_job_name, recipes_version_ref
-):
+def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
     fsspec_args = {
         "key": minio["username"],
         "secret": minio["password"],
@@ -86,6 +86,12 @@
     if custom_job_name:
         config["Bake"].update({"job_name": custom_job_name})
 
+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipe_version_ref = "0.10.x"
+    else:
+        recipe_version_ref = "0.9.x"
+
     with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
         json.dump(config, f)
         f.flush()
@@ -97,7 +103,7 @@
             "--ref",
             # in the test feedstock, tags are named for the recipes version
             # which was used to write the recipe module
-            recipes_version_ref,
+            recipe_version_ref,
             "--json",
             "-f",
             f.name,
@@ -126,7 +132,8 @@
         # root path itself. This is a compatibility break vs the previous
         # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495
         # has more information
-        if recipes_version_ref == "0.10.x":
+
+        if pfr_version >= parse_version("0.10"):
             zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/"
         else:
             zarr_store_path = config["TargetStorage"]["root_path"]
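One caveat on the `capture_output=True` change in `tests/integration/test_flink.py` above: when the assertion fails, the captured output is discarded silently, which can make CI failures hard to diagnose. A possible follow-up (not part of this diff; `cmd` is the list built earlier in the test):

```python
import subprocess

proc = subprocess.run(cmd, capture_output=True)
# Echo the CLI's output if the bake failed, since capture_output=True
# would otherwise swallow the only clue left in the CI logs.
if proc.returncode != 0:
    print(proc.stdout.decode())
    print(proc.stderr.decode())
assert proc.returncode == 0
```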
diff --git a/tests/unit/test_flink.py b/tests/unit/test_flink.py
new file mode 100644
index 00000000..a5ec5f5b
--- /dev/null
+++ b/tests/unit/test_flink.py
@@ -0,0 +1,42 @@
+from typing import Optional
+from unittest.mock import patch
+
+import pytest
+
+from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery
+
+
+@pytest.mark.parametrize("parallelism, max_parallelism", [(None, None), (100, 100)])
+def test_pipelineoptions(
+    parallelism: Optional[int],
+    max_parallelism: Optional[int],
+):
+    """
+    Quickly validate some of the PipelineOptions set
+    """
+    fob = FlinkOperatorBakery()
+    fob.parallelism = parallelism
+    fob.max_parallelism = max_parallelism
+
+    # FlinkOperatorBakery.get_pipeline_options calls `kubectl` in a subprocess,
+    # so we patch subprocess here to skip that behavior for this test
+    with patch("pangeo_forge_runner.bakery.flink.subprocess"):
+        po = fob.get_pipeline_options("job", "some-container:some-tag", {})
+        # some flink args, e.g. 'parallelism', are apparently 'unknown_options' from
+        # the perspective of PipelineOptions, so we retain those here for the test.
+        # it doesn't seem like their 'unknown' status prevents them from being passed to
+        # flink in an actual deployment, though.
+        opts = po.get_all_options(retain_unknown_options=True)
+
+    assert opts["flink_version"] == "1.15"
+
+    for optional_arg, value in dict(
+        parallelism=parallelism,
+        max_parallelism=max_parallelism,
+    ).items():
+        # if these args are not passed, we don't want them to appear in
+        # the pipeline opts, so we verify here that is actually happening.
+        if value is None:
+            assert optional_arg not in opts
+        else:
+            assert opts[optional_arg] == value
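The "unknown options" comment above can be checked against Beam directly. A small sketch, assuming (as the test itself implies) that no imported `PipelineOptions` subclass defines these flags in the Beam version in use:

```python
from apache_beam.pipeline import PipelineOptions

# 'parallelism' is not defined by any of the option classes Beam registers
# here, so it is dropped (with a warning) unless unknown options are retained.
po = PipelineOptions(flags=[], parallelism=100)
assert "parallelism" not in po.get_all_options()
assert po.get_all_options(retain_unknown_options=True)["parallelism"] == 100
```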