Skip to content

Save Job History on Flink #432

Save Job History on Flink

Save Job History on Flink #432

Workflow file for this run

# Workflow that runs the Flink integration test suite.
name: Run Flink Test

# Triggers:
#   - pushes to main
#   - pull requests targeting main (including when a label is added)
#   - a nightly schedule
on:
  push:
    branches: ["main"]
  pull_request:
    branches: ["main"]
    # 'labeled' is included so that adding a label to an open PR fires the
    # workflow, not only new commits.
    types: [opened, reopened, synchronize, labeled]
  schedule:
    # GitHub evaluates cron schedules in UTC.
    - cron: '0 4 * * *'  # run once a day at 4 AM
# Gate for the job. Run on:
#   - all pushes to main
#   - the schedule defined in the workflow triggers
#   - a PR that was just labeled 'test-flink' or 'test-all'
#     (github.event.label is the label that triggered a 'labeled' event)
#   - a PR already carrying 'test-flink' or 'test-all' that was opened,
#     reopened, or synchronized
if: |
  github.event_name == 'push' ||
  github.event_name == 'schedule' ||
  github.event.label.name == 'test-all' ||
  github.event.label.name == 'test-flink' ||
  contains( github.event.pull_request.labels.*.name, 'test-all') ||
  contains( github.event.pull_request.labels.*.name, 'test-flink')
# Runner selection and build-matrix settings for the job.
# NOTE(review): the parent keys these settings normally nest under
# (e.g. `strategy:` / `matrix:`) and the original indentation are not visible
# in this view — confirm the structure against the repository copy.
runs-on: ubuntu-latest
# Keep the remaining matrix combinations running when one of them fails.
fail-fast: false
# Versions are quoted so that "3.10" stays a string instead of the float 3.1.
python-version: [ "3.9", "3.10", "3.11" ]
# NOTE(review): this list is opened but neither its entries nor its closing
# bracket are visible here — restore the intended values.
recipes-version: [
# NOTE(review): as above, only commented-out entries are visible for this
# list; the active entries and the closing bracket need to be restored.
beam-version: [
# save some cycles and infer it might
# work on all versions between lowest and highest
# "apache-beam==2.48.0",
# "apache-beam==2.49.0",
# "apache-beam==2.50.0",
# keep here to be explicit for future users what version
# of Flink we are supporting
flink-version: [ "1.16", ]
# Check out the repository so later steps can use its files.
- uses: actions/checkout@v3
# Install the Python interpreter selected by the job matrix.
- name: Set up Python ${{ matrix.python-version }}
  uses: actions/setup-python@v3
  with:
    # Action inputs must sit under `with:`; a bare `python-version` key on
    # the step itself is not a valid step property.
    python-version: ${{ matrix.python-version }}
# Starts a k8s cluster with NetworkPolicy enforcement and installs both
# kubectl and helm
# ref: https://github.com/jupyterhub/action-k3s-helm
- uses: jupyterhub/action-k3s-helm@v3
  with:
    # Action inputs must sit under `with:`.
    metrics-enabled: false
    traefik-enabled: false
    docker-enabled: true
# Apply the cert-manager release manifest to the cluster.
- name: Setup CertManager
run: |
# Setup cert-manager, required by Flink Operator
# NOTE(review): the manifest location below has no scheme/host in front of
# ${CERT_MANAGER_VERSION}, and that variable is not defined anywhere in this
# view — confirm the full URL and where the variable is set.
kubectl apply -f${CERT_MANAGER_VERSION}/cert-manager.yaml
# Block until the workloads in the cert-manager namespace are ready, so the
# following steps do not race against cert-manager start-up.
- name: Wait for CertManager to be ready
  uses: jupyterhub/action-k8s-await-workloads@v1
  with:
    # Action inputs must sit under `with:`.
    timeout: 150
    max-restarts: 1
    namespace: cert-manager
# Install the Flink Kubernetes operator with Helm, then list cluster state.
- name: Setup FlinkOperator
run: |
# NOTE(review): `helm repo add` takes a repo name followed by a URL; here
# ${FLINK_OPERATOR_VERSION} is glued straight onto the name with no URL
# visible, and the variable is not defined in this view — confirm both.
helm repo add flink-operator-repo${FLINK_OPERATOR_VERSION}
# --wait makes helm block until the release's resources are ready.
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --wait
# Print pods and CRDs across all namespaces to the job log for debugging.
kubectl get pod -A
kubectl get crd -A
# Install the Google Cloud SDK on the runner.
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
# Download the `mc` binary, make it executable, move it onto PATH, and print
# its version as a smoke check.
- name: 'Setup minio cli mc'
run: |
# NOTE(review): no download URL is visible after `wget --quiet` — confirm the
# source the `mc` binary is fetched from.
wget --quiet
chmod +x mc
mv mc /usr/local/bin/mc
mc --version
# Install the matrix-selected Beam requirement first, before the project and
# its other dependencies are installed in the next step.
- name: Setup "${{ matrix.beam-version }}" to satisfy pf-runner installs
run: |
pip install ${{ matrix.beam-version }}
# Upgrade pip, install the development requirements, then install this
# package in editable mode with its `flink` extra.
- name: Install dependencies & our package
run: |
python -m pip install --upgrade pip
python -m pip install -r dev-requirements.txt
python -m pip install -e .[flink]
# Create the `minio-secrets` Kubernetes secret from freshly generated
# credentials and apply the MinIO manifest from the test tree.
# NOTE(review): the step name reads as if a word is missing before
# "as a k3s service" — confirm against the repository copy.
- name: Set up as a k3s service
run: |
# 16 random bytes, hex-encoded, generated per run so no fixed credentials
# live in the workflow file.
MYACCESSKEY=$(openssl rand -hex 16)
MYSECRETKEY=$(openssl rand -hex 16)
kubectl create secret generic minio-secrets --from-literal=MINIO_ACCESS_KEY=$MYACCESSKEY --from-literal=MINIO_SECRET_KEY=$MYSECRETKEY
kubectl apply -f tests/integration/flink/minio-manifest.yaml
# Install socat via apt.
- name: Install socat so kubectl port-forward will work
run: |
# Not sure if this is why kubectl proxy isn't working, but let's try
sudo apt update --yes && sudo apt install --yes socat
# Run the Flink integration tests with coverage. `continue-on-error` keeps
# the job going after a test failure so a later step can inspect
# `steps.testrunner.outcome`.
- name: Test with pytest
  id: testrunner
  continue-on-error: true
  run: |
    # "apache-beam==X.Y.Z" split on "=" yields "apache-beam", "", "X.Y.Z",
    # so field 3 is the bare version number.
    beamversion=$(echo ${{ matrix.beam-version }} | cut -d"=" -f3)
    # The computed Beam version is passed as the final option; previously the
    # command ended on a dangling line continuation and the variable went
    # unused.
    # NOTE(review): assumes the test suite registers --beamversion alongside
    # --flinkversion / --pythonversion — confirm in the tests' conftest.
    pytest -vvv -s --cov=pangeo_forge_runner tests/integration/flink/ \
      --flinkversion=${{ matrix.flink-version }} \
      --pythonversion=${{ matrix.python-version }} \
      --beamversion="$beamversion"
# Runs only when the `testrunner` step failed. Prints the last 1000 log lines
# for three groups of pods in the current namespace, then fails the job
# explicitly (the test step itself uses continue-on-error, so without this
# the job would not be marked as failed).
- name: Dump logs from pods after failure so we don't have ssh
if: steps.testrunner.outcome == 'failure'
run: |
echo "The previous step failed or timed out. Dumping logs..."
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
# Pods whose listing line contains "operator".
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### JOB MANAGER ######################"
# Pods whose listing line contains "recipe-" but not "manager".
kubectl get pod | grep -v manager | grep recipe- | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### TASK MANAGER ######################"
# Only the first pod whose listing line contains "manager".
kubectl get pod | grep manager | head -n1 | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
# force GH action to show failed result
exit 128
# Send the coverage data produced by the pytest `--cov` run to Codecov.
# Being placed after the log-dump step, which exits non-zero on test failure,
# this step is skipped by default when the tests failed.
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v2