Skip to content

Commit

Permalink
Merge pull request #114 from pangeo-forge/gcorradini/unpin-beam-fix-integration-test

Browse files Browse the repository at this point in the history

Fix Flink Integration Tests
  • Loading branch information
cisaacstern authored Nov 6, 2023
2 parents a4dd732 + 1d6198f commit b8571e1
Show file tree
Hide file tree
Showing 10 changed files with 401 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dataflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r dev-requirements.txt
python -m pip install -e .
python -m pip install -e .[dataflow]
python -m pip install -U ${{ matrix.recipes-version }}
# FIXME: should gcsfs actually be part of some optional installs in setup.py?
Expand Down
77 changes: 59 additions & 18 deletions .github/workflows/flink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,35 @@ jobs:
github.event.label.name == 'test-flink' ||
contains( github.event.pull_request.labels.*.name, 'test-all') ||
contains( github.event.pull_request.labels.*.name, 'test-flink')
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: [ "3.9", "3.10", "3.11" ]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
"pangeo-forge-recipes==0.10.3",
]
beam-version: [
"apache-beam==2.47.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "apache-beam==2.48.0",
# "apache-beam==2.49.0",
# "apache-beam==2.50.0",
"apache-beam==2.51.0",
]
# keep here to be explicit for future users what version
# of Flink we are supporting
flink-version: [ "1.16", ]

steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}

# Starts a k8s cluster with NetworkPolicy enforcement and installs both
# kubectl and helm
Expand Down Expand Up @@ -64,43 +88,60 @@ jobs:
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'


- name: Set up Python 3.9
uses: actions/setup-python@v3
with:
python-version: 3.9

- name: 'Setup minio + mc'
- name: 'Setup minio cli mc'
run: |
wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
mv minio /usr/local/bin/minio
wget --quiet https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/mc
minio --version
mc --version
- name: Setup "${{ matrix.beam-version }}" to satisfy pf-runner installs
run: |
pip install ${{ matrix.beam-version }}
- name: Install dependencies & our package
run: |
python -m pip install --upgrade pip
python -m pip install -r dev-requirements.txt
python -m pip install -e .
python -m pip install -e .[flink]
- name: Set up min.io as a k3s service
run: |
MYACCESSKEY=$(openssl rand -hex 16)
MYSECRETKEY=$(openssl rand -hex 16)
kubectl create secret generic minio-secrets --from-literal=MINIO_ACCESS_KEY=$MYACCESSKEY --from-literal=MINIO_SECRET_KEY=$MYSECRETKEY
kubectl apply -f tests/integration/flink/minio-manifest.yaml
- name: Install socat so kubectl port-forward will work
run: |
# Not sure if this is why kubectl proxy isn't working, but let's try
sudo apt update --yes && sudo apt install --yes socat
- name: Test with pytest
id: testrunner
continue-on-error: true
run: |
pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink.py
kubectl get pod -A
kubectl describe pod
beamversion=$(echo ${{ matrix.beam-version }} | cut -d"=" -f3)
pytest -vvv -s --cov=pangeo_forge_runner tests/integration/flink/test_flink_integration.py \
--flinkversion=${{ matrix.flink-version }} \
--pythonversion=${{ matrix.python-version }} \
--beamversion=$beamversion
- name: Dump logs from pods after failure so we don't have ssh
if: steps.testrunner.outcome == 'failure'
run: |
echo "The previous step failed or timed out. Dumping logs..."
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### JOB MANAGER ######################"
kubectl get pod | grep -v manager | grep recipe- | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### TASK MANAGER ######################"
kubectl get pod | grep manager | head -n1 | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
# force GH action to show failed result
exit 128
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v2
2 changes: 1 addition & 1 deletion pangeo_forge_runner/bakery/flink.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class FlinkOperatorBakery(Bakery):
blocking = True

flink_version = Unicode(
"1.15",
"1.16",
config=True,
help="""
Version of Flink to use.
Expand Down
21 changes: 18 additions & 3 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.flink import FlinkOperatorBakery
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage
Expand Down Expand Up @@ -108,15 +109,29 @@ def _validate_job_name(self, proposal):
help="""
Container image to use for this job.
Defaults to letting beam automatically figure out the image to use,
For GCP DataFlow leaving it blank defaults to letting beam
automatically figure out the image to use for the workers
based on version of beam and python in use.
Should be accessible to whatever Beam runner is being used.
For Flink it's required that you pass an beam image
for the version of python and beam you are targeting
for example: apache/beam_python3.10_sdk:2.51.0
more info: https://hub.docker.com/layers/apache/
Note that some runners (like the local one) may not support this!
""",
)

@validate("container_image")
def _validate_container_image(self, proposal):
    """
    Require an explicit ``container_image`` when the Flink bakery is used.

    Flink cannot infer a matching apache/beam SDK image from the local
    python/beam versions (unlike Dataflow), so the user must supply one
    explicitly, e.g. ``apache/beam_python3.10_sdk:2.51.0``.
    """
    # Bug fix: the message previously said 'container_name', which is not
    # the name of any trait — the configurable option is 'container_image'.
    if self.bakery_class == FlinkOperatorBakery and not proposal.value:
        raise ValueError(
            "'container_image' is required when using the 'FlinkOperatorBakery' "
            "for the version of python and apache-beam you are targeting. "
            "See the sdk images available: https://hub.docker.com/layers/apache/"
        )
    return proposal.value

def autogenerate_job_name(self):
"""
Autogenerate a readable job_name
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
"escapism",
"jsonschema",
"traitlets",
"apache-beam[gcp]",
"importlib-metadata",
],
extras_require={
"dataflow": ["apache-beam[gcp]"],
"flink": ["apache-beam>=2.47.0"],
},
entry_points={
"console_scripts": ["pangeo-forge-runner=pangeo_forge_runner.cli:main"]
},
Expand Down
64 changes: 64 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import os
import secrets
import signal
Expand Down Expand Up @@ -57,3 +58,66 @@ def minio(local_ip):
proc.wait()

assert proc.returncode == 0


@pytest.fixture(scope="session")
def minio_service():
    """
    Session-scoped fixture describing the in-cluster minio service.

    Queries the cluster (via ``kubectl``) for the ClusterIP/port of
    ``service/minio-service`` and the base64-encoded credentials stored in
    ``secret/minio-secrets``, then yields a dict with keys ``endpoint``,
    ``username`` and ``password``.

    NOTE(review): assumes ``kubectl`` is on PATH and pointed at a cluster
    where tests/integration/flink/minio-manifest.yaml has been applied —
    confirm against the flink.yaml workflow setup.
    """

    def _kubectl_jsonpath(resource, template):
        # The jsonpath template is wrapped in literal single quotes below,
        # so kubectl's output carries them too; strip both quote styles
        # before returning (previously only the service address was
        # stripped — the secrets relied on b64decode discarding the quotes).
        cmd = ["kubectl", "get", resource, f"-o=jsonpath='{template}'"]
        proc = subprocess.run(cmd, capture_output=True, text=True)
        # Include stderr in the failure so a broken cluster is diagnosable.
        assert proc.returncode == 0, proc.stderr
        return proc.stdout.strip('"').strip("'")

    svc_address = _kubectl_jsonpath(
        "service/minio-service", "{.spec.clusterIP}:{.spec.ports[0].port}"
    )
    endpoint = f"http://{svc_address}"

    # Credentials are stored base64-encoded in the k8s Secret object.
    myaccesskey = base64.b64decode(
        _kubectl_jsonpath("secret/minio-secrets", "{.data.MINIO_ACCESS_KEY}")
    ).decode()
    mysecretkey = base64.b64decode(
        _kubectl_jsonpath("secret/minio-secrets", "{.data.MINIO_SECRET_KEY}")
    ).decode()

    yield {"endpoint": endpoint, "username": myaccesskey, "password": mysecretkey}


def pytest_addoption(parser):
    """Register the CLI options consumed by the flink integration tests."""
    defaults = (
        ("--flinkversion", "1.16"),
        ("--pythonversion", "3.9"),
        ("--beamversion", "2.47.0"),
    )
    for option, fallback in defaults:
        parser.addoption(option, action="store", default=fallback)


@pytest.fixture
def flinkversion(request):
    """Flink version under test, taken from the --flinkversion CLI option."""
    config = request.config
    return config.getoption("--flinkversion")


@pytest.fixture
def pythonversion(request):
    """Python version under test, taken from the --pythonversion CLI option."""
    config = request.config
    return config.getoption("--pythonversion")


@pytest.fixture
def beamversion(request):
    """apache-beam version under test, taken from the --beamversion CLI option."""
    config = request.config
    return config.getoption("--beamversion")
68 changes: 68 additions & 0 deletions tests/integration/flink/minio-manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Minio (S3-compatible object store) for the Flink integration tests.
# Three resources: a 1Gi PVC for object data, the minio server Deployment
# (credentials injected from the pre-created `minio-secrets` Secret — see
# the flink.yaml workflow), and a ClusterIP Service on port 9000.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: minio-pv-claim
  labels:
    app: minio-storage-claim
spec:
  accessModes:
    # single-node test cluster, so one writer is sufficient
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
spec:
  selector:
    matchLabels:
      app: minio-server
  strategy:
    # Recreate (not RollingUpdate): the PVC is ReadWriteOnce, so the old
    # pod must release the volume before a new one can mount it.
    type: Recreate
  template:
    metadata:
      labels:
        app: minio-server
    spec:
      containers:
        - name: minio
          # NOTE(review): unpinned tag — `minio/minio:latest` is pulled;
          # consider pinning a RELEASE tag for reproducible CI runs.
          image: minio/minio
          args:
            - server
            - /data
          env:
            # NOTE(review): MINIO_ACCESS_KEY/MINIO_SECRET_KEY are the legacy
            # credential env vars (newer releases prefer MINIO_ROOT_USER/
            # MINIO_ROOT_PASSWORD); names must match the keys of the
            # `minio-secrets` Secret created by the workflow.
            - name: MINIO_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: minio-secrets
                  key: MINIO_ACCESS_KEY
            - name: MINIO_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: minio-secrets
                  key: MINIO_SECRET_KEY
          ports:
            # minio's S3 API endpoint
            - containerPort: 9000
              name: minio
          volumeMounts:
            - name: data
              mountPath: "/data"
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: minio-pv-claim
---
apiVersion: v1
kind: Service
metadata:
  # name is looked up by the `minio_service` fixture in tests/conftest.py
  name: minio-service
spec:
  type: ClusterIP
  ports:
    - port: 9000
      targetPort: 9000
      protocol: TCP
  selector:
    app: minio-server
Loading

0 comments on commit b8571e1

Please sign in to comment.