Commit f5c4db0

Merge remote-tracking branch 'upstream/main' into add-release-yaml
cisaacstern committed Aug 25, 2023
2 parents df635f2 + 5b2faba
Showing 9 changed files with 136 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink.yaml
@@ -39,7 +39,7 @@ jobs:

- name: Setup FlinkOperator
run: |
-FLINK_OPERATOR_VERSION=1.3.0
+FLINK_OPERATOR_VERSION=1.5.0
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-${FLINK_OPERATOR_VERSION}
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --wait
31 changes: 30 additions & 1 deletion pangeo_forge_runner/bakery/flink.py
@@ -10,7 +10,7 @@

import escapism
from apache_beam.pipeline import PipelineOptions
-from traitlets import Dict, Unicode
+from traitlets import Dict, Integer, Unicode

from .base import Bakery

@@ -128,6 +128,28 @@ class FlinkOperatorBakery(Bakery):
""",
)

parallelism = Integer(
None,
allow_none=True,
config=True,
help="""
The degree of parallelism to be used when distributing operations onto workers.
If the parallelism is not set, the configured Flink default is used,
or 1 if none can be found.
""",
)

max_parallelism = Integer(
None,
allow_none=True,
config=True,
help="""
The pipeline wide maximum degree of parallelism to be used.
The maximum parallelism specifies the upper limit for dynamic scaling
and the number of key groups used for partitioned state.
""",
)

    def make_flink_deployment(self, name: str, worker_image: str):
        """
        Return YAML for a FlinkDeployment
@@ -236,6 +258,13 @@ def get_pipeline_options(

print(f"You can run '{' '.join(cmd)}' to make the Flink Dashboard available!")

for k, v in dict(
parallelism=self.parallelism,
max_parallelism=self.max_parallelism,
).items():
if v: # if None, don't pass these options to Flink
extra_options |= {k: v}

        # Set flags explicitly to empty so Apache Beam doesn't try to parse the commandline
        # for pipeline options - we have traitlets doing that for us.
        opts = dict(
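
For orientation, the new knobs are plain traitlets, so they can be set from a config file like any other option. A minimal sketch, assuming a standard traitlets config file (the file name and values are illustrative, not part of this commit):

# pangeo_forge_runner_config.py (hypothetical)
c.FlinkOperatorBakery.parallelism = 4
c.FlinkOperatorBakery.max_parallelism = 128

Because both traits default to None and the loop above forwards only set values, leaving them unset keeps Flink's own configured defaults (or 1) in effect.
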
15 changes: 14 additions & 1 deletion pangeo_forge_runner/commands/bake.py
@@ -3,9 +3,11 @@
"""
import os
import re
+import string
import time
from pathlib import Path

+import escapism
from apache_beam import Pipeline, PTransform
from traitlets import Bool, Type, Unicode, validate

@@ -113,8 +115,19 @@ def autogenerate_job_name(self):
        Autogenerate a readable job_name
        """
        # special case local checkouts, as no contentprovider is used
+        safe_chars = string.ascii_lowercase + string.digits
        if os.path.exists(self.repo):
-            return f"local-{os.path.basename(self.repo)}"
+            name = "local-"
+            name += escapism.escape(
+                os.path.basename(os.path.abspath(self.repo)),
+                safe=safe_chars,
+                escape_char="-",
+            )
+            if self.feedstock_subdir != "feedstock":
+                name += "-" + escapism.escape(
+                    self.feedstock_subdir, safe=safe_chars, escape_char="-"
+                )
+            return name.lower()

        # special-case github because it is so common
        if self.repo.startswith("https://github.com/"):
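
As a sketch of what the new naming scheme produces (the repo name below is invented; escapism hex-escapes every character outside the safe set):

import string
import escapism

safe_chars = string.ascii_lowercase + string.digits
# "M", "_", and "R" are not in safe_chars, so each becomes "-" plus its hex code
name = "local-" + escapism.escape("My_Repo", safe=safe_chars, escape_char="-")
print(name.lower())  # local--4dy-5f-52epo

Escaping before lowercasing keeps distinct checkout paths from colliding on the same Kubernetes-safe job name, which a bare basename could not guarantee.
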
24 changes: 23 additions & 1 deletion pangeo_forge_runner/commands/base.py
@@ -7,7 +7,7 @@

from pythonjsonlogger import jsonlogger
from repo2docker import contentproviders
-from traitlets import Bool, Instance, List, Unicode
+from traitlets import Bool, Dict, Instance, List, Unicode
from traitlets.config import Application

# Common aliases we want to support in *all* commands
@@ -41,6 +41,21 @@ class BaseCommand(Application):

    log_level = logging.INFO

+    logging_config = Dict(
+        {},
+        config=True,
+        help="""
+        Logging configuration for this python application.
+        When set, this value is passed to logging.config.dictConfig,
+        and can be used to configure how logs *throughout the application*
+        are handled, not just for logs from this application alone.
+        See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig
+        for more details.
+        """,
+    )

    repo = Unicode(
        "",
        config=True,
@@ -199,6 +214,13 @@ def initialize(self, argv=None):
        # Load traitlets config from a config file if present
        self.load_config_file(self.config_file)

+        # Allow arbitrary logging config if set.
+        # We do this first so any custom logging we set up ourselves
+        # is not affected, as by default dictConfig will replace all
+        # existing config.
+        if self.logging_config:
+            logging.config.dictConfig(self.logging_config)

        # The application communicates with the outside world via
        # stdout, and we structure this communication via logging.
        # So let's setup the default logger to log to stdout, rather
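
As a hedged sketch of how the new trait might be used (the handler layout and file path are illustrative assumptions, not part of this commit), a traitlets config file could route all application logs through the JSON formatter to a file:

c.BaseCommand.logging_config = {
    "version": 1,  # required key for logging.config.dictConfig
    "disable_existing_loggers": False,
    "formatters": {
        "json": {"()": "pythonjsonlogger.jsonlogger.JsonFormatter"},
    },
    "handlers": {
        "file": {
            "class": "logging.FileHandler",
            "filename": "/tmp/pangeo-forge-runner.log",  # illustrative path
            "formatter": "json",
        },
    },
    "root": {"handlers": ["file"], "level": "INFO"},
}

Applying dictConfig before the command configures its own stdout logger matters because, by default, dictConfig replaces any existing logging configuration.
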
15 changes: 0 additions & 15 deletions tests/conftest.py
@@ -57,18 +57,3 @@ def minio(local_ip):
proc.wait()

assert proc.returncode == 0


-@pytest.fixture
-def recipes_version_ref():
-    # FIXME: recipes version matrix is currently determined by github workflows matrix
-    # in the future, it should be set by pangeo-forge-runner venv feature?
-    pip_list = subprocess.check_output("pip list".split()).decode("utf-8").splitlines()
-    recipes_version = [
-        p.split()[-1] for p in pip_list if p.startswith("pangeo-forge-recipes")
-    ][0]
-    # the recipes_version is a 3-element semantic version of form `0.A.B` where A is either minor
-    # version `9` or `10`. the test feedstock (pforgetest/gpcp-from-gcs-feedstock) has tags for
-    # each of these minor versions, of the format `0.A.x`, so we translate the installed version
-    # of pangeo-forge-recipes to one of the valid tags (either `0.9.x` or `0.10.x`) here.
-    return f"0.{recipes_version.split('.')[1]}.x"
15 changes: 11 additions & 4 deletions tests/integration/test_dataflow_integration.py
@@ -2,12 +2,19 @@
import subprocess
import tempfile
import time
+from importlib.metadata import version

import pytest
import xarray as xr
+from packaging.version import parse as parse_version


-def test_dataflow_integration(recipes_version_ref):
+def test_dataflow_integration():
+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipe_version_ref = "0.10.x"
+    else:
+        recipe_version_ref = "0.9.x"
    bucket = "gs://pangeo-forge-runner-ci-testing"
    config = {
        "Bake": {
@@ -40,7 +47,7 @@
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipes_version_ref,
recipe_version_ref,
"--json",
"-f",
f.name,
@@ -93,8 +100,8 @@

# open the generated dataset with xarray!
target_path = config["TargetStorage"]["root_path"].format(job_name=job_name)
if recipes_version_ref == "0.10.x":
# in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
if pfr_version >= parse_version("0.10"):
# in pangeo-forge-eecipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
# is appended to the formatted root path at execution time. for ref `0.10.x`,
# the value of that kwarg is "gpcp", so we append that here.
target_path += "/gpcp"
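
The inline version check that replaces the old fixture relies on packaging's numeric version semantics; a quick sketch of why a plain string comparison would not do here:

from packaging.version import parse as parse_version

# packaging compares release segments numerically, so 0.10 sorts after 0.9 ...
assert parse_version("0.10.0") > parse_version("0.9.2")
# ... whereas lexicographic string comparison puts "0.10.0" first
assert "0.10.0" < "0.9.2"
# trailing zeros are insignificant, so "0.10.0" satisfies >= "0.10"
assert parse_version("0.10.0") == parse_version("0.10")
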
6 changes: 3 additions & 3 deletions tests/integration/test_flink.py
@@ -42,13 +42,13 @@ def test_flink_bake(minio):
"pangeo-forge-runner",
"bake",
"--repo",
"https://github.com/pangeo-forge/gpcp-feedstock.git",
"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
"2cde04745189665a1f5a05c9eae2a98578de8b7f",
"beam-refactor",
"-f",
f.name,
]
proc = subprocess.run(cmd)
proc = subprocess.run(cmd, capture_output=True)

assert proc.returncode == 0

17 changes: 12 additions & 5 deletions tests/unit/test_bake.py
@@ -2,9 +2,11 @@
import re
import subprocess
import tempfile
+from importlib.metadata import version

import pytest
import xarray as xr
+from packaging.version import parse as parse_version

from pangeo_forge_runner.commands.bake import Bake

@@ -50,9 +52,7 @@ def test_job_name_validation(job_name, raises):
        [None, None, "special-name-for-job"],
    ),
)
-def test_gpcp_bake(
-    minio, recipe_id, expected_error, custom_job_name, recipes_version_ref
-):
+def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
    fsspec_args = {
        "key": minio["username"],
        "secret": minio["password"],
@@ -86,6 +86,12 @@ def test_gpcp_bake(minio, recipe_id, expected_error, custom_job_name):
    if custom_job_name:
        config["Bake"].update({"job_name": custom_job_name})

+    pfr_version = parse_version(version("pangeo-forge-recipes"))
+    if pfr_version >= parse_version("0.10"):
+        recipe_version_ref = "0.10.x"
+    else:
+        recipe_version_ref = "0.9.x"

    with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
        json.dump(config, f)
        f.flush()
@@ -97,7 +103,7 @@
"--ref",
# in the test feedstock, tags are named for the recipes version
# which was used to write the recipe module
recipes_version_ref,
recipe_version_ref,
"--json",
"-f",
f.name,
@@ -126,7 +132,8 @@
    # root path itself. This is a compatibility break vs the previous
    # versions of pangeo-forge-recipes. https://github.com/pangeo-forge/pangeo-forge-recipes/pull/495
    # has more information
-    if recipes_version_ref == "0.10.x":
+
+    if pfr_version >= parse_version("0.10"):
        zarr_store_path = config["TargetStorage"]["root_path"] + "gpcp/"
    else:
        zarr_store_path = config["TargetStorage"]["root_path"]
42 changes: 42 additions & 0 deletions tests/unit/test_flink.py
@@ -0,0 +1,42 @@
from typing import Optional
from unittest.mock import patch

import pytest

from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery


@pytest.mark.parametrize("parallelism, max_parallelism", [(None, None), (100, 100)])
def test_pipelineoptions(
    parallelism: Optional[int],
    max_parallelism: Optional[int],
):
    """
    Quickly validate some of the PipelineOptions set
    """
    fob = FlinkOperatorBakery()
    fob.parallelism = parallelism
    fob.max_parallelism = max_parallelism

    # FlinkOperatorBakery.get_pipeline_options calls `kubectl` in a subprocess,
    # so we patch subprocess here to skip that behavior for this test
    with patch("pangeo_forge_runner.bakery.flink.subprocess"):
        po = fob.get_pipeline_options("job", "some-container:some-tag", {})
        # some flink args, e.g. 'parallelism', are apparently 'unknown_options' from
        # the perspective of PipelineOptions, so we retain those here for the test.
        # it doesn't seem like their 'unknown' status prevents them from being passed to
        # flink in an actual deployment, though.
        opts = po.get_all_options(retain_unknown_options=True)

    assert opts["flink_version"] == "1.15"

    for optional_arg, value in dict(
        parallelism=parallelism,
        max_parallelism=max_parallelism,
    ).items():
        # if these args are not passed, we don't want them to appear in
        # the pipeline opts, so we verify here that is actually happening.
        if value is None:
            assert optional_arg not in opts
        else:
            assert opts[optional_arg] == value
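
The retain_unknown_options detail the comments describe can be seen with Beam alone; a minimal sketch (not from this repo), assuming Beam's documented get_all_options behavior:

from apache_beam.pipeline import PipelineOptions

# Keyword options that no PipelineOptions subclass defines are omitted from
# get_all_options() unless retain_unknown_options=True is passed.
po = PipelineOptions(flags=[], parallelism=100)
assert "parallelism" not in po.get_all_options()
assert po.get_all_options(retain_unknown_options=True)["parallelism"] == 100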
